Add long-term memory to Temporal durable workflows.

Temporal Integration

CortexDB integrates with Temporal to provide long-term memory for durable workflows. Store workflow context, recall past execution history, and automatically log workflow events as CortexDB episodes.

Installation

pip install "cortexdb[temporal]"

Activities

Use CortexDBActivities to add remember, recall, forget, and search as Temporal activities:

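import asyncio

from temporalio.client import Client
from temporalio.worker import Worker
from cortexdb import Cortex
from cortexdb_temporal import CortexDBActivities

# Wrap a CortexDB client so its operations can be registered as Temporal activities
cortex = Cortex(base_url="http://localhost:3141")
activities = CortexDBActivities(client=cortex, tenant_id="my-app")

async def main():
    # Connect to the Temporal server
    client = await Client.connect("localhost:7233")

    # The worker polls "cortex-queue" and runs the CortexDB activities
    worker = Worker(
        client=client,
        task_queue="cortex-queue",
        activities=activities.get_activities(),
    )
    await worker.run()

if __name__ == "__main__":
    asyncio.run(main())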

Using Activities in Workflows

Workflow code calls the registered activities by name and passes the matching input type:

from datetime import timedelta
from temporalio import workflow
from cortexdb_temporal.activities import RememberInput, RecallInput

@workflow.defn
class ResearchWorkflow:
    @workflow.run
    async def run(self, topic: str) -> str:
        # Recall past research on this topic
        context = await workflow.execute_activity(
            "cortexdb_recall",
            RecallInput(query=topic, tenant_id="research"),
            start_to_close_timeout=timedelta(seconds=30),
        )

        # ... do research work ...

        # Store the results for future workflows
        await workflow.execute_activity(
            "cortexdb_remember",
            RememberInput(
                content=f"Research on {topic}: findings here",
                tenant_id="research",
            ),
            start_to_close_timeout=timedelta(seconds=30),
        )

        return "done"
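
Activity inputs cross the workflow/worker boundary in serialized form, and Temporal's default data converter encodes dataclass arguments as JSON. The sketch below uses only the standard library to illustrate that round trip. The `RecallInput` and `RememberInput` classes here are hypothetical stand-ins that model just the fields used above (`query`, `content`, `tenant_id`); the real definitions in `cortexdb_temporal.activities` may carry more fields or different defaults, and `encode`/`decode` are illustrative helpers, not library functions.

```python
import json
from dataclasses import asdict, dataclass


# Hypothetical stand-ins for the input types. Only the fields shown in the
# workflow above are modeled, and the "default" tenant value is an assumption.
@dataclass
class RecallInput:
    query: str
    tenant_id: str = "default"


@dataclass
class RememberInput:
    content: str
    tenant_id: str = "default"


def encode(value) -> str:
    """Serialize a dataclass instance to a JSON string."""
    return json.dumps(asdict(value))


def decode(cls, payload: str):
    """Rebuild a dataclass instance of type `cls` from a JSON string."""
    return cls(**json.loads(payload))


original = RecallInput(query="vector databases", tenant_id="research")
restored = decode(RecallInput, encode(original))
print(restored == original)
```

Keeping input fields to plain strings and numbers, as in this sketch, avoids surprises when the payload is decoded on the worker side.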

Workflow Interceptor

The CortexDBInterceptor automatically logs all workflow and activity lifecycle events to CortexDB:

from cortexdb_temporal import CortexDBInterceptor

interceptor = CortexDBInterceptor(
    client=cortex,
    tenant_id="my-app",
    log_activities=True,
    log_workflows=True,
)

# Create the worker inside main(), where `client` is the connected Temporal client
worker = Worker(
    client=client,
    task_queue="cortex-queue",
    workflows=[ResearchWorkflow],
    activities=activities.get_activities(),
    interceptors=[interceptor],
)

Events logged automatically:

  • workflow_started — when a workflow begins execution
  • workflow_completed — when a workflow finishes (includes duration)
  • workflow_failed — when a workflow fails (includes error and duration)
  • activity_started — when an activity begins
  • activity_completed — when an activity finishes
  • activity_failed — when an activity fails
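
To make the event list concrete, here is a minimal standard-library sketch of how the `log_workflows` and `log_activities` flags could gate the six events, and how a duration or error might travel with an event. The `LifecycleEvent` class and `should_log` helper are hypothetical, introduced only for illustration; the episode format that CortexDBInterceptor actually writes is not specified here.

```python
from dataclasses import dataclass
from typing import Optional

WORKFLOW_EVENTS = {"workflow_started", "workflow_completed", "workflow_failed"}
ACTIVITY_EVENTS = {"activity_started", "activity_completed", "activity_failed"}


@dataclass
class LifecycleEvent:
    """Hypothetical record for one logged event (not the library's schema)."""
    name: str
    duration_s: Optional[float] = None  # e.g. for completed or failed workflows
    error: Optional[str] = None         # e.g. for failed events


def should_log(event: LifecycleEvent, log_workflows: bool = True,
               log_activities: bool = True) -> bool:
    """Return True if the flags allow this event to be recorded."""
    if event.name in WORKFLOW_EVENTS:
        return log_workflows
    if event.name in ACTIVITY_EVENTS:
        return log_activities
    raise ValueError(f"unknown event: {event.name}")


events = [
    LifecycleEvent("workflow_started"),
    LifecycleEvent("activity_started"),
    LifecycleEvent("activity_failed", error="timeout"),
    LifecycleEvent("workflow_failed", duration_s=1.5, error="timeout"),
]

# With activity logging switched off, only workflow events remain.
kept = [e.name for e in events if should_log(e, log_activities=False)]
print(kept)
```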

Standalone Activity Functions

For simpler setups, use the standalone activity functions:

from temporalio.worker import Worker
from cortexdb import Cortex
from cortexdb_temporal import cortexdb_remember, cortexdb_recall
from cortexdb_temporal.activities import configure, RememberInput, RecallInput

# Configure the module-level client
configure(Cortex(base_url="http://localhost:3141"))

# Register standalone functions with a worker
# (`client` is the connected Temporal client)
worker = Worker(
    client=client,
    task_queue="cortex-queue",
    activities=[cortexdb_remember, cortexdb_recall],
)

Configuration

CortexDBActivities

| Parameter | Default | Description |
|---|---|---|
| client | Required | Initialized CortexDB client |
| tenant_id | "default" | Default tenant identifier |

CortexDBInterceptor

| Parameter | Default | Description |
|---|---|---|
| client | Required | Initialized CortexDB client |
| tenant_id | "default" | Tenant for storing event episodes |
| log_activities | True | Log activity lifecycle events |
| log_workflows | True | Log workflow lifecycle events |