DataHandlerNode - Database Integration

How to integrate DataHandlerNode with your existing mosaic_agent_tool_nodes table.

Existing Table Structure

You already have:

mosaic_agent_tool_nodes (
    id, node_uuid, user_id,
    code, imports, label, name, version,
    type, icon, category, description,
    base_classes, inputs, outputs,
    credential, file_path,
    active, is_used, is_public, is_verified,
    language, args,
    created_at, updated_at
)

Storing DataHandlerNodes

Simply insert with type='DataHandler':

INSERT INTO mosaic_agent_tool_nodes (
    node_uuid,
    user_id,
    code,           -- Empty for DataHandler (no custom Python needed)
    imports,        -- '[]'
    label,          -- 'Query Active Users'
    name,           -- 'query_active_users'
    version,        -- 1
    type,           -- 'DataHandler' (KEY FIELD)
    icon,           -- 'Database'
    category,       -- 'Data Operations'
    description,    -- 'Queries users table with filters'
    base_classes,   -- '["DataHandler"]'
    inputs,         -- JSON array with db_source, query, params
    outputs,        -- '["rows", "count"]'
    args,           -- JSON array (same as inputs for backwards compat)
    active,         -- true
    is_public,      -- false
    language,       -- 'sql'
    is_verified     -- false
) VALUES (
    'e3d06619-dc5f-41ca-9afa-e7c197cbf7f9',
    129617,
    '',  -- No Python code needed
    '[]',
    'Query Active Users',
    'query_active_users',
    1,
    'DataHandler',  -- Type distinguishes from Tool
    'Database',
    'Data Operations',
    'Retrieve active users from database',
    '["DataHandler"]',
    '[
        {
            "name": "db_source",
            "type": "options",
            "label": "Database Source",
            "options": [
                {"name": "postgres", "label": "PostgreSQL"},
                {"name": "mysql", "label": "MySQL"}
            ],
            "default": "postgres",
            "optional": false
        },
        {
            "name": "query",
            "type": "code",
            "label": "SQL Query",
            "default": "SELECT id, name, email FROM users WHERE status = :status",
            "optional": false
        },
        {
            "name": "params",
            "type": "code",
            "label": "Query Parameters (JSON)",
            "default": "{\"status\": \"active\"}",
            "optional": true
        }
    ]'::jsonb,
    '["rows", "count"]'::jsonb,
    '[
        {
            "name": "db_source",
            "type": "options",
            "default": "postgres"
        },
        {
            "name": "query",
            "type": "code",
            "default": "SELECT id, name FROM users WHERE status = :status"
        },
        {
            "name": "params",
            "type": "code",
            "default": "{\"status\": \"active\"}"
        }
    ]'::jsonb,
    true,
    false,
    'sql',  -- Language field = 'sql' for DataHandlers
    false
);

Loading from Database

Create a loader that reads from your table and instantiates the correct node type:

# backend/loaders/node_loader.py

from typing import Dict, Any, List
import json
from mesh.nodes import DataHandlerNode, ToolNode

def load_node_from_db(record: Dict[str, Any]):
    """Load node from mosaic_agent_tool_nodes record.

    Args:
        record: Database record dict

    Returns:
        DataHandlerNode or ToolNode instance
    """
    node_type = record.get('type', 'Tool')
    node_uuid = record['node_uuid']

    if node_type == 'DataHandler':
        return _load_data_handler(record)
    elif node_type == 'Tool':
        return _load_tool(record)
    else:
        raise ValueError(f"Unknown node type: {node_type}")


def _load_data_handler(record: Dict[str, Any]) -> DataHandlerNode:
    """Load DataHandlerNode from DB record."""
    inputs = record.get('inputs', [])
    if isinstance(inputs, str):
        inputs = json.loads(inputs)

    # Extract config from inputs
    db_source = None
    query = None
    params = {}

    for inp in inputs:
        name = inp.get('name')
        default = inp.get('default')

        if name == 'db_source':
            db_source = default
        elif name == 'query':
            query = default
        elif name == 'params':
            if isinstance(default, str):
                params = json.loads(default) if default else {}
            else:
                params = default or {}

    if not db_source or not query:
        raise ValueError(
            f"DataHandler '{record['name']}' missing db_source or query"
        )

    return DataHandlerNode(
        id=record['node_uuid'],
        db_source=db_source,
        query=query,
        params=params,
    )


def _load_tool(record: Dict[str, Any]) -> ToolNode:
    """Load regular ToolNode from DB record."""
    # Execute user's Python code
    code = record.get('code', '')
    imports = record.get('imports', '[]')

    if isinstance(imports, str):
        imports = json.loads(imports)

    # Import dependencies
    for imp in imports:
        exec(imp)

    # Execute code to get function
    namespace = {}
    exec(code, namespace)

    # Find the function
    func_name = record['name']
    if func_name not in namespace:
        raise ValueError(f"Function '{func_name}' not found in code")

    tool_fn = namespace[func_name]

    return ToolNode(
        id=record['node_uuid'],
        tool_fn=tool_fn,
    )

Backend Registry Integration

Update your backend to load nodes from database:

# backend/registry.py

from backend.loaders.node_loader import load_node_from_db
from backend.database import get_db_connection

