Add persistent memory and task result storage to ControlFlow agents with CortexDB.
ControlFlow Integration
CortexDB integrates with ControlFlow to provide persistent agent memory and task result storage. ControlFlow decomposes problems into tasks assigned to agents — CortexDB gives those agents long-term memory that persists across flow executions.
Installation
pip install cortexdb-controlflow
Quick Start
from cortexdb import Cortex
from cortexdb_controlflow import CortexDBMemory, CortexDBTaskStore
# Create a Cortex client for the server at base_url, then build the memory
# and task-store helpers on top of it, both scoped to the "my-app" tenant.
client = Cortex(base_url="http://localhost:3141")
memory = CortexDBMemory(client=client, tenant_id="my-app")
task_store = CortexDBTaskStore(client=client, tenant_id="my-app")
# Store an observation under the "researcher" agent ID
memory.store("The API rate limit is 100 req/min.", agent_id="researcher")
# Retrieve memories relevant to the query for the same agent ID
results = memory.search("rate limits", agent_id="researcher")
# Persist a task result, keyed by task and flow, for future flows
task_store.save_result(
    task_id="api-audit",
    flow_id="weekly-review",
    result={"rate_limit": 100, "status": "healthy"},
)
Agent Memory
CortexDBMemory gives each ControlFlow agent memory that persists across flow executions. Agents can store observations and later retrieve the context relevant to their current task.
import controlflow as cf
from cortexdb import Cortex
from cortexdb_controlflow import CortexDBMemory
# Create the Cortex client and a memory helper for the "my-app" tenant;
# top_k=5 caps how many memories are retrieved per search.
cortex = Cortex(base_url="http://localhost:3141")
memory = CortexDBMemory(client=cortex, tenant_id="my-app", top_k=5)
@cf.flow
def research_flow():
    """Store a pricing finding, recall related context, and analyse it.

    Returns:
        The result of the ``cf.run`` analysis task.
    """
    agent = "researcher"

    # Record a finding while the flow is executing.
    memory.store(
        "The competitor released a new pricing tier at $49/mo.",
        agent_id=agent,
    )

    # Later in the flow, ask memory for context related to the question.
    pricing_context = memory.context(
        "What do we know about competitor pricing?",
        agent_id=agent,
    )

    # Feed the recalled context to a ControlFlow task via its instructions.
    task_instructions = f"Use this context:\n{pricing_context}"
    return cf.run(
        "Analyse competitor pricing strategy",
        instructions=task_instructions,
    )
Task Result Storage
CortexDBTaskStore persists the outcomes of ControlFlow tasks so that future flows can recall past results, enabling incremental workflows.
import controlflow as cf
from cortexdb import Cortex
from cortexdb_controlflow import CortexDBTaskStore
# Create the Cortex client and a task-result store for the "my-app" tenant.
cortex = Cortex(base_url="http://localhost:3141")
store = CortexDBTaskStore(client=cortex, tenant_id="my-app")
@cf.flow
def weekly_report():
    """Summarise weekly metrics, reusing a stored collection result if present.

    When the store already holds a result for the ``data-collection`` task,
    it is reused; otherwise the metrics are collected and saved. The summary
    is always produced and saved.

    Returns:
        The result of the ``cf.run`` summary task.
    """
    flow_name = "weekly-report"
    collection_task = "data-collection"

    # Ask the store for at most one earlier result of the collection task.
    stored = store.get_results(collection_task, top_k=1)
    if not stored:
        # Nothing stored yet: collect now and persist it for later flows.
        metrics = cf.run("Collect weekly metrics from all sources")
        store.save_result(
            task_id=collection_task,
            flow_id=flow_name,
            result=metrics,
        )
    else:
        metrics = stored[0]

    # Summarise the metrics, then persist the summary as its own task result.
    report = cf.run("Summarise the weekly metrics", context={"data": metrics})
    store.save_result(
        task_id="weekly-summary",
        flow_id=flow_name,
        result=report,
    )
    return report
Searching Across Flows
Search task results semantically, either across all flows or within a single flow by passing flow_id.
# Search task results from every flow
results = store.search_results("revenue analysis")
# Restrict the search to results saved under one flow_id
results = store.search_results("revenue", flow_id="quarterly-review")
Configuration
CortexDBMemory
| Parameter | Default | Description |
|---|---|---|
| client | Required | Initialised Cortex client instance |
| tenant_id | "default" | Tenant identifier for multi-tenant isolation |
| namespace | None | Optional namespace to scope memories |
| top_k | 5 | Max memories to retrieve per search |
CortexDBTaskStore
| Parameter | Default | Description |
|---|---|---|
| client | Required | Initialised Cortex client instance |
| tenant_id | "default" | Tenant identifier for multi-tenant isolation |
| namespace | None | Optional namespace to scope task results |
Complete Example
import controlflow as cf
from cortexdb import Cortex
from cortexdb_controlflow import CortexDBMemory, CortexDBTaskStore
# One Cortex client is shared by both helpers. They use the same
# "product-team" tenant but separate namespaces, which scope the memories
# and the task results respectively.
cortex = Cortex(base_url="http://localhost:3141")
memory = CortexDBMemory(client=cortex, tenant_id="product-team", namespace="research")
store = CortexDBTaskStore(client=cortex, tenant_id="product-team", namespace="tasks")
# Two ControlFlow agents, each given its own role instructions.
researcher = cf.Agent(
    name="Researcher",
    instructions="You research market trends and competitor activity.",
)
analyst = cf.Agent(
    name="Analyst",
    instructions="You analyse data and produce actionable insights.",
)
@cf.flow
def market_analysis():
    """Run a research task, then an analysis task, persisting both results.

    The researcher task receives any prior context recalled from memory. Its
    findings are stored in memory and in the task store. The analyst task
    receives the findings together with earlier results found for this flow.

    Returns:
        The result of the analyst's ``cf.run`` task.
    """
    flow_name = "market-analysis"

    # Recall what is already stored for the researcher.
    prior_research = memory.context(
        "What do we already know about market trends?",
        agent_id="researcher",
    )

    # Pass instructions only when some prior context came back.
    if prior_research:
        research_instructions = f"Prior research:\n{prior_research}"
    else:
        research_instructions = None

    findings = cf.run(
        "Research current market trends in AI infrastructure",
        agents=[researcher],
        instructions=research_instructions,
    )

    # Keep the findings both as agent memory and as a task result.
    memory.store(str(findings), agent_id="researcher")
    store.save_result(
        task_id="market-research",
        flow_id=flow_name,
        result=findings,
        agent_id="researcher",
    )

    # Give the analyst the new findings plus earlier results from this flow.
    earlier_analyses = store.search_results(
        "market analysis insights",
        flow_id=flow_name,
        top_k=3,
    )
    insights = cf.run(
        "Produce actionable insights from the research",
        agents=[analyst],
        context={"findings": findings, "past_analyses": earlier_analyses},
    )
    store.save_result(
        task_id="market-insights",
        flow_id=flow_name,
        result=insights,
        agent_id="analyst",
    )
    return insights