Nodes
Nodes are the building blocks of Mesh workflows. Each node performs a specific task and can emit streaming events.
Node Types
Mesh provides 10 core node types:
| Type | Purpose | Key Features |
|---|---|---|
| StartNode | Graph entry point | Implicit, auto-created |
| EndNode | Graph exit point | Implicit, optional |
| AgentNode | Vel/OpenAI agents | Streaming, auto-detection |
| LLMNode | Direct LLM calls | Streaming, simple |
| ToolNode | Python functions | Sync/async support |
| RAGNode | Document retrieval | Vector search, context enrichment |
| ConditionNode | Branching logic | Multiple conditions |
| LoopNode | Array iteration | JSONPath selection |
| ApprovalNode | Human-in-the-loop | Pause/resume, approval workflows |
| OrchestratorNode | Multi-agent delegation | LLM-driven routing to sub-agents |
1. StartNode
The entry point to your graph. Always implicit (auto-created).
Usage:
graph = StateGraph()
graph.add_node("first", None, node_type="llm")
graph.add_edge("START", "first") # "START" is automatic
graph.set_entry_point("first")
Key Points:
- Don’t create manually
- Always reference it as `"START"` in edges
- Sets up the initial execution context
2. EndNode
The exit point of your graph. Optional and implicit.
Usage:
graph.add_node("last", None, node_type="llm")
graph.add_edge("last", "END") # Optional
Key Points:
- Automatically added if nodes have no children
- Triggers the `execution_complete` event
- Returns the final output
3. AgentNode
Wraps Vel or OpenAI Agents SDK agents with automatic detection and streaming.
With Vel:
from vel import Agent as VelAgent
vel_agent = VelAgent(
    id="assistant",
    model={"provider": "openai", "name": "gpt-4"},
)
graph.add_node("agent", vel_agent, node_type="agent")
With OpenAI Agents SDK:
from agents import Agent
openai_agent = Agent(
    name="Assistant",
    instructions="You are helpful"
)
# Events are translated to Vel format by default
graph.add_node("agent", openai_agent, node_type="agent")
# Or use native events
graph.add_node("agent", openai_agent, node_type="agent",
               use_native_events=True)
Key Features:
- Auto-detects agent type (Vel vs OpenAI)
- Token-by-token streaming
- Event translation (Vel format by default)
- Chat history management
- System prompt override:
graph.add_node("agent", my_agent, node_type="agent",
               system_prompt="Custom prompt: ")
4. LLMNode
Direct LLM calls without agent framework. Simpler but less powerful than AgentNode.
Usage:
graph.add_node("llm", None, node_type="llm",
               model="gpt-4",
               system_prompt="You are a helpful assistant")
Parameters:
- `model`: OpenAI model name (required)
- `system_prompt`: System message (optional, supports variables)
- `temperature`: Creativity (0-2, default: 1.0)
- `max_tokens`: Output limit (default: None)
Example with Variables:
graph.add_node("llm", None, node_type="llm",
               model="gpt-4",
               system_prompt="Analyze: . Context: ")
When to Use:
- Quick LLM calls without tools
- Simple text generation
- No need for conversation history
- No tool calling required
5. ToolNode
Execute arbitrary Python functions as tools.
Basic Usage:
def my_tool(input: dict) -> dict:
    query = input.get("query", "")
    # Your logic here
    return {"result": f"Processed: {query}"}

graph.add_node("tool", my_tool, node_type="tool")
Async Tools:
import httpx

async def fetch_data(input: dict) -> dict:
    async with httpx.AsyncClient() as client:
        response = await client.get(input["url"])
        return {"data": response.json()}

graph.add_node("fetcher", fetch_data, node_type="tool")
With Configuration:
def multiply(input: dict, multiplier: int = 2) -> dict:
    value = input.get("value", 0)
    return {"result": value * multiplier}

graph.add_node("tool", multiply, node_type="tool",
               config={"bindings": {"multiplier": 3}})
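Conceptually, `bindings` pre-fill extra keyword arguments before the tool runs, much like `functools.partial`. A standalone sketch of the idea (not Mesh's internal mechanism):

```python
from functools import partial

def multiply(input: dict, multiplier: int = 2) -> dict:
    value = input.get("value", 0)
    return {"result": value * multiplier}

# Binding "multiplier" to 3 is conceptually like partially applying it:
bound = partial(multiply, multiplier=3)

print(bound({"value": 10}))  # {'result': 30}
```

The tool function itself stays a plain, testable callable; the binding only changes the default it runs with.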
Key Features:
- Supports sync and async functions
- Automatic parameter injection
- Error handling with retries
- Access to execution context:
def tool_with_context(input: dict, context: ExecutionContext) -> dict:
    # Access session_id, variables, state, etc.
    return {"session": context.session_id}
6. RAGNode
Retrieve documents from vector stores for context enrichment in LLM prompts.
Basic Usage:
from mesh.nodes import RAGNode
# Create RAG node
rag_node = RAGNode(
    id="rag_0",
    query_template="",         # What to search for
    top_k=5,                   # Number of documents
    similarity_threshold=0.7,  # Minimum score
    file_id="uuid-123",        # Filter to a specific file
)
# Inject retriever (dependency injection pattern)
rag_node.set_retriever(retriever)
graph.add_node("rag_0", rag_node, node_type="rag")
Parameters:
- `id`: Node identifier (e.g., `"rag_0"`)
- `query_template`: Search query with variable support (default: `""`)
- `top_k`: Number of documents to retrieve (default: 5)
- `similarity_threshold`: Minimum similarity score 0.0-1.0 (default: 0.7)
- `file_id`: UUID of a specific file to search (optional)
- `folder_uuid`: UUID of a folder to search across (optional)
- `retriever_type`: Type of vector store - `"postgres"` or `"chroma"` (default: `"postgres"`)
Query Template - The Search Query
The Query Template determines what text gets embedded and searched against your vector database chunks.
Default: User’s Question
query_template="" # Searches based on user input
Example flow:
User asks: "What cats are good in hot weather?"
↓
Query Template:
↓
Resolved: "What cats are good in hot weather?"
↓
Generate embedding: [0.123, 0.456, ...] (1536 dims)
↓
Search postgres: Returns top 5 similar chunks
Dynamic Queries: Reference Previous Nodes
# Search based on LLM's refined query
query_template=""
# Combine multiple inputs
query_template="Find docs about related to "
# Use tool output
query_template=""
Static Queries: Fixed Search
# Always search for specific topic
query_template="product specifications and features"
# Domain-specific search
query_template="customer support policies"
Output Structure
RAGNode outputs a dictionary with:
{
    "formatted": "<CONTEXT>...formatted docs with metadata...</CONTEXT>",
    "documents": [
        {
            "id": "uuid",
            "document_id": "file-uuid",
            "content": "...",
            "page_number": 5,
            "heading": "Section Title",
            "similarity": 0.89,
            "file_title": "Document.pdf",
            "folder_uuid": "folder-uuid"
        },
        ...
    ],
    "query": "resolved query text",
    "num_results": 5
}
Access in downstream nodes:
- `formatted` - Pre-formatted context block for LLM prompts
- `documents` - Raw document array for custom processing
- `query` - The resolved query that was searched
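Because each entry in `documents` carries `file_title`, `page_number`, and `similarity`, a downstream tool can turn the raw array into source citations. A hedged sketch (the dict shape follows the structure above; the helper name is illustrative, not a Mesh API):

```python
def extract_citations(rag_output: dict) -> list[str]:
    """Turn RAGNode's raw document array into human-readable citations."""
    citations = []
    for doc in rag_output.get("documents", []):
        citations.append(
            f"{doc['file_title']}, p. {doc['page_number']} "
            f"(similarity: {doc['similarity']:.2f})"
        )
    return citations

sample = {
    "documents": [
        {"file_title": "Document.pdf", "page_number": 5, "similarity": 0.89},
    ],
    "query": "hot weather cats",
    "num_results": 1,
}
print(extract_citations(sample))  # ['Document.pdf, p. 5 (similarity: 0.89)']
```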
Complete RAG Flow
from mesh import StateGraph
from mesh.nodes import RAGNode
# 1. Create graph
graph = StateGraph()
# 2. Add RAG node
rag_node = RAGNode(
    id="rag_0",
    query_template="",
    top_k=5,
    similarity_threshold=0.7,
    file_id="file-uuid-123",  # Search a specific file
)
# 3. Add LLM that uses retrieved context
graph.add_node("llm", None, node_type="llm",
               model="gpt-4",
               system_prompt="""
Answer using this context:
Question:
""")
# 4. Connect nodes
graph.add_edge("START", "rag_0")
graph.add_edge("rag_0", "llm")
# 5. Inject retriever (before execution)
rag_node.set_retriever(my_retriever)
# 6. Execute
result = await graph.run(input="What cats are good in hot weather?")
Retriever Setup
RAGNode uses dependency injection for the retriever:
# Example retriever interface
class MyRetriever:
    async def search_file(self, query: str, file_id: str,
                          similarity_threshold: float, limit: int):
        # Generate an embedding for the query
        embedding = await generate_embedding(query)
        # Query the vector database
        results = query_pgvector(embedding, file_id, similarity_threshold, limit)
        return results

    async def search_folder(self, query: str, folder_uuid: str,
                            similarity_threshold: float, limit: int):
        # Search across the folder
        ...
# Inject into node
retriever = MyRetriever()
rag_node.set_retriever(retriever)
In React Flow Graphs:
When using React Flow parser, inject retrievers after parsing:
from mesh import ReactFlowParser
from mesh.nodes import RAGNode
# Parse graph
graph = parser.parse(react_flow_json)
# Inject retriever into all RAG nodes
retriever = MyRetriever()
for node in graph.nodes.values():
    if isinstance(node, RAGNode):
        node.set_retriever(retriever)
# Execute
await graph.run(input="...")
File vs Folder Search
File Search - Search within specific document:
RAGNode(
    id="rag_0",
    file_id="550e8400-e29b-41d4-a716-446655440000",  # Specific file UUID
    top_k=5
)
Folder Search - Search across all documents in folder:
RAGNode(
    id="rag_0",
    folder_uuid="abc-123-def",  # Folder UUID
    top_k=10  # Aggregates top results across all files
)
Note: Use either `file_id` OR `folder_uuid`, not both.
Multi-Source RAG
Search multiple knowledge bases by using multiple RAG nodes:
# Search product docs
rag_products = RAGNode(
    id="rag_products",
    query_template="",
    file_id="products-file-id",
    top_k=3
)
# Search support tickets
rag_support = RAGNode(
    id="rag_support",
    query_template="",
    file_id="support-file-id",
    top_k=3
)
# LLM uses both sources
graph.add_node("llm", None, node_type="llm",
               system_prompt="""
Product context:
Support context:
Question:
""")
# Connect
graph.add_edge("START", "rag_products")
graph.add_edge("START", "rag_support") # Parallel retrieval
graph.add_edge("rag_products", "llm")
graph.add_edge("rag_support", "llm")
Query Refinement Pattern
Use an LLM to refine the search query before retrieval:
# Step 1: Refine query
graph.add_node("refiner", None, node_type="llm",
               model="gpt-4",
               system_prompt="Rewrite as a search query: ")
# Step 2: Search with refined query
rag_node = RAGNode(
    id="rag_0",
    query_template="",  # Use the LLM's refined query
    top_k=5
)
# Step 3: Answer with context
graph.add_node("answerer", None, node_type="llm",
               system_prompt="Answer using: ")
# Connect
graph.add_edge("START", "refiner")
graph.add_edge("refiner", "rag_0")
graph.add_edge("rag_0", "answerer")
When to Use RAGNode
✅ Use RAGNode when:
- You need to ground LLM responses in specific documents
- Working with large knowledge bases (docs, tickets, articles)
- Need up-to-date information from your own data
- Want to cite sources in LLM responses
- Building Q&A systems over documentation
❌ Don’t use RAGNode when:
- Information fits in LLM context window
- No vector database available
- Real-time web search needed (use tool with API instead)
- Documents change too frequently for embeddings
7. ConditionNode
Conditional branching with multiple output paths.
Usage:
from mesh.nodes import Condition
def check_sentiment(output: dict) -> bool:
    return "positive" in output.get("content", "").lower()

def check_negative(output: dict) -> bool:
    return "negative" in output.get("content", "").lower()
graph.add_node("condition", [
    Condition("positive", check_sentiment, "positive_handler"),
    Condition("negative", check_negative, "negative_handler"),
], node_type="condition", default_target="neutral_handler")
# Add the upstream analyzer and the handlers
graph.add_node("analyzer", None, node_type="llm")
graph.add_node("positive_handler", None, node_type="llm")
graph.add_node("negative_handler", None, node_type="llm")
graph.add_node("neutral_handler", None, node_type="llm")
# Connect
graph.add_edge("START", "analyzer")
graph.add_edge("analyzer", "condition")
graph.add_edge("condition", "positive_handler")
graph.add_edge("condition", "negative_handler")
graph.add_edge("condition", "neutral_handler")
Condition Object:
Condition(
    name="condition_name",          # Identifier
    predicate=check_fn,             # Callable on the node output, returns True/False
    target_node="target_node_id"    # Where to route if True
)
Key Features:
- Multiple conditions per node
- Default fallback path
- Predicates can be any callable
- Unfulfilled branches are skipped
Advanced Example:
def is_long_text(output: dict) -> bool:
    content = output.get("content", "")
    return len(content) > 1000

def is_short_text(output: dict) -> bool:
    content = output.get("content", "")
    return len(content) <= 100
graph.add_node("router", [
    Condition("long", is_long_text, "summarizer"),
    Condition("short", is_short_text, "expander"),
], node_type="condition", default_target="normal_handler")
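The routing semantics can be pictured as "first matching condition wins, else the default target." A simplified, standalone sketch of that evaluation order (an assumption about ordering for illustration, not Mesh's actual executor):

```python
def route(output: dict, conditions: list[tuple], default_target: str) -> str:
    """Return the target of the first condition whose predicate passes."""
    for name, predicate, target in conditions:
        if predicate(output):
            return target
    return default_target

conditions = [
    ("long", lambda o: len(o.get("content", "")) > 1000, "summarizer"),
    ("short", lambda o: len(o.get("content", "")) <= 100, "expander"),
]
print(route({"content": "hi"}, conditions, "normal_handler"))       # expander
print(route({"content": "x" * 500}, conditions, "normal_handler"))  # normal_handler
```

Writing predicates as plain functions keeps them easy to unit-test outside the graph.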
8. LoopNode
Iterate over arrays and execute downstream nodes for each item.
Usage:
graph.add_node("loop", None, node_type="loop",
               array_path="$.items",
               max_iterations=100)
graph.add_node("processor", None, node_type="llm",
               model="gpt-4",
               system_prompt="Process item: ")
graph.add_edge("START", "loop")
graph.add_edge("loop", "processor")
Parameters:
- `array_path`: JSONPath to array in input (required)
- `max_iterations`: Maximum loop count (default: 100)
JSONPath Examples:
# Top-level array
array_path="$" # Input: [1, 2, 3]
# Nested field
array_path="$.data.items" # Input: {"data": {"items": [...]}}
# Array in result
array_path="$.results[*]" # Input: {"results": [...]}
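To make the dotted-path forms above concrete, here is a minimal resolver sketch. This is illustrative only — Mesh presumably uses a full JSONPath implementation that also handles selectors like `$.results[*]`, which this sketch does not:

```python
def resolve_simple_path(data, path: str):
    """Resolve simple '$' and '$.a.b' style paths (no wildcards or filters)."""
    if path == "$":
        return data
    current = data
    # Strip the leading '$' and walk each dotted key
    for key in path.lstrip("$").strip(".").split("."):
        current = current[key]
    return current

print(resolve_simple_path([1, 2, 3], "$"))                               # [1, 2, 3]
print(resolve_simple_path({"data": {"items": [1, 2]}}, "$.data.items"))  # [1, 2]
```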
Iteration Variables:
Access current iteration data:
# - Current item value
# - Current index (0-based)
# - True if first item
# - True if last item
graph.add_node("processor", None, node_type="llm",
               system_prompt="""
Item #:
""")
Example Workflow:
Input:
{
    "items": [
        {"name": "Alice", "age": 30},
        {"name": "Bob", "age": 25}
    ]
}
Graph:
graph.add_node("loop", None, node_type="loop", array_path="$.items")
graph.add_node("greet", None, node_type="llm",
               system_prompt="Greet , age ")
graph.add_edge("START", "loop")
graph.add_edge("loop", "greet")
9. ApprovalNode
Pause execution for human-in-the-loop approval before continuing.
Usage:
from mesh.nodes import ApprovalNode, approve, reject
graph.add_node("approval", ApprovalNode(
    id="approval",
    approval_id="plan_approval",
    approval_message="Please review the plan before execution",
    data_extractor=lambda input: {
        "plan_title": input.get("title"),
        "step_count": len(input.get("steps", [])),
    },
))
graph.add_edge("planner", "approval")
graph.add_edge("approval", "executor")
Parameters:
- `approval_id`: Unique identifier for this approval point (required)
- `approval_message`: Message displayed to the approver (optional)
- `data_extractor`: Function to transform input into display data (optional)
- `timeout_seconds`: Timeout in seconds (not yet implemented)
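Because `data_extractor` is a plain callable from the node's input to the display dict, it can be written and unit-tested in isolation. A small sketch mirroring the example above (the field names are illustrative):

```python
def plan_summary(input: dict) -> dict:
    """Shape the upstream planner's output for human review."""
    return {
        "plan_title": input.get("title"),
        "step_count": len(input.get("steps", [])),
    }

plan = {"title": "Quarterly report", "steps": ["collect", "analyze", "write"]}
print(plan_summary(plan))  # {'plan_title': 'Quarterly report', 'step_count': 3}
```

Keeping the extractor as a named function (instead of an inline lambda) makes the approval payload easy to verify before wiring it into the node.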
Execution Flow:
Node executes
│
▼
┌─────────────┐
│ Approval │ ──▶ Emit APPROVAL_PENDING event
│ Node │ ──▶ Return approval_pending=True
└──────┬──────┘ ──▶ Execution PAUSES
│
▼
┌─────────────┐
│ User │ (External decision)
│ Decision │
└──────┬──────┘
│
├─ approve() ──▶ APPROVAL_RECEIVED ──▶ Resume execution
│
└─ reject() ──▶ APPROVAL_REJECTED ──▶ End execution
Handling Approval:
from mesh import Executor, ExecutionContext, MemoryBackend
from mesh.nodes import approve, reject
executor = Executor(graph.compile(), MemoryBackend())
context = ExecutionContext(...)
async for event in executor.execute(graph_input, context):  # graph_input: your initial payload
    if event.metadata.get("status") == "waiting_for_approval":
        # Display approval data to the user
        approval_data = event.metadata.get("approval_data", {})
        print(f"Plan: {approval_data.get('plan_title')}")

        # Get the user's decision (named to avoid shadowing the builtin input())
        decision = input("Approve? (y/n): ")
        if decision.lower() == 'y':
            result = approve()
        else:
            result = reject(reason="User declined")

        # Resume execution
        async for resume_event in executor.resume(context, result):
            # Process resumed events
            pass
Approval with Modified Data:
from datetime import datetime

# Approve and modify the input for the next node
result = approve(
    modified_data={"plan": modified_plan},
    approver_id="user@example.com",
    metadata={"approved_at": datetime.now().isoformat()},
)
Key Features:
- Pauses execution until approval received
- Stores pending state for resume
- Supports data transformation for display
- Handles both approval and rejection
- Tracks approver identity and metadata
When to Use:
- Research pipelines needing plan review
- Content workflows with editorial approval
- Financial transactions requiring authorization
- Deployment pipelines with manual gates
- Any critical decision point needing human oversight
See the Deep Research Guide for a complete example.
10. OrchestratorNode
LLM-driven delegation to sub-agents at runtime. Instead of static graph edges, an orchestrator dynamically decides which sub-agents to call based on the task.
Visual Canvas Pattern:
In the React Flow canvas, connect AgentFlowNodes as children of the OrchestratorNode:
┌─→ AgentFlowNode (Researcher)
StartNode → Orchestrator ─┼─→ AgentFlowNode (Analyst)
└─→ AgentFlowNode (Writer)
The orchestrator discovers sub-agents from graph edges - no dropdown selection needed!
Basic Usage:
from mesh.nodes import OrchestratorNode
orchestrator = OrchestratorNode(
    id="orchestrator_0",
    provider="openai",
    model_name="gpt-4o",
    instruction="""You are a research team coordinator.
    For gathering information → use researcher
    For analyzing data → use analyst
    For writing summaries → use writer""",
    result_mode="synthesize",
    max_iterations=5,
)
# Inject dependencies
orchestrator.set_flow_loader(my_flow_loader)
orchestrator.set_registry(my_registry)
orchestrator.set_graph(execution_graph) # For sub-agent discovery
Parameters:
- `provider`: LLM provider (openai, anthropic, gemini)
- `model_name`: Model for orchestration decisions
- `instruction`: Instructions for the orchestrator LLM
- `temperature`: Sampling temperature (default: 0.3)
- `result_mode`: How to handle outputs (synthesize, stream_through, raw)
- `max_iterations`: Maximum sub-agent calls (default: 5)
- `show_sub_agent_events`: Stream events from sub-agents (default: True)
Key Features:
- Sub-agents discovered from graph edges (connected AgentFlowNodes)
- Sub-agents are Mesh graphs (not Vel Agents directly)
- LLM decides which sub-agents to call based on task
- Sub-agent descriptions are critical for routing decisions
- Events from sub-agents are prefixed with orchestrator node ID
When to Use:
- Dynamic task routing based on content
- Building agent teams with specialized roles
- Hierarchical delegation patterns
- Complex workflows requiring runtime decisions
See the Orchestrator Guide for a complete walkthrough.
Node Configuration
All nodes support common configuration:
Retry Logic
graph.add_node("flaky_tool", my_function, node_type="tool",
               config={
                   "retry": {
                       "max_retries": 3,
                       "delay": 1.0  # seconds
                   }
               })
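The configured behavior — up to `max_retries` additional attempts with a fixed `delay` between them — can be sketched in plain Python. This illustrates the semantics only; it is not Mesh's executor code:

```python
import time

def run_with_retry(fn, max_retries: int = 3, delay: float = 1.0):
    """Call fn(), retrying up to max_retries more times with a fixed delay."""
    last_error = None
    for attempt in range(max_retries + 1):
        try:
            return fn()
        except Exception as exc:
            last_error = exc
            if attempt < max_retries:
                time.sleep(delay)
    raise last_error

calls = {"n": 0}
def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient")
    return "ok"

print(run_with_retry(flaky, max_retries=3, delay=0.0))  # ok
```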
Metadata
graph.add_node("llm", None, node_type="llm",
               metadata={"description": "Analyzer node", "version": "1.0"})
Node Execution
Each node implements the execute() method:
async def execute(
    self,
    input: Any,
    context: ExecutionContext
) -> NodeResult:
    # Node logic here
    return NodeResult(
        output={"content": "..."},
        state={"key": "value"},
        chat_history=[...],
        metadata={...}
    )
NodeResult Fields:
- `output`: Data passed to child nodes
- `state`: Updates to shared state
- `chat_history`: Conversation updates
- `metadata`: Extra information
- `loop_to_node`: For loop nodes
- `max_loops`: Loop limit
Best Practices
1. Choose the Right Node Type
# ✅ Use AgentNode for complex interactions with tools
graph.add_node("agent", vel_agent, node_type="agent")
# ✅ Use LLMNode for simple text generation
graph.add_node("llm", None, node_type="llm", model="gpt-4")
# ✅ Use ToolNode for custom logic
graph.add_node("tool", my_function, node_type="tool")
2. Keep Nodes Focused
# ✅ Good: Single responsibility
graph.add_node("analyzer", None, node_type="llm",
               system_prompt="Analyze sentiment")
graph.add_node("summarizer", None, node_type="llm",
               system_prompt="Summarize results")
# ❌ Bad: Trying to do too much
graph.add_node("do_everything", None, node_type="llm",
               system_prompt="Analyze, summarize, and respond")
3. Use Variables
# ✅ Good: Reference previous nodes
graph.add_node("step2", None, node_type="llm",
               system_prompt="Based on , ...")
# ❌ Bad: Hardcoded values
graph.add_node("step2", None, node_type="llm",
               system_prompt="Based on the analysis, ...")
4. Handle Errors
# ✅ Good: Retries configured
graph.add_node("api_call", fetch_data, node_type="tool",
               config={"retry": {"max_retries": 3}})
# Add error handling
graph.add_node("error_handler", None, node_type="llm")
Common Patterns
Pattern: Analyze → Process → Respond
graph.add_node("analyzer", None, node_type="llm", model="gpt-4")
graph.add_node("processor", process_func, node_type="tool")
graph.add_node("responder", None, node_type="llm", model="gpt-4")
graph.add_edge("START", "analyzer")
graph.add_edge("analyzer", "processor")
graph.add_edge("processor", "responder")
Pattern: Agent with Tool
graph.add_node("agent", my_agent, node_type="agent")
graph.add_node("tool", my_tool, node_type="tool")
graph.add_edge("START", "agent")
graph.add_edge("agent", "tool")
Pattern: Conditional Routing
graph.add_node("classifier", None, node_type="llm")
graph.add_node("router", conditions, node_type="condition")
graph.add_node("handler_a", None, node_type="llm")
graph.add_node("handler_b", None, node_type="llm")
graph.add_edge("START", "classifier")
graph.add_edge("classifier", "router")
graph.add_edge("router", "handler_a")
graph.add_edge("router", "handler_b")
See Also
- Graphs - Graph structure
- Execution - How nodes execute
- Events - Node event emission
- Streaming Guide - Streaming patterns
- Variables Guide - Variable resolution