Streaming

AgentKavach wraps streaming responses from all four supported providers so output tokens are tracked as chunks arrive. Pass stream=True to guard.create() and iterate the result.

Provider support

| Provider | Status | Notes |
| --- | --- | --- |
| OpenAI | Supported | stream=True on chat completions. Use stream_options={"include_usage": True} for exact token counts. Without it, output tokens are estimated from delta text. |
| Anthropic | Supported | Output tokens come from the exact output_tokens on the trailing message_delta event; per-chunk content_block_delta events are also counted as a fallback. |
| Google (Gemini) | Supported | Per-chunk .text is estimated chunk-by-chunk; the final chunk's usage_metadata.candidates_token_count provides the exact total. |
| Mistral | Supported | Routed to client.chat.stream() under the hood. Counts delta.content per chunk and prefers usage.completion_tokens when the final chunk includes it. |

OpenAI

python
from agentkavach import AgentKavach, Budget

guard = AgentKavach(
    provider="openai",
    api_key="ak_prod_...",
    llm_key="sk-...",
    agent_name="writer",
    budget=Budget.daily(50),
)

for chunk in guard.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Write a sonnet about TLS."}],
    stream=True,
    stream_options={"include_usage": True},
):
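    # With include_usage, OpenAI's final chunk carries usage and has an
    # empty choices list, so skip chunks that have no choices.
    if not chunk.choices:
        continue
    delta = chunk.choices[0].delta.content
    if delta:
        print(delta, end="", flush=True)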

ℹ️ Token accounting

With include_usage, OpenAI emits a final chunk carrying exact token counts; AgentKavach uses that count for cost. Without it, the wrapper estimates output tokens from chunk text (roughly four characters per token).
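
The preference order described here (exact usage when present, a character-based estimate otherwise) can be illustrated with a small sketch. This is not AgentKavach's implementation: `estimate_tokens`, `count_output_tokens`, and `fake_chunk` are hypothetical names, and the chunks are stand-in objects shaped like OpenAI streaming chunks so the block runs without a network call.

```python
from types import SimpleNamespace

CHARS_PER_TOKEN = 4  # rough heuristic from the note above, not a real tokenizer


def estimate_tokens(text: str) -> int:
    """Estimate a token count from text length, rounding up."""
    return -(-len(text) // CHARS_PER_TOKEN)


def count_output_tokens(chunks) -> int:
    """Return the exact count if any chunk carries usage, else an estimate."""
    streamed_text = []
    exact = None
    for chunk in chunks:
        usage = getattr(chunk, "usage", None)
        if usage is not None:
            exact = usage.completion_tokens
        # The usage-only chunk has an empty choices list, so this loop skips it.
        for choice in chunk.choices:
            if choice.delta.content:
                streamed_text.append(choice.delta.content)
    if exact is not None:
        return exact
    return estimate_tokens("".join(streamed_text))


def fake_chunk(text=None, completion_tokens=None):
    """Hypothetical stand-in shaped like an OpenAI streaming chunk."""
    choices = []
    if text is not None:
        choices = [SimpleNamespace(delta=SimpleNamespace(content=text))]
    usage = None
    if completion_tokens is not None:
        usage = SimpleNamespace(completion_tokens=completion_tokens)
    return SimpleNamespace(choices=choices, usage=usage)


text_chunks = [fake_chunk("Hello, "), fake_chunk("world!")]
estimated = count_output_tokens(text_chunks)
exact = count_output_tokens(text_chunks + [fake_chunk(completion_tokens=9)])
```

The estimate can differ noticeably from the exact figure, which is why passing include_usage is the safer choice when cost accuracy matters.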

Anthropic

AgentKavach forwards the stream=True kwarg straight to the Anthropic SDK. Anthropic requires max_tokens on every call.

python
guard = AgentKavach(provider="anthropic", api_key="ak_prod_...", llm_key="sk-ant-...")

stream = guard.create(
    model="claude-sonnet-4-0",
    messages=[{"role": "user", "content": "Explain HMAC."}],
    max_tokens=1024,
    stream=True,
)

for event in stream:
    # Anthropic emits typed events: content_block_delta carries the text,
    # message_delta carries the final usage. AgentKavach handles both.
    delta = getattr(event, "delta", None)
    text = getattr(delta, "text", None) if delta is not None else None
    if text:
        print(text, end="", flush=True)

Google (Gemini)

Gemini streaming returns an iterator of GenerateContentResponse chunks. Each chunk exposes .text for the incremental output; the final chunk carries exact token counts under usage_metadata.

python
guard = AgentKavach(provider="google", api_key="ak_prod_...", llm_key="...")

stream = guard.create(
    model="gemini-2.0-flash",
    contents="Write a haiku about TLS.",
    stream=True,
)

for chunk in stream:
    if chunk.text:
        print(chunk.text, end="", flush=True)

Mistral

Mistral splits streaming and non-streaming into two separate methods on the SDK (chat.stream vs chat.complete). AgentKavach picks the right one based on the stream kwarg, so the caller-facing API stays the same.

python
guard = AgentKavach(provider="mistral", api_key="ak_prod_...", llm_key="...")

stream = guard.create(
    model="mistral-large-latest",
    messages=[{"role": "user", "content": "Explain HMAC briefly."}],
    stream=True,
)

for event in stream:
    delta = event.data.choices[0].delta
    if delta.content:
        print(delta.content, end="", flush=True)
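
As a rough illustration of the routing described at the start of this section, the sketch below dispatches on a stream flag. It is an assumption about the general shape, not AgentKavach's code: `FakeChat` is a hypothetical stand-in that only borrows the two method names from the Mistral SDK, and `create` here is a local function, not guard.create().

```python
class FakeChat:
    """Hypothetical stand-in exposing the two method names the Mistral SDK uses."""

    def complete(self, **kwargs):
        # Non-streaming: one response object.
        return {"mode": "complete", **kwargs}

    def stream(self, **kwargs):
        # Streaming: an iterator of events.
        yield {"mode": "stream", **kwargs}


def create(chat, *, stream: bool = False, **kwargs):
    """Route to chat.stream or chat.complete; the flag itself is not forwarded."""
    method = chat.stream if stream else chat.complete
    return method(**kwargs)


single = create(FakeChat(), model="m", messages=[])
events = list(create(FakeChat(), stream=True, model="m", messages=[]))
```

Keeping the flag out of the forwarded kwargs matters in this design because neither SDK method takes a stream parameter.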

How budget tracking works

  • Pre-flight runs before the LLM call. If the budget is already exhausted, the SDK raises BudgetExceededError and never opens the stream.
  • During the stream, the wrapper counts output tokens chunk by chunk. If the kill switch trips mid-stream (because another threshold was crossed earlier), the iterator raises BudgetExceededError on the next call to next().
  • Post-flight runs when the stream ends — whether it completes normally, the consumer breaks out of the loop, or a GeneratorExit fires. Partial usage is always recorded.
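
The three phases above can be sketched with a plain Python generator. This is a minimal illustration under stated assumptions, not the SDK's internals: `TokenBudget`, `guarded_stream`, and `_track` are hypothetical names, the `BudgetExceededError` class is a local stand-in so the block runs on its own, and tokens are counted as one per word purely to keep the example small.

```python
class BudgetExceededError(Exception):
    """Local stand-in for the SDK's exception so this sketch is self-contained."""


class TokenBudget:
    """Hypothetical in-memory budget measured in output tokens."""

    def __init__(self, limit: int):
        self.limit = limit
        self.used = 0

    @property
    def exhausted(self) -> bool:
        return self.used >= self.limit


def guarded_stream(chunks, budget: TokenBudget, usage_log: list):
    # Pre-flight: a regular function, so this check runs at call time,
    # before anything is pulled from the underlying stream.
    if budget.exhausted:
        raise BudgetExceededError("budget exhausted before the call")
    return _track(chunks, budget, usage_log)


def _track(chunks, budget: TokenBudget, usage_log: list):
    streamed = 0
    try:
        for text in chunks:
            # Mid-stream: stop before handing out another chunk.
            if budget.exhausted:
                raise BudgetExceededError("budget exhausted mid-stream")
            tokens = len(text.split())  # toy count: one token per word
            streamed += tokens
            budget.used += tokens
            yield text
    finally:
        # Post-flight: runs on completion, on error, and on early close.
        usage_log.append(streamed)


budget = TokenBudget(limit=3)
usage_log = []
received = []
try:
    for text in guarded_stream(["a b", "c d", "e f"], budget, usage_log):
        received.append(text)
    tripped = False
except BudgetExceededError:
    tripped = True
```

Here the consumer receives the first two chunks, the third next() raises, and the usage recorded in the finally block still reflects the four tokens already streamed.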

ℹ️ GeneratorExit handling

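Breaking out of a for chunk in stream loop leaves the generator unfinished. When it is then closed (explicitly with close(), or automatically once nothing references it any more), Python raises GeneratorExit inside it. AgentKavach catches it and records the tokens already streamed; the event is marked partial: true.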
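
The mechanism can be reproduced with an ordinary generator. The sketch below is an assumption about the general pattern rather than AgentKavach's code: `tracked` and the event dictionary layout are hypothetical, and it counts chunks instead of tokens for brevity.

```python
def tracked(chunks, events: list):
    """Yield chunks and append one usage event when the stream ends."""
    count = 0
    partial = False
    try:
        for chunk in chunks:
            count += 1
            yield chunk
    except GeneratorExit:
        # Raised at the paused yield when the generator is closed early.
        partial = True
        raise  # re-raise so the close completes; yielding here is an error
    finally:
        events.append({"chunks": count, "partial": partial})


events = []
stream = tracked(["a", "b", "c"], events)
for chunk in stream:
    if chunk == "b":
        break

# `stream` is still referenced, so nothing has been recorded yet.
recorded_before_close = list(events)
stream.close()  # triggers GeneratorExit inside tracked()
```

Because a generator bound to a name stays suspended after break, calling close() explicitly is the predictable way to flush the partial record in plain Python; whether the SDK's stream object needs this is not stated in this page.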