Data Handler Node
A specialized ToolNode for executing SQL queries against various database sources with support for fixed and AI-interpolated parameters.
Overview
The DataHandlerNode is a specialized subclass of ToolNode designed for database operations. It:
- Connects to specified database source (MySQL, Postgres, Vertica, etc.)
- Executes SQL queries with named parameters
- Supports both fixed parameters (user-defined) and interpolated parameters (AI-generated)
- Can be stored in database as records (like other tool nodes)
Architecture
DataHandlerNode (extends ToolNode)
↓
Internally creates query executor function
↓
ToolNode handles execution, retries, events
↓
Query executed against injected DB session
Usage Patterns
Pattern 1: Fixed Parameters (User-Defined)
User provides all query parameters at configuration time:
from mesh.nodes import DataHandlerNode
node = DataHandlerNode(
id="data_handler_0",
db_source="postgres",
query="SELECT * FROM users WHERE status = :status AND age > :min_age",
params={
"status": "active",
"min_age": 18
}
)
# Inject DB session getter
node.set_db_session_getter(lambda source: get_db_session(source))
graph.add_node("data_handler_0", node, node_type="data_handler")
Pattern 2: Interpolated Parameters (AI-Generated)
AI agent provides parameters dynamically:
# LLM generates query parameters
graph.add_node("parameter_generator", None, node_type="llm",
model="gpt-4",
system_prompt="""
Extract user info:
- status: active/inactive
- min_age: number
Return JSON: {"status": "...", "min_age": ...}
""")
# Data handler uses AI-generated params
node = DataHandlerNode(
id="data_handler_0",
db_source="postgres",
query="SELECT * FROM users WHERE status = :status AND age > :min_age",
# No fixed params - will use output from parameter_generator
)
graph.add_edge("START", "parameter_generator")
graph.add_edge("parameter_generator", "data_handler_0")
Pattern 3: Hybrid (Fixed + Interpolated)
Mix of user-defined and AI-generated parameters:
node = DataHandlerNode(
id="data_handler_0",
db_source="postgres",
query="SELECT * FROM logs WHERE app = :app AND user_id = :user_id",
params={
"app": "production" # Fixed
# user_id will be interpolated from AI output
}
)
Database Storage Schema
Store DataHandlerNodes in your database like other tool nodes:
CREATE TABLE mosaic_agent_tool_nodes (
id SERIAL PRIMARY KEY,
node_uuid UUID UNIQUE,
user_id INTEGER,
-- Data Handler specific
label VARCHAR(255),
name VARCHAR(255),
description TEXT,
type VARCHAR(50) DEFAULT 'DataHandler',
-- Configuration (JSON)
inputs JSONB, -- DB source, query, params config
outputs JSONB,
-- Metadata
category VARCHAR(100) DEFAULT 'Data Operations',
icon VARCHAR(100),
is_public BOOLEAN DEFAULT false,
is_verified BOOLEAN DEFAULT false,
active BOOLEAN DEFAULT true,
-- Timestamps
created_at TIMESTAMP DEFAULT NOW(),
updated_at TIMESTAMP DEFAULT NOW()
);
Example Insert:
INSERT INTO mosaic_agent_tool_nodes (
node_uuid, user_id, label, name, type, description, category, inputs
) VALUES (
'e3d06619-dc5f-41ca-9afa-e7c197cbf7f9',
129617,
'User Query Handler',
'get_users',
'DataHandler',
'Query users table with filters',
'Data Operations',
'[
{
"name": "db_source",
"type": "options",
"label": "Database Source",
"options": [
{"name": "postgres", "label": "PostgreSQL"},
{"name": "mysql", "label": "MySQL"},
{"name": "vertica", "label": "Vertica"}
],
"default": "postgres",
"optional": false
},
{
"name": "query",
"type": "code",
"label": "SQL Query",
"rows": 8,
"default": "SELECT * FROM users WHERE status = :status",
"optional": false,
"placeholder": "SELECT * FROM table WHERE column = :param"
},
{
"name": "params",
"type": "code",
"label": "Query Parameters (JSON)",
"rows": 4,
"default": "{\"status\": \"active\"}",
"optional": true,
"placeholder": "{\"param_name\": \"value\"}"
}
]'::jsonb
);
React Flow Integration
Add to UI registry:
// In mesh-app/mesh-ui/src/registry/index.ts
{
type: "dataHandlerAgentflow",
name: "data_handler",
label: "Data Handler",
description: "Execute SQL queries against database sources",
icon: "Database",
category: "Data Operations",
color: "#10b981", // green
inputs: [
{
name: "id",
type: "string",
label: "Node ID (auto-generated)",
placeholder: "data_handler_0",
showInNode: true,
},
{
name: "dbSource",
type: "options",
label: "Database Source",
default: "postgres",
showInNode: true,
options: [
{ name: "postgres", label: "PostgreSQL" },
{ name: "mysql", label: "MySQL" },
{ name: "vertica", label: "Vertica" },
],
},
{
name: "query",
type: "code",
label: "SQL Query",
rows: 8,
placeholder: "SELECT * FROM table WHERE column = :param",
description: "Use :param_name for named parameters",
},
{
name: "params",
type: "code",
label: "Fixed Parameters (JSON)",
rows: 4,
optional: true,
placeholder: '{"param_name": "value"}',
description: "Leave empty for AI-interpolated params",
},
],
outputs: ["rows", "count"],
}
Parser Integration
Add to React Flow parser:
# In mesh/parsers/react_flow.py
from mesh.nodes.data_handler import DataHandlerNode
NODE_TYPE_MAP = {
# ... existing mappings
"dataHandlerAgentflow": "data_handler",
}
def _create_data_handler_node(self, node_id: str, config: Dict[str, Any]) -> DataHandlerNode:
"""Create DataHandlerNode from config."""
return DataHandlerNode(
id=node_id,
db_source=config.get("dbSource") or config.get("db_source", "postgres"),
query=config.get("query", ""),
params=config.get("params", {}),
event_mode=config.get("eventMode", "full"),
config=config,
)
# In _create_node method:
elif node_type == "data_handler":
return self._create_data_handler_node(node_id, config)
Dependency Injection
Inject DB session getter after parsing (like RAGNode):
# In backend/routers/execution.py
from mesh.nodes import DataHandlerNode
# After parsing
graph = parser.parse(request.flow)
# Inject DB session getter into DataHandler nodes
def get_db_session(source: str):
"""Get database session for given source."""
from server.resources.database import DBSession
source_map = {
"postgres": DBSession.postgres,
"mysql": DBSession.master,
"vertica": DBSession.vrt,
}
if source not in source_map:
raise ValueError(f"Unknown database source: {source}")
return source_map[source].value()
for node in graph.nodes.values():
if isinstance(node, DataHandlerNode):
node.set_db_session_getter(get_db_session)
Output Structure
DataHandlerNode returns:
{
"rows": [
{"id": 1, "name": "Alice", "status": "active"},
{"id": 2, "name": "Bob", "status": "active"},
],
"count": 2,
"query": "SELECT * FROM users WHERE status = :status",
"params": {"status": "active"}
}
Access in downstream nodes:
- `` - Array of result rows
- `` - Number of rows returned
- `` - The executed query
Complete Example Flow
from mesh import StateGraph
from mesh.nodes import DataHandlerNode
# 1. Create graph
graph = StateGraph()
# 2. LLM extracts query parameters from user question
graph.add_node("extractor", None, node_type="llm",
model="gpt-4",
system_prompt="""
Extract database query params from:
Return JSON with status and min_age.
""")
# 3. Data handler executes query with AI-extracted params
data_handler = DataHandlerNode(
id="query_users",
db_source="postgres",
query="SELECT * FROM users WHERE status = :status AND age > :min_age",
# No fixed params - uses extractor output
)
# 4. LLM formats results for user
graph.add_node("formatter", None, node_type="llm",
model="gpt-4",
system_prompt="""
Format these query results for the user:
Found users.
""")
# 5. Connect
graph.add_edge("START", "extractor")
graph.add_edge("extractor", "query_users")
graph.add_edge("query_users", "formatter")
# 6. Inject DB session getter
data_handler.set_db_session_getter(get_db_session)
# 7. Execute
result = await graph.run(input="Show me active users over 25")
Advantages Over Generic ToolNode
- Type Safety: Explicit DB source and query validation
- UI Integration: Specialized UI fields (DB dropdown, SQL editor)
- Parameter Resolution: Built-in logic for fixed vs interpolated params
- Session Management: Automatic connection handling and cleanup
- Database Storage: Clean schema for persisting data handlers
- Error Handling: Query-specific error messages with context
When to Use
✅ Use DataHandlerNode when:
- Executing SQL queries as part of agent workflow
- Need to mix fixed and AI-generated query parameters
- Want specialized UI for database operations
- Storing node configurations in database
- Building data-driven agents
❌ Use regular ToolNode when:
- Non-database operations
- Custom Python logic beyond SQL
- Don’t need parameter interpolation
- One-off scripts without reusability
Security Considerations
- SQL Injection: Always use named parameters (
:param_name), never string interpolation - Connection Pooling: Ensure DB session getter uses connection pools
- Query Validation: Consider whitelist of allowed tables/operations
- Parameter Sanitization: Validate AI-generated params before execution
- Access Control: Check user permissions before query execution
Migration from ToolNode
Existing ToolNodes doing database operations can be migrated:
Before (ToolNode):
def query_users(input: dict) -> dict:
session = get_postgres_session()
result = session.execute("SELECT * WHERE id = :id", {"id": input["user_id"]})
return {"rows": result.fetchall()}
graph.add_node("tool", query_users, node_type="tool")
After (DataHandlerNode):
node = DataHandlerNode(
id="query_users",
db_source="postgres",
query="SELECT * WHERE id = :user_id",
)
node.set_db_session_getter(get_db_session)
graph.add_node("query_users", node, node_type="data_handler")
Benefits:
- Less code
- Configurable via UI
- Stored in database
- Better error handling
- Consistent parameter resolution