Nodes

Nodes are the building blocks of Mesh workflows. Each node performs a specific task and can emit streaming events.

Node Types

Mesh provides 10 core node types:

Type Purpose Key Features
StartNode Graph entry point Implicit, auto-created
EndNode Graph exit point Implicit, optional
AgentNode Vel/OpenAI agents Streaming, auto-detection
LLMNode Direct LLM calls Streaming, simple
ToolNode Python functions Sync/async support
RAGNode Document retrieval Vector search, context enrichment
ConditionNode Branching logic Multiple conditions
LoopNode Array iteration JSONPath selection
ApprovalNode Human-in-the-loop Pause/resume, approval workflows
OrchestratorNode Multi-agent delegation LLM-driven routing to sub-agents

1. StartNode

The entry point to your graph. Always implicit (auto-created).

Usage:

graph = StateGraph()
graph.add_node("first", None, node_type="llm")
graph.add_edge("START", "first")  # "START" is automatic
graph.set_entry_point("first")

Key Points:

  • Don’t create manually
  • Always reference as "START" in edges
  • Sets up initial execution context

2. EndNode

The exit point of your graph. Optional and implicit.

Usage:

graph.add_node("last", None, node_type="llm")
graph.add_edge("last", "END")  # Optional

Key Points:

  • Automatically added if nodes have no children
  • Triggers execution_complete event
  • Returns final output

3. AgentNode

Wraps Vel or OpenAI Agents SDK agents with automatic detection and streaming.

With Vel:

from vel import Agent as VelAgent

vel_agent = VelAgent(
    id="assistant",
    model={"provider": "openai", "name": "gpt-4"},
)

graph.add_node("agent", vel_agent, node_type="agent")

With OpenAI Agents SDK:

from agents import Agent

openai_agent = Agent(
    name="Assistant",
    instructions="You are helpful"
)

# Vel translation by default
graph.add_node("agent", openai_agent, node_type="agent")

# Or use native events
graph.add_node("agent", openai_agent, node_type="agent",
               use_native_events=True)

Key Features:

  • Auto-detects agent type (Vel vs OpenAI)
  • Token-by-token streaming
  • Event translation (Vel format by default)
  • Chat history management
  • System prompt override:
graph.add_node("agent", my_agent, node_type="agent",
               system_prompt="Custom prompt: ")

4. LLMNode

Direct LLM calls without agent framework. Simpler but less powerful than AgentNode.

Usage:

graph.add_node("llm", None, node_type="llm",
               model="gpt-4",
               system_prompt="You are a helpful assistant")

Parameters:

  • model: OpenAI model name (required)
  • system_prompt: System message (optional, supports variables)
  • temperature: Creativity (0-2, default: 1.0)
  • max_tokens: Output limit (default: None)

Example with Variables:

graph.add_node("llm", None, node_type="llm",
               model="gpt-4",
               system_prompt="Analyze: . Context: ")

When to Use:

  • Quick LLM calls without tools
  • Simple text generation
  • No need for conversation history
  • No tool calling required

5. ToolNode

Execute arbitrary Python functions as tools.

Basic Usage:

def my_tool(input: dict) -> dict:
    query = input.get("query", "")
    # Your logic here
    return {"result": f"Processed: {query}"}

graph.add_node("tool", my_tool, node_type="tool")

Async Tools:

async def fetch_data(input: dict) -> dict:
    async with httpx.AsyncClient() as client:
        response = await client.get(input["url"])
        return {"data": response.json()}

graph.add_node("fetcher", fetch_data, node_type="tool")

With Configuration:

def multiply(input: dict, multiplier: int = 2) -> dict:
    value = input.get("value", 0)
    return {"result": value * multiplier}

graph.add_node("tool", multiply, node_type="tool",
               config={"bindings": {"multiplier": 3}})

Key Features:

  • Supports sync and async functions
  • Automatic parameter injection
  • Error handling with retries
  • Access to execution context:
def tool_with_context(input: dict, context: ExecutionContext) -> dict:
    # Access session_id, variables, state, etc.
    return {"session": context.session_id}

6. RAGNode

Retrieve documents from vector stores for context enrichment in LLM prompts.

Basic Usage:

from mesh.nodes import RAGNode

# Create RAG node
rag_node = RAGNode(
    id="rag_0",
    query_template="",  # What to search for
    top_k=5,                          # Number of documents
    similarity_threshold=0.7,         # Minimum score
    file_id="uuid-123",              # Filter to specific file
)

