Add persistent memory and task result storage to ControlFlow agents with CortexDB.

ControlFlow Integration

CortexDB integrates with ControlFlow to provide persistent agent memory and task result storage. ControlFlow decomposes problems into tasks assigned to agents — CortexDB gives those agents long-term memory that persists across flow executions.

Installation

# Quote the requirement so shells that glob on [] (e.g. zsh) pass it to pip unchanged.
pip install "cortexdbai[controlflow]"

Quick Start

from cortexdb import Cortex
from cortexdb_controlflow import CortexDBMemory, CortexDBTaskStore

# Create one client and share it between the memory and task-store wrappers.
client = Cortex(base_url="https://api.cortexdb.ai")
memory = CortexDBMemory(client=client, tenant_id="my-app")
task_store = CortexDBTaskStore(client=client, tenant_id="my-app")

# Store agent observations
memory.store("The API rate limit is 100 req/min.", agent_id="researcher")

# Retrieve relevant context (same call used by the other examples on this page)
result = memory.context("rate limits", agent_id="researcher")
print(result.context)

# Persist task results for future flows
task_store.save_result(
    task_id="api-audit",
    flow_id="weekly-review",
    result={"rate_limit": 100, "status": "healthy"},
)

Agent Memory

CortexDBMemory provides each ControlFlow agent with durable memory that persists across flow executions. Agents can store observations and retrieve context relevant to their current task.

import controlflow as cf
from cortexdb import Cortex
from cortexdb_controlflow import CortexDBMemory

# One client and one memory handle, created at module level and used by the flow.
cortex = Cortex(base_url="https://api.cortexdb.ai")
memory = CortexDBMemory(client=cortex, tenant_id="my-app")


@cf.flow
def research_flow():
    """Store a finding, recall related context, and analyse it in a task."""
    agent = "researcher"

    # Persist an observation made while the flow is running.
    finding = "The competitor released a new pricing tier at $49/mo."
    memory.store(finding, agent_id=agent)

    # A later step asks the same agent's memory for relevant context.
    recalled = memory.context(
        "What do we know about competitor pricing?",
        agent_id=agent,
    )

    # Pass the recalled context to a ControlFlow task through its instructions.
    prompt = f"Use this context:\n{recalled.context}"
    return cf.run("Analyse competitor pricing strategy", instructions=prompt)

Task Result Storage

CortexDBTaskStore persists the outcomes of ControlFlow tasks so that future flows can recall past results, enabling incremental workflows.

import controlflow as cf
from cortexdb import Cortex
from cortexdb_controlflow import CortexDBTaskStore

# One client and one task store, created at module level and used by the flow.
cortex = Cortex(base_url="https://api.cortexdb.ai")
store = CortexDBTaskStore(client=cortex, tenant_id="my-app")


@cf.flow
def weekly_report():
    """Summarise weekly metrics, reusing stored collection results if present."""
    flow_name = "weekly-report"
    collection_task = "data-collection"

    # Reuse stored results when the store returns any context for the task;
    # otherwise collect the data now and save it for future flows.
    data = store.get_results(collection_task).context
    if not data:
        data = cf.run("Collect weekly metrics from all sources")
        store.save_result(
            task_id=collection_task,
            flow_id=flow_name,
            result=data,
        )

    report = cf.run("Summarise the weekly metrics", context={"data": data})

    # Persist the summary as well so later flows can recall it.
    store.save_result(
        task_id="weekly-summary",
        flow_id=flow_name,
        result=report,
    )
    return report

Searching Across Flows

Search task results semantically, either across all flows or scoped to a single flow.

# Search all task results (no flow_id given, so the search is not scoped to one flow)
result = store.search_results("revenue analysis")
print(result.context)

# Search within a specific flow by passing its flow_id
result = store.search_results("revenue", flow_id="quarterly-review")
print(result.context)

Configuration

CortexDBMemory

| Parameter | Default | Description |
|---|---|---|
| client | Required | Initialised Cortex client instance |
| tenant_id | "default" | Tenant identifier for multi-tenant isolation |

CortexDBTaskStore

| Parameter | Default | Description |
|---|---|---|
| client | Required | Initialised Cortex client instance |
| tenant_id | "default" | Tenant identifier for multi-tenant isolation |

Under the Hood

Each wrapper call maps to a request against CortexDB's REST API. The Python call is shown as a comment above its equivalent request:

# Python call: memory.store("The API rate limit is 100 req/min.", agent_id="researcher")
# Equivalent request: POST /v1/remember with the content and tenant in the JSON body.
curl -X POST https://api.cortexdb.ai/v1/remember \
  -H "Authorization: Bearer your-api-key" \
  -H "Content-Type: application/json" \
  -d '{
    "content": "The API rate limit is 100 req/min.",
    "tenant_id": "my-app"
  }'
# Returns: { "event_id": "019d6359-d3cc-7671-9e4c-9151011fa016" }

# Python call: memory.context("rate limits", agent_id="researcher")
# Equivalent request: POST /v1/recall with the query and tenant in the JSON body.
curl -X POST https://api.cortexdb.ai/v1/recall \
  -H "Authorization: Bearer your-api-key" \
  -H "Content-Type: application/json" \
  -d '{
    "query": "rate limits",
    "tenant_id": "my-app"
  }'
# Returns: { "context": "...", "confidence": 0.91, "latency_ms": 9 }

Complete Example

import controlflow as cf
from cortexdb import Cortex
from cortexdb_controlflow import CortexDBMemory, CortexDBTaskStore

# A single Cortex client backs both the agent memory and the task store;
# both wrappers use the same tenant_id.
cortex = Cortex(base_url="https://api.cortexdb.ai")
memory = CortexDBMemory(client=cortex, tenant_id="product-team")
store = CortexDBTaskStore(client=cortex, tenant_id="product-team")

# ControlFlow agent that performs the research task.
researcher = cf.Agent(
    name="Researcher",
    instructions="You research market trends and competitor activity.",
)

# ControlFlow agent that turns the research into insights.
analyst = cf.Agent(
    name="Analyst",
    instructions="You analyse data and produce actionable insights.",
)

@cf.flow
def market_analysis():
    """Research market trends, store the findings, then derive insights.

    The researcher is primed with context recalled from memory; the analyst
    receives the new findings plus context from earlier results stored for
    this flow. Both task results are saved to the task store.
    """
    flow_name = "market-analysis"

    # Recall what the researcher's memory already holds on the topic.
    prior = memory.context(
        "What do we already know about market trends?",
        agent_id="researcher",
    )

    # Only add instructions when memory actually returned some context.
    prior_instructions = None
    if prior.context:
        prior_instructions = f"Prior research:\n{prior.context}"

    findings = cf.run(
        "Research current market trends in AI infrastructure",
        agents=[researcher],
        instructions=prior_instructions,
    )

    # Keep the findings both as agent memory and as a task result.
    memory.store(str(findings), agent_id="researcher")
    store.save_result(
        task_id="market-research",
        flow_id=flow_name,
        result=findings,
        agent_id="researcher",
    )

    # Look up earlier results from this flow to give the analyst background.
    earlier = store.search_results("market analysis insights", flow_id=flow_name)
    analyst_context = {
        "findings": findings,
        "past_analysis": earlier.context,
    }

    insights = cf.run(
        "Produce actionable insights from the research",
        agents=[analyst],
        context=analyst_context,
    )

    store.save_result(
        task_id="market-insights",
        flow_id=flow_name,
        result=insights,
        agent_id="analyst",
    )
    return insights