Streaming Guide
Token-by-token streaming with Mesh.
Overview
Mesh provides real-time, token-by-token streaming for all agent and LLM nodes: executor.execute() returns an AsyncIterator of events, which you consume with async for.
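The examples below read two fields off each event: event.type and event.content. As a mental model, each yielded event looks roughly like the sketch below; the class name StreamEvent and the dataclass definition are assumptions, and only the two fields are confirmed by the examples in this guide.

from dataclasses import dataclass

@dataclass
class StreamEvent:
    # "token" while the model is generating; "execution_complete" when the run ends
    type: str
    # for "token" events, the incremental text chunk
    content: str = ""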
Basic Streaming
async for event in executor.execute("Tell me a story", context):
    if event.type == "token":
        print(event.content, end="", flush=True)
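Here executor and context are the objects constructed in the complete example below; note that async for can only run inside a coroutine.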
Complete Example
import asyncio
from mesh import StateGraph, Executor, ExecutionContext, MemoryBackend

async def main():
    # Build and compile a one-node graph with a single LLM node
    graph = StateGraph()
    graph.add_node("llm", None, node_type="llm", model="gpt-4",
                   system_prompt="Tell a short story")
    graph.add_edge("START", "llm")
    graph.set_entry_point("llm")
    compiled = graph.compile()

    executor = Executor(compiled, MemoryBackend())
    context = ExecutionContext(
        graph_id="streaming",
        session_id="session-1",
        chat_history=[],
        variables={},
        state={}
    )

    print("Story: ", end="")
    async for event in executor.execute("Tell me a story", context):
        if event.type == "token":
            print(event.content, end="", flush=True)
    print()  # end the line after the stream finishes

asyncio.run(main())
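Because each token is printed the moment it is emitted, the story appears incrementally rather than all at once when generation finishes.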
Event Handling Patterns
Pattern 1: Display Only
async for event in executor.execute(user_input, context):
    if event.type == "token":
        print(event.content, end="", flush=True)
Pattern 2: Collect and Display
full_response = ""
async for event in executor.execute(user_input, context):
    if event.type == "token":
        full_response += event.content
        print(event.content, end="", flush=True)

# Use full_response later
save_to_database(full_response)
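save_to_database is a placeholder for whatever persistence you use; a minimal sketch with Python's built-in sqlite3 (the file name and schema are illustrative, not part of Mesh):

import sqlite3

def save_to_database(text: str) -> None:
    # Illustrative only: append the full response to a local SQLite table
    with sqlite3.connect("responses.db") as conn:
        conn.execute("CREATE TABLE IF NOT EXISTS responses (body TEXT)")
        conn.execute("INSERT INTO responses (body) VALUES (?)", (text,))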
Pattern 3: UI Updates
import json

async for event in executor.execute(user_input, context):
    if event.type == "token":
        await websocket.send(event.content)
    elif event.type == "execution_complete":
        # Most WebSocket send() APIs accept only str/bytes, so serialize the dict
        await websocket.send(json.dumps({"status": "complete"}))
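Wired into a FastAPI app, Pattern 3 could look like the sketch below. The route path is made up, and executor and context are assumed to be the objects built in the complete example; a production endpoint would construct a fresh context per connection.

from fastapi import FastAPI, WebSocket

app = FastAPI()

@app.websocket("/execute/ws")
async def stream_ws(websocket: WebSocket):
    await websocket.accept()
    # One prompt per connection: read it, then stream tokens back
    user_input = await websocket.receive_text()
    async for event in executor.execute(user_input, context):
        if event.type == "token":
            await websocket.send_text(event.content)
        elif event.type == "execution_complete":
            await websocket.send_json({"status": "complete"})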
SSE Streaming (FastAPI)
from fastapi import FastAPI
from mesh.streaming import SSEAdapter

app = FastAPI()

@app.post("/execute/stream")
async def stream_execute(request: dict):
    adapter = SSEAdapter()
    return adapter.to_streaming_response(
        executor.execute(request["input"], context)
    )
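Assuming SSEAdapter frames messages as standard data: lines, a client can consume the stream with httpx. The URL, port, and payload key mirror the endpoint above; the exact wire format produced by SSEAdapter is an assumption.

import asyncio
import httpx

async def consume_stream():
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream(
            "POST",
            "http://localhost:8000/execute/stream",
            json={"input": "Tell me a story"},
        ) as response:
            async for line in response.aiter_lines():
                # Standard SSE frames each message as "data: <payload>"
                if line.startswith("data:"):
                    print(line.removeprefix("data:").strip(), end="", flush=True)

asyncio.run(consume_stream())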
See the Quick Start for a complete FastAPI example.