# Inject retriever (dependency injection pattern)
rag_node.set_retriever(retriever)

graph.add_node("rag_0", rag_node, node_type="rag")

Parameters:

  • id: Node identifier (e.g., "rag_0")
  • query_template: Search query with variable support (default: "")
  • top_k: Number of documents to retrieve (default: 5)
  • similarity_threshold: Minimum similarity score 0.0-1.0 (default: 0.7)
  • file_id: UUID of specific file to search (optional)
  • folder_uuid: UUID of folder to search across (optional)
  • retriever_type: Type of vector store - "postgres" or "chroma" (default: "postgres")

Query Template - The Search Query

The Query Template determines what text gets embedded and searched against your vector database chunks.

Default: User’s Question

query_template=""  # Searches based on user input

Example flow:

User asks: "What cats are good in hot weather?"
           ↓
Query Template: 
           ↓
Resolved: "What cats are good in hot weather?"
           ↓
Generate embedding: [0.123, 0.456, ...] (1536 dims)
           ↓
Search postgres: Returns top 5 similar chunks

Dynamic Queries: Reference Previous Nodes

# Search based on LLM's refined query
query_template=""

# Combine multiple inputs
query_template="Find docs about  related to "

# Use tool output
query_template=""

Static Queries: Fixed Search

# Always search for specific topic
query_template="product specifications and features"

# Domain-specific search
query_template="customer support policies"

Output Structure

RAGNode outputs a dictionary with:

{
    "formatted": "<CONTEXT>...formatted docs with metadata...</CONTEXT>",
    "documents": [
        {
            "id": "uuid",
            "document_id": "file-uuid",
            "content": "...",
            "page_number": 5,
            "heading": "Section Title",
            "similarity": 0.89,
            "file_title": "Document.pdf",
            "folder_uuid": "folder-uuid"
        },
        ...
    ],
    "query": "resolved query text",
    "num_results": 5
}

Access in downstream nodes:

  • `` - Pre-formatted context block for LLM prompts
  • `` - Raw document array for custom processing
  • `` - The resolved query that was searched

Complete RAG Flow

from mesh import StateGraph
from mesh.nodes import RAGNode

# 1. Create graph
graph = StateGraph()

# 2. Add RAG node
rag_node = RAGNode(
    id="rag_0",
    query_template="",
    top_k=5,
    similarity_threshold=0.7,
    file_id="file-uuid-123",  # Search specific file
)

# 3. Add LLM that uses retrieved context
graph.add_node("llm", None, node_type="llm",
               model="gpt-4",
               system_prompt="""
               Answer using this context:
               

               Question: 
               """)

# 4. Connect nodes
graph.add_edge("START", "rag_0")
graph.add_edge("rag_0", "llm")

# 5. Inject retriever (before execution)
rag_node.set_retriever(my_retriever)

# 6. Execute
result = await graph.run(input="What cats are good in hot weather?")

Retriever Setup

RAGNode uses dependency injection for the retriever:

# Example retriever interface
class MyRetriever:
    async def search_file(self, query: str, file_id: str,
                         similarity_threshold: float, limit: int):
        # Generate embedding
        embedding = await generate_embedding(query)

        # Query vector database
        results = query_pgvector(embedding, file_id, similarity_threshold, limit)
        return results

    async def search_folder(self, query: str, folder_uuid: str,
                           similarity_threshold: float, limit: int):
        # Search across folder
        ...

# Inject into node
retriever = MyRetriever()
rag_node.set_retriever(retriever)

In React Flow Graphs:

When using React Flow parser, inject retrievers after parsing:

from mesh import ReactFlowParser
from mesh.nodes import RAGNode

# Parse graph
graph = parser.parse(react_flow_json)

# Inject retriever into all RAG nodes
retriever = MyRetriever()
for node in graph.nodes.values():
    if isinstance(node, RAGNode):
        node.set_retriever(retriever)

# Execute
await graph.run(input="...")

File Search - Search within specific document:

RAGNode(
    id="rag_0",
    file_id="550e8400-e29b-41d4-a716-446655440000",  # Specific file UUID
    top_k=5
)

Folder Search - Search across all documents in folder:

RAGNode(
    id="rag_0",
    folder_uuid="abc-123-def",  # Folder UUID
    top_k=10  # Aggregates top results across all files
)

