Streaming

Use AgentKavach with streaming responses from OpenAI, Anthropic, Google, and Mistral.

OpenAI Streaming #

Pass stream=True to guard.create() and iterate over chunks exactly as you would with the OpenAI SDK:

python
guard = AgentKavach(provider="openai", ...)

for chunk in guard.create(model="gpt-4o", messages=msgs, stream=True):
    print(chunk.choices[0].delta.content or "", end="")

Anthropic Streaming #

Anthropic streaming works the same way. Provide max_tokens as required by the Anthropic API:

python
guard = AgentKavach(provider="anthropic", ...)

for chunk in guard.create(
    model="claude-sonnet-4-20250514",
    messages=msgs,
    max_tokens=1024,
    stream=True,
):
    # handle chunk
    pass

Google Streaming #

Google Gemini uses contents instead of messages:

python
guard = AgentKavach(provider="google", ...)

for chunk in guard.create(model="gemini-2.0-flash", contents="...", stream=True):
    # handle chunk
    pass

Mistral Streaming #

Mistral uses an OpenAI-compatible format. Pass stream=True and iterate over chunks using choices[0].delta.content:

python
guard = AgentKavach(provider="mistral", ...)

for chunk in guard.create(model="mistral-large-latest", messages=msgs, stream=True):
    print(chunk.choices[0].delta.content or "", end="")

Budget Tracking with Streaming #

When streaming, cost is calculated after the stream completes. The stream wrapper accumulates tokens as chunks arrive and reports the total cost during post-flight, once the stream is fully consumed.
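As a rough illustration (not AgentKavach's actual internals; the chunk shape, tokenizer, and pricing below are stand-in assumptions), a wrapper of this kind can be sketched as a generator that counts tokens as chunks pass through and settles the cost only after the stream is exhausted:

```python
class StreamCostTracker:
    """Illustrative sketch: accumulate tokens per chunk, settle cost post-flight."""

    def __init__(self, chunks, price_per_token=0.00001):
        self.chunks = chunks
        self.price_per_token = price_per_token
        self.total_tokens = 0
        self.cost = None  # unknown until the stream completes

    def __iter__(self):
        for chunk in self.chunks:
            # Stand-in for real tokenization of the streamed text.
            self.total_tokens += len(chunk.split())
            yield chunk
        # Post-flight: runs once the stream is fully consumed.
        self.cost = self.total_tokens * self.price_per_token

tracker = StreamCostTracker(["hello world", "foo bar baz"])
for chunk in tracker:
    pass  # consume the stream

print(tracker.total_tokens, tracker.cost)
```

Until the loop finishes, `tracker.cost` stays `None`, which mirrors why streamed calls cannot report a final cost mid-stream.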

If the budget is exceeded mid-stream, the current stream completes normally and the user receives the full response. The next call to guard.create(), however, raises a BudgetExceededError during pre-flight checks.

Pre-flight vs. Post-flight

Every call goes through two stages:

  • Pre-flight — Checks remaining budget before the LLM call. If budget is already exhausted, raises BudgetExceededError immediately.
  • Post-flight — After the stream completes, calculates actual token usage and cost, updates the spend tracker, and sends telemetry.
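The two stages can be sketched in plain Python (the class, method, and cost figures here are hypothetical; only BudgetExceededError is named by AgentKavach itself). Note how a call that overshoots mid-stream still completes, and only the following call is blocked:

```python
class BudgetExceededError(Exception):
    """Raised in pre-flight when the budget is already exhausted."""

class GuardSketch:
    """Toy model of the pre-flight / post-flight flow, not the real API."""

    def __init__(self, budget):
        self.budget = budget
        self.spent = 0.0

    def create_stream(self, chunks, cost_per_chunk=0.6):
        # Pre-flight: refuse the call if the budget is already gone.
        if self.spent >= self.budget:
            raise BudgetExceededError(f"spent {self.spent} of {self.budget}")
        call_cost = 0.0
        for chunk in chunks:
            call_cost += cost_per_chunk
            yield chunk  # mid-stream overshoot does not interrupt the stream
        # Post-flight: update the spend tracker after the stream completes.
        self.spent += call_cost

guard = GuardSketch(budget=1.0)
list(guard.create_stream(["a", "b"]))  # costs 1.2, yet streams to completion
try:
    list(guard.create_stream(["c"]))   # pre-flight now fails immediately
except BudgetExceededError as err:
    print("blocked:", err)
```

Because the body of a generator only runs on first iteration, the pre-flight check fires when the stream is first consumed, not when create_stream() is called.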

ℹ️ GeneratorExit handling

If the consumer stops iterating early (for example, by breaking out of the loop), Python raises GeneratorExit inside the stream generator when it is closed. AgentKavach catches this and still runs post-flight: cost is tracked based on the tokens that were actually streamed before the generator was closed.
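The underlying mechanism is plain Python: a try/finally around the yield loop runs on GeneratorExit too, so accounting can settle even after an early exit. A minimal sketch, with an illustrative tokenizer and price:

```python
def counted_stream(chunks, stats):
    """Yield chunks; the finally block runs even if the consumer exits early."""
    try:
        for chunk in chunks:
            stats["tokens"] += len(chunk.split())  # stand-in tokenizer
            yield chunk
    finally:
        # Runs on normal completion AND on GeneratorExit (early close),
        # so cost reflects only what was actually streamed.
        stats["cost"] = stats["tokens"] * 0.00001

stats = {"tokens": 0, "cost": None}
stream = counted_stream(["hello world", "never reached"], stats)
for chunk in stream:
    break  # consumer stops after the first chunk
stream.close()  # raises GeneratorExit at the suspended yield

print(stats["tokens"])  # only the first chunk's tokens were counted
```

Here close() is called explicitly for clarity; in practice CPython closes an abandoned generator when it is garbage-collected, which triggers the same GeneratorExit path.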