ReasoningBank Phase 2: Full Academic Implementation
This document outlines how to extend Vel’s current ReasoningBank implementation to match the full capabilities described in the Google Research paper (arxiv:2509.25140).
Status: Roadmap for future development
Current Phase: Phase 1 (infrastructure complete)
Target Phase: Phase 2 (automatic self-evolution)
Current State vs. Target State
Phase 1: Current Implementation ✅
What Vel Has Now:
Component | Status | Description |
---|---|---|
Storage Infrastructure | ✅ Complete | SQLite with rb_strategies and rb_embeddings tables |
Embedding-Based Retrieval | ✅ Complete | Cosine similarity search with hybrid scoring |
Confidence Scoring | ✅ Complete | Incremental updates (±0.1 per outcome) |
Anti-Pattern Storage | ✅ Complete | Accumulates failure notes |
Pre-Run Advice Injection | ✅ Complete | prepare_for_run(signature) returns formatted advice |
Post-Run Confidence Update | ✅ Complete | finalize_outcome(success, fail_notes) updates scores |
Runtime-Owned Memory | ✅ Complete | No LLM tool calls, bounded latency |
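For reference, a typical Phase 1 call sequence looks roughly like this (the signature fields and the rb handle are illustrative; only prepare_for_run and finalize_outcome come from the table above):
# Illustrative Phase 1 flow; `rb` stands in for the ReasoningBank handle (obtained
# as shown in the Backwards Compatibility section), and the signature fields are examples.
signature = {"intent": "summarize", "domain": "support", "risk": "low"}

advice = rb.prepare_for_run(signature)            # retrieval: formatted advice string
# ... inject advice into the system prompt and run the agent ...
rb.finalize_outcome(success=True, fail_notes=[])  # confidence update (+/-0.1 per outcome)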
What’s Manual:
- ❌ Strategy creation (user must insert)
- ❌ Success/failure evaluation (user provides boolean)
- ❌ Anti-pattern generation (user provides strings)
- ❌ Trajectory analysis (not implemented)
Phase 2: Target Implementation (Google Paper)
What We Need to Add:
Component | Priority | Complexity | Description |
---|---|---|---|
Trajectory Storage | High | Medium | Store raw agent trajectories for analysis |
LLM-as-Judge | High | Medium | Automatic success/failure evaluation |
Strategy Distillation | High | High | Extract strategies from trajectories automatically |
Anti-Pattern Extraction | Medium | Medium | Generate “what to avoid” from failures |
Memory Consolidation | Medium | Medium | Deduplicate and merge similar strategies |
Parallel Scaling | Low | High | Generate multiple trajectories per query |
Sequential Scaling | Low | Medium | Iterative refinement within single trajectory |
Architecture Overview
Current Phase 1 Architecture
┌─────────────────────────────────────────────────────────┐
│ Agent Runtime │
└────────────┬────────────────────────────────────────────┘
│
┌──────┴────────┐
│ ContextManager │
└──────┬────────┘
│
┌─────────┴─────────┐
│ prepare_for_run │ (retrieval)
│ finalize_outcome │ (confidence update)
└─────────┬─────────┘
│
┌─────────┴─────────┐
│ ReasoningBank │
└─────────┬─────────┘
│
┌─────────┴─────────┐
│ ReasoningBankStore│ (SQLite + embeddings)
└───────────────────┘
USER PROVIDES:
- Strategy items
- Success boolean
- Fail notes
Target Phase 2 Architecture
┌─────────────────────────────────────────────────────────┐
│ Agent Runtime │
└────────────┬────────────────────────────────────────────┘
│
┌──────┴────────┐
│ ContextManager │
└──────┬────────┘
│
┌─────────┴──────────────┐
│ prepare_for_run │ (retrieval)
│ finalize_outcome │ (confidence update)
│ record_trajectory │ ← NEW
└─────────┬──────────────┘
│
┌─────────┴─────────────────────┐
│ ReasoningBank │
└─────────┬─────────────────────┘
│
┌─────────┴─────────────────────┐
│ ReasoningBankStore │
│ + TrajectoryStore ← NEW│
└─────────┬─────────────────────┘
│
┌─────────┴─────────────────────┐
│ Strategy Learning Pipeline │ ← NEW
│ │
│ 1. LLMJudge │
│ 2. StrategyExtractor │
│ 3. AntiPatternGenerator │
│ 4. MemoryConsolidator │
└───────────────────────────────┘
AUTOMATIC:
- Trajectory recording
- Success evaluation
- Strategy extraction
- Anti-pattern generation
- Memory consolidation
Implementation Plan
Milestone 1: Trajectory Storage
Goal: Capture and store raw agent trajectories for analysis.
1.1 Database Schema
Add new table for trajectories:
CREATE TABLE rb_trajectories (
id INTEGER PRIMARY KEY,
run_id TEXT NOT NULL,
session_id TEXT,
signature_json TEXT NOT NULL,
messages TEXT NOT NULL, -- JSON array of messages
tool_calls TEXT DEFAULT '[]', -- JSON array of tool calls
final_answer TEXT,
error TEXT,
created_at REAL DEFAULT (strftime('%s','now')),
evaluated BOOLEAN DEFAULT 0, -- Has LLM-as-Judge run?
success BOOLEAN, -- Judge's verdict
strategies_extracted BOOLEAN DEFAULT 0,
UNIQUE(run_id)
);
CREATE INDEX idx_traj_eval ON rb_trajectories(evaluated, success);
CREATE INDEX idx_traj_extracted ON rb_trajectories(strategies_extracted);
1.2 Implementation
Create vel/memory/trajectory_store.py:
from __future__ import annotations
from dataclasses import dataclass
from typing import Dict, Any, List, Optional
from pathlib import Path
import sqlite3, json
from time import time
@dataclass
class Trajectory:
id: Optional[int]
run_id: str
session_id: Optional[str]
signature: Dict[str, Any]
messages: List[Dict[str, Any]]
tool_calls: List[Dict[str, Any]]
final_answer: Optional[str]
error: Optional[str]
evaluated: bool
success: Optional[bool]
strategies_extracted: bool
class TrajectoryStore:
def __init__(self, db_path: str):
Path(db_path).parent.mkdir(parents=True, exist_ok=True)
self.db = sqlite3.connect(db_path)
self.db.execute("PRAGMA journal_mode=WAL;")
self.db.row_factory = sqlite3.Row
self._init_schema()
def _init_schema(self):
# Schema from above
pass
def record_trajectory(
self,
run_id: str,
signature: Dict[str, Any],
messages: List[Dict[str, Any]],
tool_calls: List[Dict[str, Any]],
final_answer: Optional[str] = None,
error: Optional[str] = None,
session_id: Optional[str] = None
) -> int:
"""Store a trajectory for later analysis."""
self.db.execute("""
INSERT INTO rb_trajectories(run_id, session_id, signature_json, messages, tool_calls, final_answer, error)
VALUES (?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(run_id) DO UPDATE SET
messages=excluded.messages,
tool_calls=excluded.tool_calls,
final_answer=excluded.final_answer,
error=excluded.error
""", (
run_id,
session_id,
json.dumps(signature),
json.dumps(messages),
json.dumps(tool_calls),
final_answer,
error
))
self.db.commit()
return self.db.execute("SELECT id FROM rb_trajectories WHERE run_id=?", (run_id,)).fetchone()["id"]
def get_unevaluated_trajectories(self, limit: int = 100) -> List[Trajectory]:
"""Get trajectories that haven't been evaluated by LLM-as-Judge."""
rows = self.db.execute("""
SELECT * FROM rb_trajectories
WHERE evaluated = 0
ORDER BY created_at ASC
LIMIT ?
""", (limit,)).fetchall()
return [self._row_to_trajectory(r) for r in rows]
def get_successful_unextracted(self, limit: int = 100) -> List[Trajectory]:
"""Get successful trajectories that haven't had strategies extracted."""
rows = self.db.execute("""
SELECT * FROM rb_trajectories
WHERE evaluated = 1 AND success = 1 AND strategies_extracted = 0
ORDER BY created_at ASC
LIMIT ?
""", (limit,)).fetchall()
return [self._row_to_trajectory(r) for r in rows]
def mark_evaluated(self, trajectory_id: int, success: bool):
"""Mark trajectory as evaluated."""
self.db.execute("""
UPDATE rb_trajectories
SET evaluated=1, success=?
WHERE id=?
""", (1 if success else 0, trajectory_id))
self.db.commit()
def mark_strategies_extracted(self, trajectory_id: int):
"""Mark that strategies have been extracted from this trajectory."""
self.db.execute("""
UPDATE rb_trajectories
SET strategies_extracted=1
WHERE id=?
""", (trajectory_id,))
self.db.commit()
def _row_to_trajectory(self, row) -> Trajectory:
return Trajectory(
id=row["id"],
run_id=row["run_id"],
session_id=row["session_id"],
signature=json.loads(row["signature_json"]),
messages=json.loads(row["messages"]),
tool_calls=json.loads(row["tool_calls"]),
final_answer=row["final_answer"],
error=row["error"],
evaluated=bool(row["evaluated"]),
success=bool(row["success"]) if row["success"] is not None else None,
strategies_extracted=bool(row["strategies_extracted"])
)
1.3 Integration with ContextManager
Update vel/core/context.py:
class ContextManager:
# ... existing code ...
def record_trajectory(
self,
run_id: str,
signature: Dict[str, Any],
messages: List[Dict[str, Any]],
tool_calls: List[Dict[str, Any]],
final_answer: Optional[str] = None,
error: Optional[str] = None,
session_id: Optional[str] = None
):
"""
Record trajectory for later analysis (Phase 2 feature).
No-op if trajectory storage is not enabled.
"""
traj_store = self._adapters.get("trajectory")
if traj_store:
traj_store.record_trajectory(
run_id, signature, messages, tool_calls,
final_answer, error, session_id
)
Update build_memory_adapters() to include the trajectory store when the memory mode enables ReasoningBank ("reasoning" or "all"), as sketched below.
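A minimal sketch of that wiring, assuming build_memory_adapters() returns a dict keyed the way ContextManager looks adapters up ("trajectory" matches the record_trajectory code above; the rest of the body is illustrative):
from vel.memory.trajectory_store import TrajectoryStore

def build_memory_adapters(config: "MemoryConfig") -> dict:
    """Build memory adapters for the configured mode (sketch)."""
    adapters: dict = {}
    if config.mode in ("reasoning", "all"):
        # ... existing Phase 1 ReasoningBank adapter setup ("rb") ...
        # NEW: record raw trajectories alongside the strategy store
        adapters["trajectory"] = TrajectoryStore(config.db_path)
    return adapters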
Milestone 2: LLM-as-Judge
Goal: Automatically evaluate whether a trajectory was successful.
2.1 Judge Implementation
Create vel/memory/llm_judge.py:
from __future__ import annotations
from typing import Dict, Any, List, Optional
from dataclasses import dataclass
@dataclass
class JudgeResult:
success: bool
reasoning: str
confidence: float # 0.0-1.0
class LLMJudge:
"""
Evaluates trajectories for success/failure.
Based on the ReasoningBank paper's LLM-as-Judge approach.
"""
def __init__(self, model_config: Dict[str, Any]):
"""
Args:
model_config: {'provider': 'anthropic', 'model': 'claude-sonnet-4'}
"""
self.model_config = model_config
async def evaluate_trajectory(
self,
signature: Dict[str, Any],
messages: List[Dict[str, Any]],
tool_calls: List[Dict[str, Any]],
final_answer: Optional[str],
error: Optional[str]
) -> JudgeResult:
"""
Evaluate if the trajectory successfully completed its task.
Returns:
JudgeResult with success boolean, reasoning, and confidence
"""
from vel import Agent
# Format trajectory for analysis
trajectory_text = self._format_trajectory(
signature, messages, tool_calls, final_answer, error
)
# Create judge agent
judge = Agent(
id='llm-judge:v1',
model=self.model_config,
tools=[]
)
prompt = self._build_judge_prompt(trajectory_text)
# Get judgment
result = await judge.run({"message": prompt})
# Parse result
return self._parse_judge_response(result)
def _format_trajectory(
self,
signature: Dict[str, Any],
messages: List[Dict[str, Any]],
tool_calls: List[Dict[str, Any]],
final_answer: Optional[str],
error: Optional[str]
) -> str:
"""Format trajectory for judge analysis."""
lines = [
"=== TASK SIGNATURE ===",
f"Intent: {signature.get('intent', 'unknown')}",
f"Domain: {signature.get('domain', 'unknown')}",
f"Risk: {signature.get('risk', 'unknown')}",
"",
"=== TRAJECTORY ===",
]
for i, msg in enumerate(messages, 1):
role = msg.get('role', 'unknown')
content = str(msg.get('content', ''))[:500] # Truncate long messages
lines.append(f"[{i}] {role}: {content}")
if tool_calls:
lines.append("")
lines.append("=== TOOL CALLS ===")
for i, tool in enumerate(tool_calls, 1):
lines.append(f"[{i}] {tool.get('name', 'unknown')}: {tool.get('args', {})}")
if final_answer:
lines.append("")
lines.append("=== FINAL ANSWER ===")
lines.append(final_answer)
if error:
lines.append("")
lines.append("=== ERROR ===")
lines.append(error)
return "\n".join(lines)
def _build_judge_prompt(self, trajectory_text: str) -> str:
"""Build prompt for LLM judge."""
return f"""You are an expert evaluator analyzing an AI agent's task execution.
Your job is to determine whether the agent successfully completed its intended task.
Criteria for SUCCESS:
- Agent understood and addressed the core task
- Final answer is relevant and helpful
- No critical errors or failures
- Reasoning was sound
Criteria for FAILURE:
- Agent misunderstood the task
- Got stuck in loops or errors
- Produced irrelevant or incorrect output
- Critical errors occurred
Analyze the trajectory below and respond in JSON format:
{{"success": true or false, "reasoning": "<one-sentence explanation>", "confidence": <0.0 to 1.0>}}

{trajectory_text}

Respond ONLY with the JSON object, no other text."""
def _parse_judge_response(self, response: str) -> JudgeResult:
"""Parse judge's JSON response."""
import json
import re
# Extract JSON from response (might have markdown code blocks)
json_match = re.search(r'\{[^\}]+\}', response, re.DOTALL)
if not json_match:
# Default to failure if can't parse
return JudgeResult(success=False, reasoning="Parse error", confidence=0.5)
try:
data = json.loads(json_match.group(0))
return JudgeResult(
success=bool(data.get("success", False)),
reasoning=str(data.get("reasoning", "")),
confidence=float(data.get("confidence", 0.5))
)
except Exception:
return JudgeResult(success=False, reasoning="Parse error", confidence=0.5)
2.2 Background Evaluation Worker
Create vel/memory/evaluation_worker.py:
from __future__ import annotations
import asyncio
from typing import Optional
from .trajectory_store import TrajectoryStore
from .llm_judge import LLMJudge
class EvaluationWorker:
"""
Background worker that continuously evaluates unevaluated trajectories.
"""
def __init__(
self,
trajectory_store: TrajectoryStore,
judge: LLMJudge,
batch_size: int = 10,
interval_seconds: int = 60
):
self.trajectory_store = trajectory_store
self.judge = judge
self.batch_size = batch_size
self.interval_seconds = interval_seconds
self._running = False
self._task: Optional[asyncio.Task] = None
async def start(self):
"""Start the background evaluation worker."""
if self._running:
return
self._running = True
self._task = asyncio.create_task(self._run_loop())
async def stop(self):
"""Stop the background evaluation worker."""
self._running = False
if self._task:
await self._task
async def _run_loop(self):
"""Main evaluation loop."""
while self._running:
try:
await self._evaluate_batch()
except Exception as e:
# Log error but keep running
print(f"Evaluation worker error: {e}")
# Wait before next batch
await asyncio.sleep(self.interval_seconds)
async def _evaluate_batch(self):
"""Evaluate a batch of trajectories."""
trajectories = self.trajectory_store.get_unevaluated_trajectories(self.batch_size)
for traj in trajectories:
try:
# Evaluate with LLM judge
result = await self.judge.evaluate_trajectory(
signature=traj.signature,
messages=traj.messages,
tool_calls=traj.tool_calls,
final_answer=traj.final_answer,
error=traj.error
)
# Mark as evaluated
self.trajectory_store.mark_evaluated(traj.id, result.success)
except Exception as e:
print(f"Error evaluating trajectory {traj.id}: {e}")
# Mark as evaluated (failure) to avoid retry loop
self.trajectory_store.mark_evaluated(traj.id, False)
Milestone 3: Strategy Distillation
Goal: Automatically extract strategies from successful trajectories.
3.1 Strategy Extractor Implementation
Create vel/memory/strategy_extractor.py:
from __future__ import annotations
from typing import Dict, Any, List, Optional
from dataclasses import dataclass
@dataclass
class ExtractedStrategy:
strategy_text: str
anti_patterns: List[str]
confidence: float
reasoning: str
class StrategyExtractor:
"""
Extracts generalizable strategies from successful trajectories.
Based on the ReasoningBank paper's strategy distillation approach.
"""
def __init__(self, model_config: Dict[str, Any]):
self.model_config = model_config
async def extract_strategy(
self,
signature: Dict[str, Any],
messages: List[Dict[str, Any]],
tool_calls: List[Dict[str, Any]],
final_answer: str
) -> Optional[ExtractedStrategy]:
"""
Extract a generalizable strategy from a successful trajectory.
Returns:
ExtractedStrategy if one can be extracted, None otherwise
"""
from vel import Agent
# Format trajectory
trajectory_text = self._format_trajectory(
signature, messages, tool_calls, final_answer
)
# Create extractor agent
extractor = Agent(
id='strategy-extractor:v1',
model=self.model_config,
tools=[]
)
prompt = self._build_extraction_prompt(trajectory_text)
# Extract strategy
result = await extractor.run({"message": prompt})
# Parse result
return self._parse_extraction_response(result)
def _format_trajectory(
self,
signature: Dict[str, Any],
messages: List[Dict[str, Any]],
tool_calls: List[Dict[str, Any]],
final_answer: str
) -> str:
"""Format trajectory for strategy extraction."""
lines = [
"=== TASK CONTEXT ===",
f"Intent: {signature.get('intent', 'unknown')}",
f"Domain: {signature.get('domain', 'unknown')}",
"",
"=== EXECUTION STEPS ===",
]
for i, msg in enumerate(messages, 1):
role = msg.get('role', 'unknown')
content = str(msg.get('content', ''))[:300]
lines.append(f"Step {i} [{role}]: {content}")
if tool_calls:
lines.append("")
lines.append("=== TOOLS USED ===")
for tool in tool_calls:
lines.append(f"- {tool.get('name', 'unknown')}")
lines.append("")
lines.append("=== OUTCOME ===")
lines.append(final_answer[:500])
return "\n".join(lines)
def _build_extraction_prompt(self, trajectory_text: str) -> str:
"""Build prompt for strategy extraction."""
return f"""You are an expert at analyzing AI agent reasoning patterns.
Your task is to extract a GENERALIZABLE reasoning strategy from this successful execution.
The strategy should be:
- One clear sentence
- Applicable to similar tasks (not specific to this exact case)
- Actionable (describes HOW to think, not WHAT to think)
Also identify 1-3 anti-patterns (things to AVOID) based on potential failure modes.
Analyze the trajectory and respond in JSON format:
{{"strategy_text": "<one generalizable sentence>", "anti_patterns": ["<thing to avoid>", "..."], "confidence": <0.0 to 1.0>, "reasoning": "<why this strategy generalizes>"}}

{trajectory_text}

Respond ONLY with the JSON object, no other text.
IMPORTANT:
- Strategy must be generalizable (not "Use API key abc123")
- Focus on reasoning approach, not specific actions
- Anti-patterns should be cautionary, not just negations"""
def _parse_extraction_response(self, response: str) -> Optional[ExtractedStrategy]:
"""Parse extractor's JSON response."""
import json
import re
json_match = re.search(r'\{[^\}]+\}', response, re.DOTALL)
if not json_match:
return None
try:
data = json.loads(json_match.group(0))
# Validate strategy quality
strategy_text = data.get("strategy_text", "").strip()
if len(strategy_text) < 10 or len(strategy_text) > 200:
return None # Too short or too long
return ExtractedStrategy(
strategy_text=strategy_text,
anti_patterns=data.get("anti_patterns", [])[:3], # Max 3
confidence=float(data.get("confidence", 0.6)),
reasoning=str(data.get("reasoning", ""))
)
except Exception:
return None
3.2 Strategy Extraction Worker
Create vel/memory/extraction_worker.py:
from __future__ import annotations
import asyncio
from typing import Optional
from .trajectory_store import TrajectoryStore
from .strategy_reasoningbank import ReasoningBankStore
from .strategy_extractor import StrategyExtractor
class ExtractionWorker:
"""
Background worker that extracts strategies from successful trajectories.
"""
def __init__(
self,
trajectory_store: TrajectoryStore,
reasoning_bank: ReasoningBankStore,
extractor: StrategyExtractor,
batch_size: int = 10,
interval_seconds: int = 120
):
self.trajectory_store = trajectory_store
self.reasoning_bank = reasoning_bank
self.extractor = extractor
self.batch_size = batch_size
self.interval_seconds = interval_seconds
self._running = False
self._task: Optional[asyncio.Task] = None
async def start(self):
"""Start the background extraction worker."""
if self._running:
return
self._running = True
self._task = asyncio.create_task(self._run_loop())
async def stop(self):
"""Stop the background extraction worker."""
self._running = False
if self._task:
await self._task
async def _run_loop(self):
"""Main extraction loop."""
while self._running:
try:
await self._extract_batch()
except Exception as e:
print(f"Extraction worker error: {e}")
await asyncio.sleep(self.interval_seconds)
async def _extract_batch(self):
"""Extract strategies from a batch of successful trajectories."""
trajectories = self.trajectory_store.get_successful_unextracted(self.batch_size)
for traj in trajectories:
try:
# Extract strategy
strategy = await self.extractor.extract_strategy(
signature=traj.signature,
messages=traj.messages,
tool_calls=traj.tool_calls,
final_answer=traj.final_answer or ""
)
if strategy:
# Check for similar existing strategies
existing = self.reasoning_bank.retrieve(traj.signature, k=5, min_conf=0.0)
# Simple deduplication: skip if very similar strategy exists
if not self._is_duplicate(strategy.strategy_text, existing):
# Add to ReasoningBank
self.reasoning_bank.upsert_strategy(
signature=traj.signature,
strategy_text=strategy.strategy_text,
anti_patterns=strategy.anti_patterns,
evidence_refs=[traj.run_id],
confidence=strategy.confidence
)
# Mark as extracted
self.trajectory_store.mark_strategies_extracted(traj.id)
except Exception as e:
print(f"Error extracting from trajectory {traj.id}: {e}")
# Mark as extracted to avoid retry loop
self.trajectory_store.mark_strategies_extracted(traj.id)
def _is_duplicate(self, new_strategy: str, existing_strategies) -> bool:
"""
Simple duplicate check using string similarity.
In production, use more sophisticated similarity (embeddings, etc.)
"""
new_lower = new_strategy.lower()
for existing in existing_strategies:
existing_lower = existing.strategy_text.lower()
# Very simple: check if >70% of words overlap
new_words = set(new_lower.split())
existing_words = set(existing_lower.split())
if len(new_words & existing_words) / max(len(new_words), 1) > 0.7:
return True
return False
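Since embeddings are already available for retrieval, the word-overlap check in _is_duplicate could be swapped for a cosine-similarity check; a sketch, assuming the store exposes the same emb.encode helper the consolidator below uses:
import numpy as np

def _is_duplicate_by_embedding(self, new_strategy: str, existing_strategies, threshold: float = 0.85) -> bool:
    """Embedding-based duplicate check (sketch); drop-in replacement for _is_duplicate."""
    if not existing_strategies:
        return False
    texts = [new_strategy] + [s.strategy_text for s in existing_strategies]
    vecs = np.asarray(self.reasoning_bank.emb.encode(texts), dtype=float)
    vecs = vecs / (np.linalg.norm(vecs, axis=1, keepdims=True) + 1e-8)  # normalize rows
    sims = vecs[1:] @ vecs[0]  # cosine similarity of each existing strategy vs. the new one
    return bool((sims >= threshold).any())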
Milestone 4: Memory Consolidation
Goal: Merge similar strategies to prevent redundancy.
4.1 Strategy Consolidator
Create vel/memory/consolidator.py:
from __future__ import annotations
from typing import List, Dict, Any
import json
import numpy as np
from .strategy_reasoningbank import ReasoningBankStore, StrategyItem
class MemoryConsolidator:
"""
Merges similar strategies to prevent memory bloat.
Based on the ReasoningBank paper's consolidation approach.
"""
def __init__(
self,
reasoning_bank: ReasoningBankStore,
similarity_threshold: float = 0.85
):
self.reasoning_bank = reasoning_bank
self.similarity_threshold = similarity_threshold
def consolidate_strategies(self, signature: Dict[str, Any]) -> int:
"""
Find and merge similar strategies for a given signature.
Returns:
Number of strategies merged
"""
# Get all strategies for this signature
strategies = self.reasoning_bank.retrieve(signature, k=100, min_conf=0.0)
if len(strategies) < 2:
return 0
# Build similarity matrix
embeddings = self._get_embeddings(strategies)
similarity_matrix = self._compute_similarity_matrix(embeddings)
# Find clusters of similar strategies
clusters = self._find_clusters(similarity_matrix, self.similarity_threshold)
# Merge each cluster
merged_count = 0
for cluster in clusters:
if len(cluster) > 1:
self._merge_cluster([strategies[i] for i in cluster])
merged_count += len(cluster) - 1
return merged_count
def _get_embeddings(self, strategies: List[StrategyItem]) -> np.ndarray:
"""Get embeddings for all strategies."""
# In real implementation, retrieve from database
# For now, re-encode (inefficient but simple)
texts = [s.strategy_text for s in strategies]
return self.reasoning_bank.emb.encode(texts)
def _compute_similarity_matrix(self, embeddings: np.ndarray) -> np.ndarray:
"""Compute pairwise cosine similarity."""
# Normalize
norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
normalized = embeddings / (norms + 1e-8)
# Compute similarity
return normalized @ normalized.T
def _find_clusters(
self,
similarity_matrix: np.ndarray,
threshold: float
) -> List[List[int]]:
"""Find clusters of similar strategies using simple threshold."""
n = similarity_matrix.shape[0]
visited = set()
clusters = []
for i in range(n):
if i in visited:
continue
# Start new cluster
cluster = [i]
visited.add(i)
# Find all similar strategies
for j in range(i + 1, n):
if j not in visited and similarity_matrix[i, j] >= threshold:
cluster.append(j)
visited.add(j)
if len(cluster) > 1:
clusters.append(cluster)
return clusters
def _merge_cluster(self, strategies: List[StrategyItem]):
"""
Merge a cluster of similar strategies into one.
Strategy:
- Keep the highest confidence strategy as base
- Merge anti-patterns from all
- Merge evidence refs from all
- Update confidence to average
"""
# Sort by confidence
sorted_strategies = sorted(strategies, key=lambda s: s.confidence, reverse=True)
base = sorted_strategies[0]
# Merge anti-patterns
all_anti_patterns = set()
for s in strategies:
all_anti_patterns.update(s.anti_patterns)
# Merge evidence refs
all_evidence = set()
for s in strategies:
all_evidence.update(s.evidence_refs)
# Average confidence
avg_confidence = sum(s.confidence for s in strategies) / len(strategies)
# Update base strategy
self.reasoning_bank.db.execute("""
UPDATE rb_strategies
SET anti_patterns = ?,
evidence_refs = ?,
confidence = ?
WHERE id = ?
""", (
json.dumps(list(all_anti_patterns)),
json.dumps(list(all_evidence)),
avg_confidence,
base.id
))
# Delete the merged strategies, keeping the highest-confidence one as base
for s in sorted_strategies[1:]:
self.reasoning_bank.db.execute("""
DELETE FROM rb_strategies WHERE id = ?
""", (s.id,))
self.reasoning_bank.db.execute("""
DELETE FROM rb_embeddings WHERE strategy_id = ?
""", (s.id,))
self.reasoning_bank.db.commit()
Milestone 5: Configuration and Integration
Goal: Make Phase 2 features opt-in and configurable.
5.1 Extended Memory Config
Update vel/core/context.py:
@dataclass
class MemoryConfig:
"""
Memory configuration.
Phase 1 fields (existing):
mode: "none" | "facts" | "reasoning" | "all"
db_path: SQLite file path
rb_top_k: top-k strategies to retrieve
embeddings_fn: embedding function
Phase 2 fields (new):
enable_auto_learning: Enable automatic strategy learning
judge_model: Model config for LLM-as-Judge
extractor_model: Model config for strategy extraction
eval_interval_seconds: How often to run evaluation worker
extraction_interval_seconds: How often to run extraction worker
consolidation_interval_seconds: How often to consolidate
min_confidence_threshold: Prune strategies below this confidence
"""
# Phase 1 (existing)
mode: str = "none"
db_path: str = ".vel/vel.db"
rb_top_k: int = 5
embeddings_fn: Optional[Callable[[List[str]], "object"]] = None
# Phase 2 (new)
enable_auto_learning: bool = False
judge_model: Optional[Dict[str, Any]] = None
extractor_model: Optional[Dict[str, Any]] = None
eval_interval_seconds: int = 60
extraction_interval_seconds: int = 120
consolidation_interval_seconds: int = 3600
min_confidence_threshold: float = 0.3
5.2 Auto-Learning Manager
Create vel/memory/auto_learning.py:
from __future__ import annotations
from typing import Optional
from .trajectory_store import TrajectoryStore
from .llm_judge import LLMJudge
from .strategy_extractor import StrategyExtractor
from .evaluation_worker import EvaluationWorker
from .extraction_worker import ExtractionWorker
from .consolidator import MemoryConsolidator
from ..core.context import MemoryConfig
class AutoLearningManager:
"""
Manages all Phase 2 automatic learning components.
"""
def __init__(
self,
config: MemoryConfig,
trajectory_store: TrajectoryStore,
reasoning_bank_store
):
self.config = config
self.trajectory_store = trajectory_store
self.reasoning_bank = reasoning_bank_store
# Initialize components
self.judge = LLMJudge(config.judge_model or {"provider": "anthropic", "model": "claude-sonnet-4"})
self.extractor = StrategyExtractor(config.extractor_model or {"provider": "anthropic", "model": "claude-sonnet-4"})
self.consolidator = MemoryConsolidator(reasoning_bank_store)
# Workers
self.eval_worker = EvaluationWorker(
trajectory_store,
self.judge,
interval_seconds=config.eval_interval_seconds
)
self.extraction_worker = ExtractionWorker(
trajectory_store,
reasoning_bank_store,
self.extractor,
interval_seconds=config.extraction_interval_seconds
)
async def start(self):
"""Start all background workers."""
await self.eval_worker.start()
await self.extraction_worker.start()
async def stop(self):
"""Stop all background workers."""
await self.eval_worker.stop()
await self.extraction_worker.stop()
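The periodic consolidation job (Phase 2.4) is not wired up above; one option is a third background loop owned by AutoLearningManager, sketched here. The recent_signatures() helper is hypothetical, and the sketch assumes the manager gains a _running flag toggled in start()/stop() plus an asyncio import at module top:
# Sketch of a periodic consolidation loop to add to AutoLearningManager.
# recent_signatures() is a hypothetical ReasoningBankStore helper listing signatures
# whose strategies changed since the last pass; self._running is assumed to exist.
async def _consolidation_loop(self):
    while self._running:
        try:
            for signature in self.reasoning_bank.recent_signatures():
                merged = self.consolidator.consolidate_strategies(signature)
                if merged:
                    print(f"Consolidated {merged} strategies for {signature}")
        except Exception as e:
            print(f"Consolidation error: {e}")
        await asyncio.sleep(self.config.consolidation_interval_seconds)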
5.3 Environment Variables
Add to .env.example:
# Memory Phase 2: Auto-Learning (optional)
VEL_ENABLE_AUTO_LEARNING=false
VEL_JUDGE_MODEL=anthropic:claude-sonnet-4
VEL_EXTRACTOR_MODEL=anthropic:claude-sonnet-4
VEL_EVAL_INTERVAL=60
VEL_EXTRACTION_INTERVAL=120
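A sketch of how these could be folded into MemoryConfig at startup (the helper name and default handling are illustrative; the variable names match .env.example above):
import os

from vel.core.context import MemoryConfig

def memory_config_from_env(**overrides) -> MemoryConfig:
    """Build a MemoryConfig from VEL_* environment variables (sketch)."""
    def parse_model(spec: str) -> dict:
        provider, _, model = spec.partition(":")  # e.g. "anthropic:claude-sonnet-4"
        return {"provider": provider, "model": model}

    cfg = dict(
        enable_auto_learning=os.getenv("VEL_ENABLE_AUTO_LEARNING", "false").lower() == "true",
        judge_model=parse_model(os.getenv("VEL_JUDGE_MODEL", "anthropic:claude-sonnet-4")),
        extractor_model=parse_model(os.getenv("VEL_EXTRACTOR_MODEL", "anthropic:claude-sonnet-4")),
        eval_interval_seconds=int(os.getenv("VEL_EVAL_INTERVAL", "60")),
        extraction_interval_seconds=int(os.getenv("VEL_EXTRACTION_INTERVAL", "120")),
    )
    cfg.update(overrides)
    return MemoryConfig(**cfg)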
Implementation Phases
Phase 2.1: Foundation (Weeks 1-2)
- Implement TrajectoryStore
- Add trajectory recording to ContextManager
- Update build_memory_adapters() to include the trajectory store
- Write tests for trajectory storage
- Update documentation
Deliverable: Trajectories are recorded automatically
Phase 2.2: Evaluation (Weeks 3-4)
- Implement LLMJudge
- Implement EvaluationWorker
- Add configuration for judge model
- Write tests for judge accuracy
- Add metrics/logging for evaluation
Deliverable: Trajectories are automatically evaluated
Phase 2.3: Extraction (Weeks 5-6)
- Implement StrategyExtractor
- Implement ExtractionWorker
- Add deduplication logic
- Write tests for extraction quality
- Add metrics/logging for extraction
Deliverable: Strategies are automatically extracted
Phase 2.4: Consolidation (Weeks 7-8)
- Implement MemoryConsolidator
- Add periodic consolidation job
- Add confidence decay mechanism
- Implement pruning of low-confidence strategies
- Write tests for consolidation
Deliverable: Memory is automatically maintained
Phase 2.5: Integration (Weeks 9-10)
- Implement AutoLearningManager
- Update MemoryConfig with Phase 2 fields
- Add opt-in configuration
- Comprehensive integration tests
- Performance benchmarks
- Update all documentation
Deliverable: Full Phase 2 system operational
Testing Strategy
Unit Tests
# Test trajectory storage
def test_trajectory_store_record()
def test_trajectory_store_retrieval()
# Test LLM judge
async def test_judge_success_case()
async def test_judge_failure_case()
# Test strategy extraction
async def test_extract_valid_strategy()
async def test_extract_rejects_poor_quality()
# Test consolidation
def test_find_similar_strategies()
def test_merge_strategies()
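As a concrete example of the first pair, a pytest-style test for the trajectory store might look like this (it assumes _init_schema actually executes the DDL from section 1.1 rather than the pass placeholder shown earlier):
from vel.memory.trajectory_store import TrajectoryStore

def test_trajectory_store_record(tmp_path):
    store = TrajectoryStore(str(tmp_path / "vel.db"))
    traj_id = store.record_trajectory(
        run_id="run-1",
        signature={"intent": "summarize", "domain": "docs", "risk": "low"},
        messages=[{"role": "user", "content": "hi"}],
        tool_calls=[],
        final_answer="hello",
    )
    assert traj_id > 0

    pending = store.get_unevaluated_trajectories()
    assert len(pending) == 1 and pending[0].run_id == "run-1"

    store.mark_evaluated(traj_id, success=True)
    assert store.get_unevaluated_trajectories() == []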
Integration Tests
# End-to-end auto-learning
async def test_full_learning_pipeline()
async def test_trajectory_to_strategy()
async def test_confidence_updates_over_time()
Quality Metrics
Track these metrics in production:
- Judge accuracy (compare to human evaluation)
- Strategy quality (user ratings)
- Deduplication effectiveness (clusters found)
- Memory growth rate (strategies per day)
- Retrieval relevance (top-K accuracy)
Backwards Compatibility
Ensuring Phase 1 Continues to Work
# Phase 1 usage (manual) - still works
mem = MemoryConfig(mode="reasoning", embeddings_fn=encode)
ctx = ContextManager()
ctx.set_memory_config(mem)
# Manually add strategies
rb = ctx._adapters.get("rb")
rb.store.upsert_strategy(...)
# Phase 2 usage (automatic) - opt-in
mem = MemoryConfig(
mode="reasoning",
embeddings_fn=encode,
enable_auto_learning=True, # NEW: opt-in
judge_model={"provider": "anthropic", "model": "claude-sonnet-4"},
extractor_model={"provider": "anthropic", "model": "claude-sonnet-4"}
)
All Phase 1 features remain unchanged. Phase 2 is 100% opt-in.
Performance Considerations
Cost Analysis
Phase 2 adds LLM calls for evaluation and extraction:
Operation | LLM Calls per Trajectory | Approx. Cost (Claude Sonnet 4) |
---|---|---|
Judge evaluation | 1 | ~$0.01 |
Strategy extraction | 1 (only for successful) | ~$0.02 |
Total per successful trajectory | 2 | ~$0.03 |
With 100 runs/day:
- Cost: up to ~$3/day (one judge call per run at ~$0.01, plus one extraction per successful run at ~$0.02)
- Strategies learned: ~50/day (assuming a 50% success rate, before deduplication)
Latency
All Phase 2 operations happen asynchronously:
- Agent execution: No added latency
- Trajectory recording: <1ms (async)
- Evaluation: Background worker (no impact)
- Extraction: Background worker (no impact)
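For trajectory recording, "async" means the write stays off the agent's hot path; a sketch of a fire-and-forget helper (the SQLite insert itself is synchronous, so it is pushed to the default thread pool; adapter lookup follows the ContextManager sketch above, and the helper is assumed to be called from inside the running event loop):
import asyncio

def record_trajectory_nowait(ctx, **traj_kwargs):
    """Dispatch trajectory recording without blocking the agent's event loop (sketch)."""
    traj_store = ctx._adapters.get("trajectory")
    if traj_store is None:
        return
    loop = asyncio.get_running_loop()
    # Fire-and-forget: run the synchronous SQLite insert in the default executor.
    loop.run_in_executor(None, lambda: traj_store.record_trajectory(**traj_kwargs))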
Alternative Approaches
Option 1: On-Demand Learning
Instead of background workers, learn on-demand:
async def learn_from_run(run_id, ctx):
"""Manually trigger learning for a specific run."""
manager = ctx._adapters.get("auto_learning")
await manager.process_run(run_id)
Pros: More control, lower cost
Cons: Manual trigger required
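process_run is referenced above but not defined in the AutoLearningManager sketch; a minimal on-demand version could reuse the same components (get_by_run_id is a hypothetical TrajectoryStore lookup):
# Sketch of AutoLearningManager.process_run (on-demand learning for one run).
async def process_run(self, run_id: str):
    traj = self.trajectory_store.get_by_run_id(run_id)  # hypothetical lookup by run_id
    if traj is None:
        return

    verdict = await self.judge.evaluate_trajectory(
        signature=traj.signature, messages=traj.messages,
        tool_calls=traj.tool_calls, final_answer=traj.final_answer, error=traj.error,
    )
    self.trajectory_store.mark_evaluated(traj.id, verdict.success)

    if verdict.success:
        strategy = await self.extractor.extract_strategy(
            signature=traj.signature, messages=traj.messages,
            tool_calls=traj.tool_calls, final_answer=traj.final_answer or "",
        )
        if strategy:
            self.reasoning_bank.upsert_strategy(
                signature=traj.signature,
                strategy_text=strategy.strategy_text,
                anti_patterns=strategy.anti_patterns,
                evidence_refs=[traj.run_id],
                confidence=strategy.confidence,
            )
        self.trajectory_store.mark_strategies_extracted(traj.id)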
Option 2: Batch Learning
Run learning once per day on all trajectories:
# Cron job
python -m vel.memory.batch_learn --db .vel/vel.db
Pros: Lower cost, easier to monitor
Cons: Not real-time, delayed learning
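A minimal shape for that entry point, reusing the workers' batch methods for a single pass (the ReasoningBankStore constructor arguments are an assumption; everything else is defined in the milestones above):
# Sketch of vel/memory/batch_learn.py: one evaluation pass + one extraction pass, then exit.
import argparse
import asyncio

from vel.core.context import MemoryConfig
from vel.memory.auto_learning import AutoLearningManager
from vel.memory.strategy_reasoningbank import ReasoningBankStore
from vel.memory.trajectory_store import TrajectoryStore

async def main(db_path: str) -> None:
    config = MemoryConfig(mode="reasoning", db_path=db_path, enable_auto_learning=True)
    traj_store = TrajectoryStore(db_path)
    rb_store = ReasoningBankStore(db_path)  # constructor args are an assumption
    manager = AutoLearningManager(config, traj_store, rb_store)

    await manager.eval_worker._evaluate_batch()        # judge pending trajectories once
    await manager.extraction_worker._extract_batch()   # extract from new successes once

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--db", default=".vel/vel.db")
    asyncio.run(main(parser.parse_args().db))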
Option 3: Hybrid Approach
Combine background workers with manual triggers:
# Background workers for critical cases
if signature.get("risk") == "high":
await manager.process_immediately(run_id)
else:
# Background worker will pick it up later
pass
Success Metrics
Track these to measure Phase 2 effectiveness:
- Learning Rate: Strategies added per day
- Quality Score: User ratings of generated strategies
- Coverage: % of signature space with strategies
- Reuse Rate: How often strategies are retrieved
- Confidence Evolution: Average confidence over time
- Deduplication Rate: Clusters merged per week
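Several of these can be computed directly from the Phase 2 tables; for example (column names come from the Milestone 1 schema; the helper itself is illustrative):
import sqlite3

def learning_metrics(db_path: str = ".vel/vel.db") -> dict:
    """Read two basic Phase 2 metrics straight from rb_trajectories (sketch)."""
    db = sqlite3.connect(db_path)
    per_day = db.execute(
        "SELECT date(created_at, 'unixepoch') AS day, COUNT(*) "
        "FROM rb_trajectories WHERE strategies_extracted = 1 GROUP BY day"
    ).fetchall()
    success_rate = db.execute(
        "SELECT AVG(success) FROM rb_trajectories WHERE evaluated = 1"
    ).fetchone()[0]
    return {"strategies_per_day": dict(per_day), "judge_success_rate": success_rate}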
References
- Google ReasoningBank Paper: arxiv:2509.25140
- Vel Phase 1 Implementation:
vel/memory/strategy_reasoningbank.py
- claude-flow Implementation: github.com/ruvnet/claude-flow
Questions for Discussion
Before implementing, consider:
- Judge Model: Use fast/cheap model (Haiku) or accurate/expensive (Sonnet)?
- Extraction Frequency: Real-time vs batched vs on-demand?
- Cost Controls: Max strategies per day? Budget limits?
- Quality Controls: Human-in-the-loop review? Approval workflows?
- Privacy: Trajectory storage retention policy?
Conclusion
Phase 2 transforms Vel’s ReasoningBank from a manual infrastructure into a self-evolving memory system. By adding automatic trajectory recording, LLM-based evaluation, and strategy extraction, agents can learn from experience without human intervention.
The implementation is designed to be:
- ✅ Opt-in (Phase 1 unaffected)
- ✅ Async (no latency impact)
- ✅ Modular (can enable parts independently)
- ✅ Cost-aware (configurable intervals)
- ✅ Backwards compatible (existing code works)
Estimated Timeline: 10 weeks for full Phase 2 implementation
Estimated Cost: ~$3/day per 100 agent runs (with Claude Sonnet 4)
Ready to implement when priorities align with automatic agent learning goals.