Note: Use either file_id OR folder_uuid, not both.

Multi-Source RAG

Search multiple knowledge bases by using multiple RAG nodes:

# Search product docs
rag_products = RAGNode(
    id="rag_products",
    query_template="",
    file_id="products-file-id",
    top_k=3
)

# Search support tickets
rag_support = RAGNode(
    id="rag_support",
    query_template="",
    file_id="support-file-id",
    top_k=3
)

# LLM uses both sources
graph.add_node("llm", None, node_type="llm",
               system_prompt="""
               Product context: 
               Support context: 

               Question: 
               """)

# Connect
graph.add_edge("START", "rag_products")
graph.add_edge("START", "rag_support")  # Parallel retrieval
graph.add_edge("rag_products", "llm")
graph.add_edge("rag_support", "llm")

Query Refinement Pattern

Use an LLM to refine the search query before retrieval:

# Step 1: Refine query
graph.add_node("refiner", None, node_type="llm",
               model="gpt-4",
               system_prompt="Rewrite as a search query: ")

# Step 2: Search with refined query
rag_node = RAGNode(
    id="rag_0",
    query_template="",  # Use LLM's refined query
    top_k=5
)

# Step 3: Answer with context
graph.add_node("answerer", None, node_type="llm",
               system_prompt="Answer using: ")

# Connect
graph.add_edge("START", "refiner")
graph.add_edge("refiner", "rag_0")
graph.add_edge("rag_0", "answerer")

When to Use RAGNode

✅ Use RAGNode when:

  • You need to ground LLM responses in specific documents
  • Working with large knowledge bases (docs, tickets, articles)
  • Need up-to-date information from your own data
  • Want to cite sources in LLM responses
  • Building Q&A systems over documentation

❌ Don’t use RAGNode when:

  • Information fits in LLM context window
  • No vector database available
  • Real-time web search needed (use tool with API instead)
  • Documents change too frequently for embeddings

7. ConditionNode

Conditional branching with multiple output paths.

Usage:

from mesh.nodes import Condition

def check_sentiment(output: dict) -> bool:
    return "positive" in output.get("content", "").lower()

def check_negative(output: dict) -> bool:
    return "negative" in output.get("content", "").lower()

graph.add_node("condition", [
    Condition("positive", check_sentiment, "positive_handler"),
    Condition("negative", check_negative, "negative_handler"),
], node_type="condition", default_target="neutral_handler")

# Add handlers
graph.add_node("positive_handler", None, node_type="llm")
graph.add_node("negative_handler", None, node_type="llm")
graph.add_node("neutral_handler", None, node_type="llm")

# Connect
graph.add_edge("START", "analyzer")
graph.add_edge("analyzer", "condition")
graph.add_edge("condition", "positive_handler")
graph.add_edge("condition", "negative_handler")
graph.add_edge("condition", "neutral_handler")

Condition Object:

Condition(
    name="condition_name",           # Identifier
    predicate=lambda x: bool,        # Function returning True/False
    target_node="target_node_id"     # Where to route if True
)

Key Features:

  • Multiple conditions per node
  • Default fallback path
  • Predicates can be any callable
  • Unfulfilled branches are skipped

Advanced Example:

def is_long_text(output: dict) -> bool:
    content = output.get("content", "")
    return len(content) > 1000

def is_short_text(output: dict) -> bool:
    content = output.get("content", "")
    return len(content) <= 100

graph.add_node("router", [
    Condition("long", is_long_text, "summarizer"),
    Condition("short", is_short_text, "expander"),
], node_type="condition", default_target="normal_handler")

8. LoopNode

Iterate over arrays and execute downstream nodes for each item.

Usage:

graph.add_node("loop", None, node_type="loop",
               array_path="$.items",
               max_iterations=100)

graph.add_node("processor", None, node_type="llm",
               model="gpt-4",
               system_prompt="Process item: ")

graph.add_edge("START", "loop")
graph.add_edge("loop", "processor")

Parameters:

  • array_path: JSONPath to array in input (required)
  • max_iterations: Maximum loop count (default: 100)

JSONPath Examples:

# Top-level array
array_path="$"  # Input: [1, 2, 3]

# Nested field
array_path="$.data.items"  # Input: {"data": {"items": [...]}}

# Array in result
array_path="$.results[*]"  # Input: {"results": [...]}

