Add persistent memory and task result storage to ControlFlow agents with CortexDB.

ControlFlow Integration

CortexDB integrates with ControlFlow to provide persistent agent memory and task result storage. ControlFlow decomposes problems into tasks assigned to agents — CortexDB gives those agents long-term memory that persists across flow executions.

Installation

pip install cortexdb-controlflow

Quick Start

from cortexdb import Cortex
from cortexdb_controlflow import CortexDBMemory, CortexDBTaskStore

client = Cortex(base_url="http://localhost:3141")
memory = CortexDBMemory(client=client, tenant_id="my-app")
task_store = CortexDBTaskStore(client=client, tenant_id="my-app")

# Store agent observations
memory.store("The API rate limit is 100 req/min.", agent_id="researcher")

# Retrieve relevant context
results = memory.search("rate limits", agent_id="researcher")

# Persist task results for future flows
task_store.save_result(
    task_id="api-audit",
    flow_id="weekly-review",
    result={"rate_limit": 100, "status": "healthy"},
)

Agent Memory

CortexDBMemory provides each ControlFlow agent with durable memory that persists across flow executions. Agents can store observations and retrieve context relevant to their current task.

import controlflow as cf
from cortexdb import Cortex
from cortexdb_controlflow import CortexDBMemory

cortex = Cortex(base_url="http://localhost:3141")
memory = CortexDBMemory(client=cortex, tenant_id="my-app", top_k=5)

@cf.flow
def research_flow():
    # Store findings during execution
    memory.store(
        "The competitor released a new pricing tier at $49/mo.",
        agent_id="researcher",
    )

    # In a later task, retrieve relevant context
    context = memory.context(
        "What do we know about competitor pricing?",
        agent_id="researcher",
    )

    # Use context in a ControlFlow task
    analysis = cf.run(
        "Analyse competitor pricing strategy",
        instructions=f"Use this context:\n{context}",
    )

    return analysis

Task Result Storage

CortexDBTaskStore persists the outcomes of ControlFlow tasks so that future flows can recall past results, enabling incremental workflows.

import controlflow as cf
from cortexdb import Cortex
from cortexdb_controlflow import CortexDBTaskStore

cortex = Cortex(base_url="http://localhost:3141")
store = CortexDBTaskStore(client=cortex, tenant_id="my-app")

@cf.flow
def weekly_report():
    # Check if we already have recent results
    past_results = store.get_results("data-collection", top_k=1)

    if past_results:
        data = past_results[0]
    else:
        data = cf.run("Collect weekly metrics from all sources")
        store.save_result(
            task_id="data-collection",
            flow_id="weekly-report",
            result=data,
        )

    summary = cf.run(
        "Summarise the weekly metrics",
        context={"data": data},
    )
    store.save_result(
        task_id="weekly-summary",
        flow_id="weekly-report",
        result=summary,
    )

    return summary

Searching Across Flows

Search task results semantically across all flows or scoped to a specific flow.

# Search all task results
results = store.search_results("revenue analysis")

# Search within a specific flow
results = store.search_results("revenue", flow_id="quarterly-review")

Configuration

CortexDBMemory

| Parameter | Default | Description | |---|---|---| | client | Required | Initialised Cortex client instance | | tenant_id | "default" | Tenant identifier for multi-tenant isolation | | namespace | None | Optional namespace to scope memories | | top_k | 5 | Max memories to retrieve per search |

CortexDBTaskStore

| Parameter | Default | Description | |---|---|---| | client | Required | Initialised Cortex client instance | | tenant_id | "default" | Tenant identifier for multi-tenant isolation | | namespace | None | Optional namespace to scope task results |

Complete Example

import controlflow as cf
from cortexdb import Cortex
from cortexdb_controlflow import CortexDBMemory, CortexDBTaskStore

cortex = Cortex(base_url="http://localhost:3141")
memory = CortexDBMemory(client=cortex, tenant_id="product-team", namespace="research")
store = CortexDBTaskStore(client=cortex, tenant_id="product-team", namespace="tasks")

researcher = cf.Agent(
    name="Researcher",
    instructions="You research market trends and competitor activity.",
)

analyst = cf.Agent(
    name="Analyst",
    instructions="You analyse data and produce actionable insights.",
)

@cf.flow
def market_analysis():
    # Researcher gathers data with persistent memory
    research_context = memory.context(
        "What do we already know about market trends?",
        agent_id="researcher",
    )

    findings = cf.run(
        "Research current market trends in AI infrastructure",
        agents=[researcher],
        instructions=f"Prior research:\n{research_context}" if research_context else None,
    )

    # Store the research findings
    memory.store(str(findings), agent_id="researcher")
    store.save_result(
        task_id="market-research",
        flow_id="market-analysis",
        result=findings,
        agent_id="researcher",
    )

    # Analyst produces insights using research
    past_analyses = store.search_results(
        "market analysis insights",
        flow_id="market-analysis",
        top_k=3,
    )

    analysis = cf.run(
        "Produce actionable insights from the research",
        agents=[analyst],
        context={"findings": findings, "past_analyses": past_analyses},
    )

    store.save_result(
        task_id="market-insights",
        flow_id="market-analysis",
        result=analysis,
        agent_id="analyst",
    )

    return analysis