Streaming
AgentKavach wraps streaming responses from all four supported providers so output tokens are tracked as chunks arrive. Pass stream=True to guard.create() and iterate the result.
Provider support
| Provider | Status | Notes |
|---|---|---|
| OpenAI | Supported | stream=True on chat completions. Use stream_options={"include_usage": True} for exact token counts. Without it, output tokens are estimated from delta text. |
| Anthropic | Supported | Output tokens come from the exact output_tokens on the trailing message_delta event; per-chunk content_block_delta events are also counted as a fallback. |
| Google (Gemini) | Supported | Per-chunk .text is estimated chunk-by-chunk; the final chunk's usage_metadata.candidates_token_count provides the exact total. |
| Mistral | Supported | Routed to client.chat.stream() under the hood. Counts delta.content per chunk and prefers usage.completion_tokens when the final chunk includes it. |
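The notes above share one pattern: estimate output tokens from streamed text, then prefer the provider's exact count when it arrives. The sketch below illustrates that fallback. It is not AgentKavach's implementation; the class name `OutputTokenCounter`, its methods, and the four-characters-per-token ratio are assumptions made for illustration.

```python
import math
from typing import Optional

# Assumption: a rough ratio of four characters per token for the estimate.
CHARS_PER_TOKEN = 4


class OutputTokenCounter:
    """Hypothetical counter: estimates from text, prefers an exact count."""

    def __init__(self) -> None:
        self.chars = 0                      # characters seen in streamed deltas
        self.exact: Optional[int] = None    # provider-reported total, if any

    def add_delta(self, text: str) -> None:
        """Accumulate the length of one streamed text chunk."""
        self.chars += len(text)

    def set_exact(self, tokens: int) -> None:
        """Store the exact total from a final usage chunk or event."""
        self.exact = tokens

    @property
    def total(self) -> int:
        """Exact count when known, otherwise the rounded-up estimate."""
        if self.exact is not None:
            return self.exact
        return math.ceil(self.chars / CHARS_PER_TOKEN)
```

A caller would invoke `add_delta()` for each chunk and `set_exact()` only if the provider sends a usage record, so `total` stays usable even when the stream is cut short.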
OpenAI
```python
from agentkavach import AgentKavach, Budget

guard = AgentKavach(
    provider="openai",
    api_key="ak_prod_...",
    llm_key="sk-...",
    agent_name="writer",
    budget=Budget.daily(50),
)

for chunk in guard.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Write a sonnet about TLS."}],
    stream=True,
    stream_options={"include_usage": True},
):
    # With include_usage, OpenAI's final chunk carries usage and an empty
    # choices list, so skip chunks that have no choices.
    if not chunk.choices:
        continue
    delta = chunk.choices[0].delta.content
    if delta:
        print(delta, end="", flush=True)
```
ℹ️ Token accounting
With include_usage, OpenAI emits a final chunk carrying exact token counts; AgentKavach uses that count for cost. Without it, the wrapper estimates output tokens from chunk text (roughly four characters per token).
Anthropic
AgentKavach forwards the stream=True kwarg straight to the Anthropic SDK. Anthropic requires max_tokens on every call.
```python
from agentkavach import AgentKavach

guard = AgentKavach(provider="anthropic", api_key="ak_prod_...", llm_key="sk-ant-...")

stream = guard.create(
    model="claude-sonnet-4-0",
    messages=[{"role": "user", "content": "Explain HMAC."}],
    max_tokens=1024,
    stream=True,
)

for event in stream:
    # Anthropic emits typed events: content_block_delta carries the text,
    # message_delta carries the final usage. AgentKavach handles both.
    delta = getattr(event, "delta", None)
    text = getattr(delta, "text", None) if delta is not None else None
    if text:
        print(text, end="", flush=True)
```
Google (Gemini)
Gemini streaming returns an iterator of GenerateContentResponse chunks. Each chunk exposes .text for the incremental output; the final chunk carries exact token counts under usage_metadata.
```python
from agentkavach import AgentKavach

guard = AgentKavach(provider="google", api_key="ak_prod_...", llm_key="...")

stream = guard.create(
    model="gemini-2.0-flash",
    contents="Write a haiku about TLS.",
    stream=True,
)

for chunk in stream:
    # Each chunk exposes the incremental text; skip chunks without any.
    if chunk.text:
        print(chunk.text, end="", flush=True)
```
Mistral
Mistral splits streaming and non-streaming into two separate methods on the SDK (chat.stream vs chat.complete). AgentKavach picks the right one based on the stream kwarg, so the caller-facing API stays the same.
```python
from agentkavach import AgentKavach

guard = AgentKavach(provider="mistral", api_key="ak_prod_...", llm_key="...")

stream = guard.create(
    model="mistral-large-latest",
    messages=[{"role": "user", "content": "Explain HMAC briefly."}],
    stream=True,
)

for event in stream:
    # Mistral wraps each chunk in an event; the payload lives under .data.
    delta = event.data.choices[0].delta
    if delta.content:
        print(delta.content, end="", flush=True)
```
How budget tracking works
- Pre-flight runs before the LLM call. If the budget is already exhausted, the SDK raises BudgetExceededError and never opens the stream.
- During the stream, the wrapper counts output tokens chunk by chunk. If the kill switch trips mid-stream (because another threshold was crossed earlier), the iterator raises BudgetExceededError on the next next() call.
- Post-flight runs when the stream ends, whether it completes normally, the consumer breaks out of the loop, or a GeneratorExit fires. Partial usage is always recorded.
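The three phases above can be sketched as a plain Python generator. This is an illustrative model, not the SDK's code: `TokenBudget`, `guarded_stream`, and the locally defined `BudgetExceededError` are hypothetical stand-ins, and tokens are estimated at an assumed four characters each.

```python
import math
from typing import Iterable, Iterator


class BudgetExceededError(RuntimeError):
    """Stand-in for the SDK's exception so this sketch runs on its own."""


class TokenBudget:
    """Hypothetical in-memory budget measured in output tokens."""

    def __init__(self, limit: int) -> None:
        self.limit = limit
        self.spent = 0

    @property
    def exhausted(self) -> bool:
        return self.spent >= self.limit


def guarded_stream(chunks: Iterable[str], budget: TokenBudget) -> Iterator[str]:
    # Pre-flight: runs on the first next(), before the source is iterated.
    if budget.exhausted:
        raise BudgetExceededError("budget exhausted before the call")
    streamed = 0
    try:
        for text in chunks:
            # Mid-stream: raise on the next() after the limit was reached.
            if budget.spent + streamed >= budget.limit:
                raise BudgetExceededError("budget crossed mid-stream")
            streamed += math.ceil(len(text) / 4)  # assumed ~4 chars per token
            yield text
    finally:
        # Post-flight: runs on normal completion, on errors, and on close().
        budget.spent += streamed
```

Placing the accounting in `finally` is the design choice that keeps partial usage recorded regardless of how the stream ends.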
ℹ️ GeneratorExit handling
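Breaking out of a for chunk in stream loop stops iteration but leaves the generator suspended. Python raises GeneratorExit inside it once the generator is closed, either through an explicit close() call or when the object is garbage-collected (in CPython, normally as soon as the last reference to it is dropped). AgentKavach catches it and records the tokens already streamed; the event is marked partial: true.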
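The mechanism can be reproduced in plain Python without the SDK. In the minimal sketch below, `recording_stream` and the event dictionary are hypothetical; the dictionary only mirrors the partial flag described above, and the sketch counts chunks rather than tokens to stay short.

```python
from typing import Dict, Iterable, Iterator, List


def recording_stream(chunks: Iterable[str], events: List[Dict]) -> Iterator[str]:
    """Yield chunks and append one usage event when the stream ends."""
    streamed = 0
    partial = False
    try:
        for text in chunks:
            streamed += 1
            yield text
    except GeneratorExit:
        # Raised at the suspended yield when the generator is closed early.
        partial = True
        raise  # re-raise so close() finishes normally
    finally:
        events.append({"chunks": streamed, "partial": partial})


events: List[Dict] = []
stream = recording_stream(["a", "b", "c"], events)
for text in stream:
    break       # the consumer stops early; the generator is only suspended
stream.close()  # closing it raises GeneratorExit inside the generator
print(events)   # [{'chunks': 1, 'partial': True}]
```

Calling close() explicitly (or using contextlib.closing) makes the timing deterministic instead of depending on when the interpreter finalizes the generator.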