Iteration Variables:

Access current iteration data:

#  - Current item value
#  - Current index (0-based)
#  - True if first item
#  - True if last item

graph.add_node("processor", None, node_type="llm",
               system_prompt="""
               Item #}: 
               
               """)

Example Workflow:

Input:

{
  "items": [
    {"name": "Alice", "age": 30},
    {"name": "Bob", "age": 25}
  ]
}

Graph:

graph.add_node("loop", None, node_type="loop", array_path="$.items")
graph.add_node("greet", None, node_type="llm",
               system_prompt="Greet , age ")
graph.add_edge("START", "loop")
graph.add_edge("loop", "greet")

9. ApprovalNode

Pause execution for human-in-the-loop approval before continuing.

Usage:

from mesh.nodes import ApprovalNode, approve, reject

graph.add_node("approval", ApprovalNode(
    id="approval",
    approval_id="plan_approval",
    approval_message="Please review the plan before execution",
    data_extractor=lambda input: {
        "plan_title": input.get("title"),
        "step_count": len(input.get("steps", [])),
    },
))

graph.add_edge("planner", "approval")
graph.add_edge("approval", "executor")

Parameters:

  • approval_id: Unique identifier for this approval point (required)
  • approval_message: Message displayed to the approver (optional)
  • data_extractor: Function to transform input into display data (optional)
  • timeout_seconds: Timeout in seconds (not yet implemented)

Execution Flow:

Node executes
     │
     ▼
┌─────────────┐
│  Approval   │ ──▶ Emit APPROVAL_PENDING event
│    Node     │ ──▶ Return approval_pending=True
└──────┬──────┘ ──▶ Execution PAUSES
       │
       ▼
┌─────────────┐
│   User      │     (External decision)
│  Decision   │
└──────┬──────┘
       │
       ├─ approve() ──▶ APPROVAL_RECEIVED ──▶ Resume execution
       │
       └─ reject()  ──▶ APPROVAL_REJECTED ──▶ End execution

Handling Approval:

from mesh import Executor, ExecutionContext, MemoryBackend
from mesh.nodes import approve, reject

executor = Executor(graph.compile(), MemoryBackend())
context = ExecutionContext(...)

async for event in executor.execute(input, context):
    if event.metadata.get("status") == "waiting_for_approval":
        # Display approval data to user
        approval_data = event.metadata.get("approval_data", {})
        print(f"Plan: {approval_data.get('plan_title')}")

        # Get user decision
        user_input = input("Approve? (y/n): ")

        if user_input.lower() == 'y':
            result = approve()
        else:
            result = reject(reason="User declined")

        # Resume execution
        async for resume_event in executor.resume(context, result):
            # Process resumed events
            pass

Approval with Modified Data:

# Approve and modify the input for the next node
result = approve(
    modified_data={"plan": modified_plan},
    approver_id="user@example.com",
    metadata={"approved_at": datetime.now().isoformat()},
)

Key Features:

  • Pauses execution until approval received
  • Stores pending state for resume
  • Supports data transformation for display
  • Handles both approval and rejection
  • Tracks approver identity and metadata

When to Use:

  • Research pipelines needing plan review
  • Content workflows with editorial approval
  • Financial transactions requiring authorization
  • Deployment pipelines with manual gates
  • Any critical decision point needing human oversight

See the Deep Research Guide for a complete example.

10. OrchestratorNode

LLM-driven delegation to sub-agents at runtime. Instead of static graph edges, an orchestrator dynamically decides which sub-agents to call based on the task.

Visual Canvas Pattern:

In the React Flow canvas, connect AgentFlowNodes as children of the OrchestratorNode:

                        ┌─→ AgentFlowNode (Researcher)
StartNode → Orchestrator ─┼─→ AgentFlowNode (Analyst)
                        └─→ AgentFlowNode (Writer)

The orchestrator discovers sub-agents from graph edges - no dropdown selection needed!

Basic Usage:

from mesh.nodes import OrchestratorNode

orchestrator = OrchestratorNode(
    id="orchestrator_0",
    provider="openai",
    model_name="gpt-4o",
    instruction="""You are a research team coordinator.
    For gathering information → use researcher
    For analyzing data → use analyst
    For writing summaries → use writer""",
    result_mode="synthesize",
    max_iterations=5,
)

