Overview

This page covers tracing patterns beyond the basics: parallel execution, multi-provider setups, streaming LLM responses, synchronous code, token tracking across provider SDKs, deep agent hierarchies, custom metadata, and selective detection.

Concurrent Spans

When your agent runs multiple operations in parallel, use asyncio.gather to run them concurrently. The SDK correctly links each parallel call as a child of the current parent span.
import asyncio
from avaliar import traceable
from openai import AsyncOpenAI

client = AsyncOpenAI()


@traceable("tool")
async def fetch_weather(city: str) -> str:
    # Simulated external call
    await asyncio.sleep(0.1)
    return f"Weather in {city}: Sunny, 22°C"


@traceable("tool")
async def fetch_news(topic: str) -> str:
    await asyncio.sleep(0.1)
    return f"Latest news on {topic}: All quiet."


@traceable(
    "llm",
    model="gpt-4o",
    provider="openai",
)
async def synthesize(messages: list) -> str:
    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=messages,
    )
    return response.choices[0].message.content


@traceable("agent")
async def morning_briefing(city: str, topic: str) -> str:
    # Both tool spans run in parallel under the same parent agent span
    weather, news = await asyncio.gather(
        fetch_weather(city),
        fetch_news(topic),
    )

    messages = [
        {
            "role": "user",
            "content": (
                f"Give me a morning briefing.\n\n"
                f"Weather: {weather}\n"
                f"News: {news}"
            ),
        }
    ]
    return await synthesize(messages)
The resulting trace tree looks like:
morning_briefing  (agent)
  ├── fetch_weather  (tool)   ─ run in parallel
  ├── fetch_news     (tool)   ─ run in parallel
  └── synthesize     (llm)
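
To illustrate how a tracing decorator can attach parallel calls to the right parent, here is a minimal sketch built on Python's contextvars module. It is not Avaliar's actual implementation; Span, traced, and roots are hypothetical names introduced for this example. The key point is that asyncio.gather wraps each coroutine in a task that copies the current context, so every child sees the agent span as its parent even though the children run concurrently.

```python
import asyncio
import contextvars
import functools
from dataclasses import dataclass, field

# The span that is "current" for the running task (None outside any span)
_current_span = contextvars.ContextVar("current_span", default=None)
roots = []  # top-level spans, collected for inspection


@dataclass(eq=False)
class Span:
    name: str
    parent: "Span | None" = None
    children: list = field(default_factory=list)


def traced(fn):
    """Hypothetical stand-in for a tracing decorator (async functions only)."""
    @functools.wraps(fn)
    async def wrapper(*args, **kwargs):
        parent = _current_span.get()
        span = Span(fn.__name__, parent)
        (parent.children if parent is not None else roots).append(span)
        token = _current_span.set(span)
        try:
            return await fn(*args, **kwargs)
        finally:
            _current_span.reset(token)  # restore the previous current span
    return wrapper


@traced
async def fetch_weather(city):
    await asyncio.sleep(0.01)
    return f"Weather in {city}: Sunny"


@traced
async def fetch_news(topic):
    await asyncio.sleep(0.01)
    return f"News on {topic}: All quiet."


@traced
async def briefing(city, topic):
    # Each gathered coroutine runs in its own task with a copy of this context
    return await asyncio.gather(fetch_weather(city), fetch_news(topic))
```

Running asyncio.run(briefing("Lisbon", "tech")) leaves one root span in roots whose two children both point back to it, which mirrors the tree shown above.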

Multi-Provider Tracing

You can trace calls to different LLM providers in the same application. Set provider accurately so the Avaliar dashboard can break down cost and latency by provider.
from avaliar import traceable
from avaliar.trace import update_current_llm_run
from openai import AsyncOpenAI

openai_client = AsyncOpenAI()

@traceable(
    "llm",
    model="gpt-4o",
    provider="openai",
)
async def openai_generate(messages: list) -> str:
    response = await openai_client.chat.completions.create(
        model="gpt-4o",
        messages=messages,
    )
    update_current_llm_run(
        input_tokens=response.usage.prompt_tokens,
        output_tokens=response.usage.completion_tokens,
    )
    return response.choices[0].message.content

Streaming LLM Responses

For streaming responses, decorate the generator function. The SDK captures the full concatenated response after the generator is exhausted and submits it as a single trace.
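
The capture-after-exhaustion behavior can be pictured with a small sketch. This is an illustration under assumptions, not the SDK's code: capture_stream and captured_outputs are hypothetical names, and the list stands in for whatever backend receives the trace. The wrapper passes each chunk through untouched and records the joined text once the generator finishes.

```python
import functools

captured_outputs = []  # stand-in for a trace backend


def capture_stream(fn):
    """Hypothetical decorator: yield chunks as-is, record the joined text at the end."""
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        parts = []
        try:
            for chunk in fn(*args, **kwargs):
                parts.append(chunk)
                yield chunk
        finally:
            # Runs when the generator is exhausted or closed early
            captured_outputs.append("".join(parts))
    return wrapper


@capture_stream
def fake_stream():
    # Stand-in for a provider stream
    yield from ["Once ", "upon ", "a time."]
```

In this sketch, a consumer that stops iterating early would record only the chunks it actually consumed. An async variant would follow the same shape with async def and async for.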

Async streaming

from avaliar import traceable
from openai import AsyncOpenAI

client = AsyncOpenAI()


@traceable(
    "llm",
    model="gpt-4o",
    provider="openai",
)
async def stream_generate(messages: list):
    """Yields response chunks. Trace captures the full output."""
    stream = await client.chat.completions.create(
        model="gpt-4o",
        messages=messages,
        stream=True,
    )
    async for chunk in stream:
        content = chunk.choices[0].delta.content
        if content:
            yield content


# Usage: iterate normally — tracing is transparent
async def main() -> None:
    messages = [{"role": "user", "content": "Tell me a short story."}]
    async for chunk in stream_generate(messages):
        print(chunk, end="", flush=True)
    print()  # newline

Sync streaming

from avaliar import traceable
from openai import OpenAI

client = OpenAI()


@traceable(
    "llm",
    model="gpt-4o",
    provider="openai",
)
def stream_generate_sync(messages: list):
    stream = client.chat.completions.create(
        model="gpt-4o",
        messages=messages,
        stream=True,
    )
    for chunk in stream:
        content = chunk.choices[0].delta.content
        if content:
            yield content
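Token counts are not available mid-stream. With the OpenAI SDK, pass stream_options={"include_usage": True} when creating the stream; the final chunk then carries the usage object and an empty choices list. Call update_current_llm_run (imported from avaliar.trace) with that usage inside the traced generator. The async form is shown here; use a plain for loop in the sync variant:
async for chunk in stream:
    # The usage-only final chunk has no choices, so guard before indexing
    if chunk.choices and chunk.choices[0].delta.content:
        yield chunk.choices[0].delta.content
    if chunk.usage:
        update_current_llm_run(
            input_tokens=chunk.usage.prompt_tokens,
            output_tokens=chunk.usage.completion_tokens,
        )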

Synchronous Code

@traceable works with regular (non-async) functions. Use this when integrating with synchronous code or frameworks that don’t support asyncio.
from avaliar import traceable
from openai import OpenAI

client = OpenAI()


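@traceable("tool")
def fetch_from_db(query: str, params: tuple = ()) -> list[dict]:
    # Synchronous database call; `db` is a placeholder for your own connection object
    return db.execute(query, params)


@traceable(
    "llm",
    model="gpt-4o",
    provider="openai",
)
def generate_sync(messages: list) -> str:
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=messages,
    )
    return response.choices[0].message.content


@traceable("agent")
def run_pipeline(question: str) -> str:
    # Parameterized query instead of string interpolation; `documents` is a
    # placeholder table name and the `?` placeholder style depends on your driver
    records = fetch_from_db("SELECT * FROM documents WHERE topic = ?", (question,))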
    context = str(records)
    messages = [
        {"role": "system", "content": f"Context: {context}"},
        {"role": "user", "content": question},
    ]
    return generate_sync(messages)
Sync and async functions can be mixed freely in the same trace tree.
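
When an async function needs to call blocking synchronous code, one common approach is asyncio.to_thread, which runs the call in a worker thread and copies the caller's contextvars context into it. The sketch below demonstrates that propagation with a plain ContextVar; request_id, blocking_lookup, and handler are hypothetical names, and whether Avaliar's span context follows the call in the same way is an assumption this page does not confirm.

```python
import asyncio
import contextvars
import time

request_id = contextvars.ContextVar("request_id", default="none")


def blocking_lookup(key: str) -> str:
    """Stand-in for a synchronous call (database driver, legacy client, ...)."""
    time.sleep(0.05)
    # Reads the value set by the async caller, via the copied context
    return f"{key} looked up for request {request_id.get()}"


async def handler(key: str) -> str:
    request_id.set("req-42")
    # Runs the blocking function in a worker thread without stalling the event loop
    return await asyncio.to_thread(blocking_lookup, key)
```

Calling asyncio.run(handler("users")) returns a string that includes req-42, showing that the value set in the coroutine is visible inside the worker thread.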

Token Tracking

Always call update_current_llm_run from inside a span_type="llm" function to attach token counts. This data drives cost calculations in the Avaliar dashboard.
from avaliar import traceable
from avaliar.trace import update_current_llm_run
from openai import AsyncOpenAI

client = AsyncOpenAI()


@traceable("llm", model="gpt-4o", provider="openai")
async def generate(messages: list) -> str:
    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=messages,
    )

    # Attach token counts — must be called from inside the @traceable function
    update_current_llm_run(
        input_tokens=response.usage.prompt_tokens,
        output_tokens=response.usage.completion_tokens,
    )

    return response.choices[0].message.content
| Field | Source |
| --- | --- |
| input_tokens | response.usage.prompt_tokens (OpenAI) / response.usage.input_tokens (Anthropic) |
| output_tokens | response.usage.completion_tokens (OpenAI) / response.usage.output_tokens (Anthropic) |
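
If the same code path handles more than one provider, the naming difference in the table can be hidden behind a small helper. The following is a sketch, with token_counts as a hypothetical helper name; it relies only on the attribute names listed above and uses SimpleNamespace objects as stand-ins for real SDK responses.

```python
from types import SimpleNamespace


def token_counts(usage) -> tuple[int, int]:
    """Return (input_tokens, output_tokens) from an OpenAI- or Anthropic-style usage object."""
    if hasattr(usage, "prompt_tokens"):
        # OpenAI chat completions naming
        return usage.prompt_tokens, usage.completion_tokens
    # Anthropic naming
    return usage.input_tokens, usage.output_tokens


# Stand-ins that mimic the two usage shapes
openai_usage = SimpleNamespace(prompt_tokens=12, completion_tokens=34)
anthropic_usage = SimpleNamespace(input_tokens=56, output_tokens=78)
```

Inside a traced LLM function, the two values can then be passed to update_current_llm_run as input_tokens and output_tokens regardless of which provider produced the response.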

Deep Agent Hierarchies

Traces can be arbitrarily deep. This example traces a three-level agent: coordinator → researcher → LLM.
import asyncio

from avaliar import traceable
from openai import AsyncOpenAI

client = AsyncOpenAI()


@traceable(
    "llm",
    model="gpt-4o",
    provider="openai",
)
async def llm_call(messages: list) -> str:
    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=messages,
    )
    return response.choices[0].message.content


@traceable("agent")
async def researcher(topic: str) -> str:
    """Level 2: research sub-agent."""
    messages = [
        {"role": "system", "content": "You are a research assistant."},
        {"role": "user", "content": f"Research this topic: {topic}"},
    ]
    return await llm_call(messages)


@traceable("tool")
async def summarize_findings(findings: list[str]) -> str:
    """Level 2: summarization tool."""
    combined = "\n\n".join(findings)
    messages = [
        {"role": "system", "content": "Summarize the following research findings concisely."},
        {"role": "user", "content": combined},
    ]
    return await llm_call(messages)


@traceable("agent")
async def coordinator(topics: list[str]) -> str:
    """Level 1: top-level coordinator."""
    # Run all researchers concurrently
    findings = await asyncio.gather(*[researcher(t) for t in topics])

    # Summarize all findings
    return await summarize_findings(list(findings))
Resulting trace tree:
coordinator                    (agent)   — level 1
  ├── researcher: "topic A"    (agent)   — level 2
  │     └── llm_call           (llm)
  ├── researcher: "topic B"    (agent)   — level 2
  │     └── llm_call           (llm)
  └── summarize_findings       (tool)    — level 2
        └── llm_call           (llm)

Custom Metadata

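Pass generation parameters such as temperature and top_p to the @traceable decorator, in addition to the provider API call, and they’ll be recorded in the trace:
from avaliar import traceable
from openai import AsyncOpenAI

client = AsyncOpenAI()


@traceable(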
    "llm",
    model="gpt-4o",
    provider="openai",
    temperature=0.2,   # Recorded in generation_info
    top_p=0.9,         # Recorded in generation_info
)
async def precise_generate(messages: list) -> str:
    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=messages,
        temperature=0.2,
        top_p=0.9,
    )
    return response.choices[0].message.content
The temperature and top_p decorator parameters are stored in the trace’s generation_info and shown in the Trace Explorer alongside the prompt and response.

Detection on Specific Spans Only

You don’t need to enable detection on every span — add it only where the safety risk is highest. For example, run detection on the final LLM response but not on intermediate summarization calls:
@traceable(
    "llm",
    model="gpt-4o",
    provider="openai",
)
async def internal_summarize(messages: list) -> str:
    """Internal step — no detection needed."""
    ...


@traceable(
    "llm",
    model="gpt-4o",
    provider="openai",
    detection=True,
    detectors=[DetectorType.PII, DetectorType.TOXICITY],
    detection_mode="cloud",
)
async def user_facing_response(messages: list) -> str:
    """User-facing output — detection enabled."""
    ...

Choosing Between Local and Cloud Detection

| | Local | Cloud |
| --- | --- | --- |
| Where it runs | Your infrastructure | Avaliar’s servers |
| Additional dependencies | avaliar_eval, OPENAI_API_KEY | None |
| Latency | Depends on your hardware | Low, managed |
| Data leaves your environment | No | Yes |
| Plan requirement | Free | Pro |
| Best for | Development, air-gapped systems | Production |
# Development
detection_mode="local"

# Production
detection_mode="cloud"
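
Rather than editing the value by hand for each environment, the mode can be derived from configuration. This is a minimal sketch: pick_detection_mode is a hypothetical helper and APP_ENV is an assumed environment variable name, so substitute whatever your deployment already sets.

```python
import os


def pick_detection_mode(env=None) -> str:
    """Return "cloud" in production and "local" everywhere else."""
    env = os.environ if env is None else env
    # APP_ENV is a hypothetical variable name used for illustration
    return "cloud" if env.get("APP_ENV", "development") == "production" else "local"
```

The result can be passed as detection_mode=pick_detection_mode() in the @traceable call, keeping development runs local by default.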