def load_nodes_from_database(user_id: int):
    """Load user's custom nodes from database."""
    conn = get_db_connection()

    # Query all active nodes for user
    records = conn.execute("""
        SELECT * FROM mosaic_agent_tool_nodes
        WHERE user_id = :user_id
        AND active = true
        ORDER BY created_at DESC
    """, {"user_id": user_id}).fetchall()

    nodes = {}
    for record in records:
        record_dict = dict(record)
        try:
            node = load_node_from_db(record_dict)
            nodes[record_dict['node_uuid']] = node
        except Exception as e:
            print(f"Warning: Failed to load node {record_dict['name']}: {e}")

    return nodes


# In main.py or registry setup
def create_registry(user_id: int):
    """Create registry with user's custom nodes."""
    registry = NodeRegistry()

    # Load from database
    custom_nodes = load_nodes_from_database(user_id)

    for node_uuid, node in custom_nodes.items():
        if isinstance(node, DataHandlerNode):
            registry.register_tool(node_uuid, node)
        elif isinstance(node, ToolNode):
            registry.register_tool(node_uuid, node.tool_fn)

    return registry

Execution with Dependency Injection

In your execution endpoint:

# backend/routers/execution.py

from mesh.nodes import DataHandlerNode, RAGNode

@router.post("/execute")
async def execute_graph(request: ExecuteRequest, req: Request):
    # Parse graph
    graph = parser.parse(request.flow)

    # Inject database session getter into DataHandlerNodes
    from server.resources.database import DBSession

    def get_db_session(source: str):
        source_map = {
            "postgres": DBSession.postgres,
            "mysql": DBSession.master,
            "vertica": DBSession.vrt,
        }
        return source_map.get(source).value()

    for node in graph.nodes.values():
        if isinstance(node, DataHandlerNode):
            node.set_db_session_getter(get_db_session)
        elif isinstance(node, RAGNode):
            node.set_retriever(rag_retriever)

    # Execute
    executor = Executor(graph, MemoryBackend())
    # ...

React Flow UI Integration

The UI loads available DataHandlerNodes from the backend:

// Frontend fetches user's custom nodes
const { data: customNodes } = await fetch('/api/nodes/custom');

// Filter DataHandlers for dropdown
const dataHandlers = customNodes.filter(n => n.type === 'DataHandler');

// In node palette
{
  category: "Data Operations",
  nodes: dataHandlers.map(dh => ({
    type: "dataHandlerAgentflow",
    name: dh.node_uuid,
    label: dh.label,
    description: dh.description,
    // ... config from dh.inputs
  }))
}

Example: Creating via API

Endpoint to create DataHandler nodes:

# backend/routers/nodes.py

@router.post("/nodes/data-handler")
async def create_data_handler(
    label: str,
    db_source: str,
    query: str,
    params: Dict[str, Any] = None,
    user_id: int = Depends(get_current_user_id)
):
    """Create a new DataHandler node."""
    node_uuid = str(uuid.uuid4())

    inputs = [
        {
            "name": "db_source",
            "type": "options",
            "default": db_source,
        },
        {
            "name": "query",
            "type": "code",
            "default": query,
        },
        {
            "name": "params",
            "type": "code",
            "default": json.dumps(params or {}),
        }
    ]

    conn.execute("""
        INSERT INTO mosaic_agent_tool_nodes (
            node_uuid, user_id, label, name, type,
            inputs, args, active, is_public
        ) VALUES (
            :node_uuid, :user_id, :label, :name, 'DataHandler',
            :inputs, :args, true, false
        )
    """, {
        "node_uuid": node_uuid,
        "user_id": user_id,
        "label": label,
        "name": label.lower().replace(" ", "_"),
        "inputs": json.dumps(inputs),
        "args": json.dumps(inputs),
    })

    return {"node_uuid": node_uuid, "label": label}

Query Examples

1. Fixed Parameters (User Config):

-- User creates this via UI/API
INSERT INTO mosaic_agent_tool_nodes (..., inputs) VALUES (...,
    '[
        {"name": "db_source", "default": "postgres"},
        {"name": "query", "default": "SELECT * FROM products WHERE category = :category"},
        {"name": "params", "default": "{\"category\": \"electronics\"}"}
    ]'
);

2. AI-Interpolated Parameters:

-- No fixed params - AI will provide them
INSERT INTO mosaic_agent_tool_nodes (..., inputs) VALUES (...,
    '[
        {"name": "db_source", "default": "postgres"},
        {"name": "query", "default": "SELECT * FROM users WHERE id = :user_id"},
        {"name": "params", "default": "{}"}
    ]'
);

-- In graph: LLM extracts user_id → DataHandler uses it

Migration Path

Existing Tool Nodes:

-- These stay as-is
SELECT * FROM mosaic_agent_tool_nodes WHERE type = 'Tool';

New DataHandler Nodes:

-- Add these with type = 'DataHandler'
INSERT INTO mosaic_agent_tool_nodes (type, ...) VALUES ('DataHandler', ...);

Query Both:

-- Backend loads both types
SELECT * FROM mosaic_agent_tool_nodes
WHERE user_id = :user_id
AND type IN ('Tool', 'DataHandler')
AND active = true;

Benefits

No schema changes - Uses existing table ✅ Type discrimination - type field distinguishes DataHandler from Tool ✅ Backwards compatible - Existing Tool nodes still work ✅ Clean separation - DataHandlers don’t need Python code field ✅ Database-driven - All configs stored in DB, loadable at runtime ✅ Per-user - Each user can create their own DataHandlers