# Inject dependencies
orchestrator.set_flow_loader(my_flow_loader)
orchestrator.set_registry(my_registry)
orchestrator.set_graph(execution_graph)  # For sub-agent discovery

Parameters:

  • provider: LLM provider (openai, anthropic, gemini)
  • model_name: Model for orchestration decisions
  • instruction: Instructions for the orchestrator LLM
  • temperature: Sampling temperature (default: 0.3)
  • result_mode: How to handle outputs (synthesize, stream_through, raw)
  • max_iterations: Maximum sub-agent calls (default: 5)
  • show_sub_agent_events: Stream events from sub-agents (default: True)

Key Features:

  • Sub-agents discovered from graph edges (connected AgentFlowNodes)
  • Sub-agents are Mesh graphs (not Vel Agents directly)
  • LLM decides which sub-agents to call based on task
  • Sub-agent descriptions are critical for routing decisions
  • Events from sub-agents are prefixed with orchestrator node ID

When to Use:

  • Dynamic task routing based on content
  • Building agent teams with specialized roles
  • Hierarchical delegation patterns
  • Complex workflows requiring runtime decisions

See the Orchestrator Guide for a complete walkthrough.

Node Configuration

All nodes support common configuration:

Retry Logic

graph.add_node("flaky_tool", my_function, node_type="tool",
               config={
                   "retry": {
                       "max_retries": 3,
                       "delay": 1.0  # seconds
                   }
               })

Metadata

graph.add_node("llm", None, node_type="llm",
               metadata={"description": "Analyzer node", "version": "1.0"})

Node Execution

Each node implements the execute() method:

async def execute(
    self,
    input: Any,
    context: ExecutionContext
) -> NodeResult:
    # Node logic here
    return NodeResult(
        output={"content": "..."},
        state={"key": "value"},
        chat_history=[...],
        metadata={...}
    )

NodeResult Fields:

  • output: Data passed to child nodes
  • state: Updates to shared state
  • chat_history: Conversation updates
  • metadata: Extra information
  • loop_to_node: For loop nodes
  • max_loops: Loop limit

Best Practices

1. Choose the Right Node Type

# ✅ Use AgentNode for complex interactions with tools
graph.add_node("agent", vel_agent, node_type="agent")

# ✅ Use LLMNode for simple text generation
graph.add_node("llm", None, node_type="llm", model="gpt-4")

# ✅ Use ToolNode for custom logic
graph.add_node("tool", my_function, node_type="tool")

2. Keep Nodes Focused

# ✅ Good: Single responsibility
graph.add_node("analyzer", None, node_type="llm",
               system_prompt="Analyze sentiment")
graph.add_node("summarizer", None, node_type="llm",
               system_prompt="Summarize results")

# ❌ Bad: Trying to do too much
graph.add_node("do_everything", None, node_type="llm",
               system_prompt="Analyze, summarize, and respond")

3. Use Variables

# ✅ Good: Reference previous nodes
graph.add_node("step2", None, node_type="llm",
               system_prompt="Based on , ...")

# ❌ Bad: Hardcoded values
graph.add_node("step2", None, node_type="llm",
               system_prompt="Based on the analysis, ...")

4. Handle Errors

# ✅ Good: Retries configured
graph.add_node("api_call", fetch_data, node_type="tool",
               config={"retry": {"max_retries": 3}})

# Add error handling
graph.add_node("error_handler", None, node_type="llm")

Common Patterns

Pattern: Analyze → Process → Respond

graph.add_node("analyzer", None, node_type="llm", model="gpt-4")
graph.add_node("processor", process_func, node_type="tool")
graph.add_node("responder", None, node_type="llm", model="gpt-4")

graph.add_edge("START", "analyzer")
graph.add_edge("analyzer", "processor")
graph.add_edge("processor", "responder")

Pattern: Agent with Tool

graph.add_node("agent", my_agent, node_type="agent")
graph.add_node("tool", my_tool, node_type="tool")
graph.add_edge("START", "agent")
graph.add_edge("agent", "tool")

Pattern: Conditional Routing

graph.add_node("classifier", None, node_type="llm")
graph.add_node("router", conditions, node_type="condition")
graph.add_node("handler_a", None, node_type="llm")
graph.add_node("handler_b", None, node_type="llm")

graph.add_edge("START", "classifier")
graph.add_edge("classifier", "router")
graph.add_edge("router", "handler_a")
graph.add_edge("router", "handler_b")

See Also