Events
Understanding Mesh’s event streaming system.
Overview
Mesh emits streaming events during execution to provide real-time feedback. Events are provider-agnostic and consistent across all agent types.
Event Types
Execution Events
Event Type | When Emitted | Purpose |
---|---|---|
execution_start | Graph execution begins | Initialize UI, logging |
execution_complete | Graph execution finishes | Finalize, get results |
Node Events
Event Type | When Emitted | Purpose |
---|---|---|
node_start | Node begins execution | Track progress |
node_complete | Node finishes execution | Capture output |
node_error | Node encounters error | Handle failures |
Streaming Events
Event Type | When Emitted | Purpose |
---|---|---|
token | Token-by-token streaming | Real-time output |
message_start | Message begins | Initialize message |
message_complete | Message finishes | Finalize message |
tool_call_start | Tool call begins | Track tool usage |
tool_call_complete | Tool result available | Handle tool output |
Reasoning Events (o1/o3/Claude Extended Thinking)
Event Type | When Emitted | Purpose |
---|---|---|
reasoning_start | Reasoning block begins | Initialize thinking UI |
reasoning_token | Reasoning tokens streaming | Display thinking process |
reasoning_end | Reasoning block completes | Finalize thinking display |
Metadata Events
Event Type | When Emitted | Purpose |
---|---|---|
response_metadata | Response metadata available | Track usage, costs, timing |
source | Citations/sources available | Display grounding references (Gemini) |
file | File attachment available | Handle multi-modal content |
Multi-Step Events
Event Type | When Emitted | Purpose |
---|---|---|
step_start | Agent step begins | Track multi-step execution |
step_complete | Agent step finishes | Capture step metadata |
Custom Events
Event Type | When Emitted | Purpose |
---|---|---|
custom_data | Custom data event | Extensibility (progress, RLM, etc.) |
Event Structure
ExecutionEvent
from dataclasses import dataclass
from typing import Any, Dict, Optional

@dataclass
class ExecutionEvent:
    type: EventType              # Event type enum
    node_id: str                 # Source node ID
    content: Optional[str]       # Text content (for tokens)
    output: Optional[Dict]       # Structured output
    metadata: Dict[str, Any]     # Extra information
    timestamp: float             # Event timestamp
    error: Optional[str]         # Error message (if any)
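The handlers on this page compare `event.type` against plain strings such as `"token"`. One design that allows this with an enum-typed field is a string-valued enum. The sketch below assumes that design: the real `EventType` definition is not shown here and may differ, only three members are listed, and the field defaults are illustrative assumptions.

```python
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, Optional


class EventType(str, Enum):
    """Hypothetical stand-in for Mesh's EventType; only a few members shown."""
    EXECUTION_START = "execution_start"
    TOKEN = "token"
    EXECUTION_COMPLETE = "execution_complete"


@dataclass
class ExecutionEvent:
    """Mirrors the fields listed above; the defaults are assumptions."""
    type: EventType
    node_id: str
    content: Optional[str] = None
    output: Optional[Dict] = None
    metadata: Dict[str, Any] = field(default_factory=dict)
    timestamp: float = field(default_factory=time.time)
    error: Optional[str] = None


event = ExecutionEvent(type=EventType.TOKEN, node_id="agent", content="Hi")

# Because EventType subclasses str, both comparisons hold.
print(event.type == "token")          # True
print(event.type == EventType.TOKEN)  # True
```

If the actual enum is not string-valued, comparing against `EventType` members rather than string literals would be the safer choice.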
Handling Events
Basic Pattern
async for event in executor.execute(input, context):
    if event.type == "token":
        print(event.content, end="", flush=True)
    elif event.type == "execution_complete":
        print(f"\nDone! Output: {event.output}")
All Event Types
async for event in executor.execute(input, context):
    # Execution lifecycle
    if event.type == "execution_start":
        print("[Starting execution]")
    elif event.type == "execution_complete":
        print(f"\n[Complete] Final output: {event.output}")

    # Node lifecycle
    elif event.type == "node_start":
        print(f"\n[{event.node_id}] Starting...")
    elif event.type == "node_complete":
        print(f"[{event.node_id}] ✓ Complete")
    elif event.type == "node_error":
        print(f"[{event.node_id}] ✗ Error: {event.error}")

    # Streaming
    elif event.type == "token":
        print(event.content, end="", flush=True)
    elif event.type == "message_start":
        # Message beginning
        pass
    elif event.type == "message_complete":
        # Message finished
        print()

    # Tool calls
    elif event.type == "tool_call_start":
        tool_name = event.metadata.get("tool_name")
        print(f"\n[Calling tool: {tool_name}]")
    elif event.type == "tool_call_complete":
        result = event.output
        print(f"[Tool result: {result}]")

    # Reasoning (o1/o3/Claude Extended Thinking)
    elif event.type == "reasoning_start":
        print("\n[Thinking...]")
    elif event.type == "reasoning_token":
        print(event.content, end="", flush=True)
    elif event.type == "reasoning_end":
        print("\n[Done thinking]")

    # Metadata
    elif event.type == "response_metadata":
        usage = event.metadata.get("usage", {})
        tokens = usage.get("total_tokens", 0)
        print(f"\n[Usage: {tokens} tokens]")
    elif event.type == "source":
        sources = event.metadata.get("sources", [])
        print(f"\n[Sources: {len(sources)} citations]")
    elif event.type == "file":
        filename = event.metadata.get("name")
        print(f"\n[File: {filename}]")

    # Multi-step execution
    elif event.type == "step_start":
        step = event.metadata.get("step_index", 0)
        print(f"\n[Step {step + 1}]")
    elif event.type == "step_complete":
        usage = event.metadata.get("usage", {})
        print(f"[Step complete - {usage.get('total_tokens', 0)} tokens]")

    # Custom data events
    elif event.type == "custom_data":
        data_type = event.metadata.get("data_type")
        print(f"\n[Custom: {data_type}]")
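As the number of handled event types grows, the `if`/`elif` chain above can be swapped for a lookup table that maps each event type to a handler function. The following is a minimal sketch of that alternative, not part of Mesh itself: the handler names, `HANDLERS`, and `dispatch` are hypothetical, and `SimpleNamespace` objects stand in for real events so the example runs without an executor.

```python
from types import SimpleNamespace
from typing import Callable, Dict, List

# Output is collected in a list here so the example is easy to inspect.
log: List[str] = []


def on_token(event) -> None:
    log.append(event.content or "")


def on_node_error(event) -> None:
    log.append(f"[{event.node_id}] error: {event.error}")


def on_tool_call_start(event) -> None:
    log.append(f"[tool: {event.metadata.get('tool_name')}]")


# Map event-type strings to handlers; unlisted types are ignored.
HANDLERS: Dict[str, Callable] = {
    "token": on_token,
    "node_error": on_node_error,
    "tool_call_start": on_tool_call_start,
}


def dispatch(event) -> None:
    """Call the handler registered for this event's type, if there is one."""
    handler = HANDLERS.get(event.type)
    if handler is not None:
        handler(event)


# Stand-in events (hypothetical; real events come from executor.execute()).
events = [
    SimpleNamespace(type="token", node_id="a", content="Hi",
                    metadata={}, error=None),
    SimpleNamespace(type="tool_call_start", node_id="a", content=None,
                    metadata={"tool_name": "search"}, error=None),
    SimpleNamespace(type="step_start", node_id="a", content=None,
                    metadata={}, error=None),
]
for e in events:
    dispatch(e)

print(log)  # ['Hi', '[tool: search]']
```

Inside the streaming loop, the body would then reduce to a single `dispatch(event)` call, and supporting a new event type means adding one entry to the table.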
Collecting Response
full_response = ""
metadata = {}
async for event in executor.execute(input, context):
    if event.type == "token":
        # content is Optional, so guard against None
        full_response += event.content or ""
    elif event.type == "execution_complete":
        metadata = event.metadata
print(f"Response: {full_response}")
print(f"Metadata: {metadata}")
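The same collection logic can be wrapped in a reusable coroutine and exercised without a live model. In this sketch, `fake_stream` is a hypothetical stand-in for `executor.execute(...)` and `collect_response` is an illustrative helper name; neither is part of Mesh.

```python
import asyncio
from types import SimpleNamespace
from typing import Any, AsyncIterator, Dict, Tuple


async def fake_stream() -> AsyncIterator[SimpleNamespace]:
    """Hypothetical stand-in for executor.execute(); yields three events."""
    yield SimpleNamespace(type="token", content="Hello", metadata={})
    yield SimpleNamespace(type="token", content=" world", metadata={})
    yield SimpleNamespace(type="execution_complete", content=None,
                          metadata={"duration_ms": 12})


async def collect_response(events) -> Tuple[str, Dict[str, Any]]:
    """Join token contents and keep the metadata of execution_complete."""
    parts = []
    metadata: Dict[str, Any] = {}
    async for event in events:
        if event.type == "token":
            parts.append(event.content or "")
        elif event.type == "execution_complete":
            metadata = event.metadata
    return "".join(parts), metadata


text, meta = asyncio.run(collect_response(fake_stream()))
print(text)  # Hello world
print(meta)  # {'duration_ms': 12}
```

Appending to a list and joining once at the end avoids repeated string concatenation on long responses.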
Advanced Event Handling
Reasoning Events (o1/o3/Claude Extended Thinking)
OpenAI’s o1/o3 models and Claude’s Extended Thinking emit reasoning tokens before the final answer:
reasoning_content = ""
response_content = ""
async for event in executor.execute(input, context):
    if event.type == "reasoning_start":
        print("🤔 Thinking...\n")
    elif event.type == "reasoning_token":
        # Display thinking process
        reasoning_content += event.content
        print(event.content, end="", flush=True)
    elif event.type == "reasoning_end":
        print("\n\n✅ Done thinking\n")
    elif event.type == "token":
        # Regular response tokens
        response_content += event.content
        print(event.content, end="", flush=True)
print(f"\nReasoning: {len(reasoning_content)} chars")
print(f"Response: {len(response_content)} chars")
Usage Tracking
Track token usage and costs in real-time:
total_tokens = 0
total_cost = 0.0
async for event in executor.execute(input, context):
    if event.type == "response_metadata":
        usage = event.metadata.get("usage", {})
        model = event.metadata.get("model_id", "")
        prompt_tokens = usage.get("prompt_tokens", 0)
        completion_tokens = usage.get("completion_tokens", 0)
        # Accumulate across responses so the total covers the whole run
        total_tokens += usage.get("total_tokens", 0)
        # Calculate cost (example rates, USD per 1,000 tokens)
        if "gpt-4" in model:
            cost = (prompt_tokens * 0.03 + completion_tokens * 0.06) / 1000
        elif "gpt-3.5" in model:
            cost = (prompt_tokens * 0.001 + completion_tokens * 0.002) / 1000
        else:
            cost = 0.0  # No rate known for this model
        total_cost += cost
        print(f"\n💰 Tokens: {total_tokens} | Cost: ${cost:.4f}")
    elif event.type == "step_complete":
        # Per-step usage (multi-step agents)
        step_usage = event.metadata.get("usage", {})
        step_tokens = step_usage.get("total_tokens", 0)
        print(f"Step {event.metadata.get('step_index', 0) + 1}: {step_tokens} tokens")
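The per-model branches above can also be expressed as a rate table, which keeps pricing data separate from the event loop. This is a sketch under stated assumptions: `RATES_PER_1K` and `estimate_cost` are hypothetical names, and the rates simply mirror the example figures above rather than any provider's current pricing.

```python
from typing import Dict, Tuple

# (prompt, completion) USD per 1,000 tokens. These mirror the example rates
# above and are illustrative only, not current provider pricing.
RATES_PER_1K: Dict[str, Tuple[float, float]] = {
    "gpt-4": (0.03, 0.06),
    "gpt-3.5": (0.001, 0.002),
}


def estimate_cost(model_id: str, usage: Dict[str, int]) -> float:
    """Return an estimated cost in USD, or 0.0 for models with no known rate."""
    for name, (prompt_rate, completion_rate) in RATES_PER_1K.items():
        # Substring match, as in the example above.
        if name in model_id:
            prompt = usage.get("prompt_tokens", 0)
            completion = usage.get("completion_tokens", 0)
            return (prompt * prompt_rate + completion * completion_rate) / 1000
    return 0.0


usage = {"prompt_tokens": 1000, "completion_tokens": 500, "total_tokens": 1500}
print(f"{estimate_cost('gpt-4-turbo', usage):.4f}")       # 0.0600
print(f"{estimate_cost('some-other-model', usage):.4f}")  # 0.0000
```

Substring matching is crude: any model ID containing `gpt-4` gets the same rate, so a production table would more likely key on exact model IDs.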
Source Citations (Gemini Grounding)
Handle citations and grounding sources:
citations = []
async for event in executor.execute(input, context):
    if event.type == "source":
        sources = event.metadata.get("sources", [])
        for source in sources:
            url = source.get("url")
            title = source.get("title", "Unknown")
            citations.append({"url": url, "title": title})
            print(f"\n📚 Source: {title}")
            print(f" URL: {url}")
    elif event.type == "token":
        print(event.content, end="", flush=True)
# Display all citations
if citations:
    print("\n\nReferences:")
    for i, cite in enumerate(citations, 1):
        print(f"{i}. {cite['title']} - {cite['url']}")
Multi-Modal Files
Handle file attachments (images, PDFs, etc.):
import base64

files_received = []
async for event in executor.execute(input, context):
    if event.type == "file":
        filename = event.metadata.get("name")
        mime_type = event.metadata.get("mime_type") or ""
        content = event.metadata.get("content")  # Base64 encoded
        files_received.append({
            "name": filename,
            "type": mime_type,
            "data": content
        })
        print(f"\n📎 File: {filename} ({mime_type})")
        # Save or process file (skip if the name or payload is missing)
        if mime_type.startswith("image/") and filename and content:
            with open(filename, "wb") as f:
                f.write(base64.b64decode(content))
            print(f" Saved to {filename}")
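Because the file name arrives in event metadata, it is worth sanitizing before it is used as a path. The sketch below shows one way to do that with the standard library; `save_attachment` is a hypothetical helper, not a Mesh API, and the payload is fabricated for the demonstration.

```python
import base64
import os
import tempfile
from typing import Optional


def save_attachment(name: Optional[str], content_b64: Optional[str],
                    dest_dir: str) -> Optional[str]:
    """Decode a base64 payload into dest_dir; return the path, or None if skipped."""
    if not name or not content_b64:
        return None
    # Keep only the final path component so "../../x" cannot escape dest_dir.
    safe_name = os.path.basename(name.replace("\\", "/"))
    if safe_name in ("", ".", ".."):
        return None
    path = os.path.join(dest_dir, safe_name)
    with open(path, "wb") as f:
        f.write(base64.b64decode(content_b64))
    return path


with tempfile.TemporaryDirectory() as tmp:
    payload = base64.b64encode(b"fake image bytes").decode("ascii")
    saved = save_attachment("../../evil.png", payload, tmp)
    print(os.path.basename(saved))  # evil.png
    with open(saved, "rb") as f:
        round_trip = f.read()
```

Writing into a dedicated directory rather than the current working directory also keeps received files from overwriting application files that happen to share a name.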
Custom Progress Events
Monitor custom progress indicators:
async for event in executor.execute(input, context):
    if event.type == "custom_data":
        data_type = event.metadata.get("data_type") or ""
        # The payload is read as a dict below; fall back to {} if it is not one
        data = event.content if isinstance(event.content, dict) else {}
        # Handle different custom event types
        if data_type == "data-progress":
            percent = data.get("percent", 0)
            print(f"\rProgress: {percent}%", end="", flush=True)
        elif data_type.startswith("data-rlm-"):
            # RLM middleware events
            if data_type == "data-rlm-step-start":
                step = data.get("step")
                print(f"\n[RLM Step {step}]")
            elif data_type == "data-rlm-complete":
                total_steps = data.get("total_steps")
                print(f"\n[RLM Complete: {total_steps} steps]")
        # Transient events (don't persist)
        if event.metadata.get("transient"):
            # Don't save to history
            pass
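The transient check above is left as a placeholder. One way to act on it is to filter events before they are written to history, as in this minimal sketch. The `persistable` helper is hypothetical, and the events are `SimpleNamespace` stand-ins that carry the same `transient` metadata flag used in the example above.

```python
from types import SimpleNamespace
from typing import Iterable, List


def persistable(events: Iterable) -> List:
    """Return only the events whose metadata does not mark them transient."""
    return [e for e in events if not e.metadata.get("transient")]


# Stand-in events (hypothetical; real events come from executor.execute()).
events = [
    SimpleNamespace(type="custom_data",
                    metadata={"data_type": "data-progress", "transient": True}),
    SimpleNamespace(type="custom_data",
                    metadata={"data_type": "data-rlm-complete"}),
    SimpleNamespace(type="token", metadata={}),
]

history = persistable(events)
print([e.type for e in history])  # ['custom_data', 'token']
```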
Multi-Step Agent Tracking
Track multi-step agent execution:
steps = []
current_step = None
async for event in executor.execute(input, context):
    if event.type == "step_start":
        step_index = event.metadata.get("step_index", 0)
        current_step = {
            "index": step_index,
            "content": "",
            "usage": {}
        }
        print(f"\n{'='*50}")
        print(f"Step {step_index + 1}")
        print(f"{'='*50}")
    elif event.type == "token":
        if current_step:
            current_step["content"] += event.content
        print(event.content, end="", flush=True)
    elif event.type == "step_complete":
        if current_step:
            current_step["usage"] = event.metadata.get("usage", {})
            current_step["finish_reason"] = event.metadata.get("finish_reason")
            steps.append(current_step)
        usage = event.metadata.get("usage", {})
        tokens = usage.get("total_tokens", 0)
        print(f"\n\n✓ Step complete ({tokens} tokens)")
    elif event.type == "execution_complete":
        print(f"\n\nTotal steps: {len(steps)}")
        total_tokens = sum(s["usage"].get("total_tokens", 0) for s in steps)
        print(f"Total tokens: {total_tokens}")
Event Translation
Vel-Translated Events (Default)
By default, Mesh uses Vel’s event translators for consistent events:
# OpenAI Agents SDK → Vel events → Mesh events
graph.add_node("agent", openai_agent, node_type="agent")
async for event in executor.execute(input, context):
    # Consistent event types regardless of provider
    if event.type == "token":
        print(event.content, end="")
Native Provider Events (Opt-in)
Use native events from the provider:
# OpenAI native events (no translation)
graph.add_node("agent", openai_agent, node_type="agent",
               use_native_events=True)
async for event in executor.execute(input, context):
    # Provider-specific event structure
    print(event)
See Event Translation Guide for details.
Event Metadata
Events include metadata with extra context:
async for event in executor.execute(input, context):
    if event.type == "token":
        # Token-specific metadata
        block_id = event.metadata.get("text_block_id")
        event_subtype = event.metadata.get("event_subtype")
    elif event.type == "message_complete":
        # Message metadata
        finish_reason = event.metadata.get("finish_reason")
        message_id = event.metadata.get("message_id")
    elif event.type == "tool_call_start":
        # Tool metadata
        tool_name = event.metadata.get("tool_name")
        tool_call_id = event.metadata.get("tool_call_id")
SSE Streaming (FastAPI)
Stream events via Server-Sent Events:
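from fastapi import FastAPI
from mesh.streaming import SSEAdapter

app = FastAPI()

# `executor` and `context` are assumed to be created elsewhere in the app
@app.post("/execute/stream")
async def stream_execute(request: dict):
    adapter = SSEAdapter()
    return adapter.to_streaming_response(
        executor.execute(request["input"], context)
    )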
Client receives:
event: execution_start
data: {"type": "execution_start", "node_id": "START", ...}

event: token
data: {"type": "token", "content": "Hello", ...}

event: token
data: {"type": "token", "content": " world", ...}

event: execution_complete
data: {"type": "execution_complete", "output": {...}, ...}
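The output above follows the Server-Sent Events wire format: an `event:` line naming the event, a `data:` line carrying the payload, and a blank line that ends the frame. The sketch below shows how one event could be serialized that way. It illustrates the format only and is not `SSEAdapter`'s actual implementation; `format_sse` is a hypothetical helper.

```python
import json
from typing import Any, Dict


def format_sse(event_type: str, payload: Dict[str, Any]) -> str:
    """Serialize one event as an SSE frame: event line, data line, blank line."""
    # json.dumps emits a single line, so the payload fits in one data field.
    data = json.dumps(payload, ensure_ascii=False)
    return f"event: {event_type}\ndata: {data}\n\n"


frame = format_sse("token", {"type": "token", "content": "Hello"})
print(frame, end="")
```

On the browser side, an `EventSource` listener registered for the `token` event name would receive the JSON string from the `data:` field.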
Best Practices
1. Always Handle Errors
import logging

logger = logging.getLogger(__name__)

async for event in executor.execute(input, context):
    if event.type == "node_error":
        logger.error(f"Error in {event.node_id}: {event.error}")
        # Handle gracefully
2. Flush Token Output
# ✅ Good: Immediate display
if event.type == "token":
    print(event.content, end="", flush=True)

# ❌ Bad: Buffered output
if event.type == "token":
    print(event.content, end="")  # No flush
3. Track Progress
total_tokens = 0
async for event in executor.execute(input, context):
    if event.type == "token":
        # Counts token events, which approximates the number of tokens
        total_tokens += 1
        print(f"\rTokens: {total_tokens}", end="", flush=True)
4. Use Execution Complete
async for event in executor.execute(input, context):
    if event.type == "execution_complete":
        # Final state is available
        final_output = event.output
        execution_time = event.metadata.get("duration_ms")
Event Flow Example
execution_start (graph_id=my-graph)
↓
node_start (node_id=analyzer)
↓
message_start
↓
token ("The")
token (" analysis")
token (" shows")
↓
message_complete
↓
node_complete (node_id=analyzer)
↓
node_start (node_id=tool)
↓
tool_call_start (tool_name=search)
↓
tool_call_complete (output={...})
↓
node_complete (node_id=tool)
↓
execution_complete (output={...})
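A recorded event sequence like the one above can be checked for nodes that started but never finished, which is handy in tests and when debugging a stalled graph. The sketch below encodes the flow as `(type, node_id)` tuples. `unbalanced_nodes` is a hypothetical helper, and treating `node_error` as closing a node is an assumption, not documented Mesh behavior.

```python
from typing import List, Tuple

# Each event is (type, node_id); the sequence mirrors the flow above.
flow: List[Tuple[str, str]] = [
    ("execution_start", ""),
    ("node_start", "analyzer"),
    ("message_start", "analyzer"),
    ("token", "analyzer"),
    ("message_complete", "analyzer"),
    ("node_complete", "analyzer"),
    ("node_start", "tool"),
    ("tool_call_start", "tool"),
    ("tool_call_complete", "tool"),
    ("node_complete", "tool"),
    ("execution_complete", ""),
]


def unbalanced_nodes(events: List[Tuple[str, str]]) -> List[str]:
    """Return node IDs that started but never completed or errored."""
    open_nodes: List[str] = []
    for event_type, node_id in events:
        if event_type == "node_start":
            open_nodes.append(node_id)
        elif event_type in ("node_complete", "node_error") and node_id in open_nodes:
            # Assumption: an error also ends the node's lifecycle.
            open_nodes.remove(node_id)
    return open_nodes


print(unbalanced_nodes(flow))      # []
print(unbalanced_nodes(flow[:5]))  # ['analyzer']
```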
See Also
- Graphs - Graph structure
- Nodes - Node types
- Execution - Execution model
- Streaming Guide - Streaming patterns
- Event Translation - Vel vs native events