Add long-term memory to Temporal durable workflows.
Temporal Integration
CortexDB integrates with Temporal to provide long-term memory for durable workflows. Store workflow context, recall past execution history, and automatically log workflow events as CortexDB episodes.
Installation
pip install "cortexdb[temporal]"
Activities
Use CortexDBActivities to add remember, recall, forget, and search as Temporal activities:
import asyncio

from temporalio.client import Client
from temporalio.worker import Worker
from cortexdb import Cortex
from cortexdb_temporal import CortexDBActivities

# CortexDB client and the activity set bound to it
cortex = Cortex(base_url="http://localhost:3141")
activities = CortexDBActivities(client=cortex, tenant_id="my-app")

async def main():
    # Connect to the Temporal server and serve the CortexDB activities
    client = await Client.connect("localhost:7233")
    worker = Worker(
        client=client,
        task_queue="cortex-queue",
        activities=activities.get_activities(),
    )
    await worker.run()

if __name__ == "__main__":
    asyncio.run(main())
Using Activities in Workflows
from datetime import timedelta
from temporalio import workflow
from cortexdb_temporal.activities import RememberInput, RecallInput

@workflow.defn
class ResearchWorkflow:
    @workflow.run
    async def run(self, topic: str) -> str:
        # Recall past research on this topic
        context = await workflow.execute_activity(
            "cortexdb_recall",
            RecallInput(query=topic, tenant_id="research"),
            start_to_close_timeout=timedelta(seconds=30),
        )

        # ... do research work ...

        # Store the results for future workflows
        await workflow.execute_activity(
            "cortexdb_remember",
            RememberInput(
                content=f"Research on {topic}: findings here",
                tenant_id="research",
            ),
            start_to_close_timeout=timedelta(seconds=30),
        )
        return "done"
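The recall-then-remember flow in the workflow above can be tried without a Temporal server or a CortexDB instance. The following is a minimal sketch under stated assumptions: `FakeMemory` and `research` are hypothetical names that are not part of `cortexdb` or `cortexdb_temporal`, and the substring match merely stands in for whatever retrieval CortexDB actually performs.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class FakeMemory:
    """In-memory stand-in for CortexDB (hypothetical, for illustration only)."""

    episodes: dict[str, list[str]] = field(default_factory=dict)

    async def recall(self, query: str, tenant_id: str) -> list[str]:
        # Naive case-insensitive substring match, scoped to one tenant.
        stored = self.episodes.get(tenant_id, [])
        return [e for e in stored if query.lower() in e.lower()]

    async def remember(self, content: str, tenant_id: str) -> None:
        # Append the new episode to the tenant's list.
        self.episodes.setdefault(tenant_id, []).append(content)


async def research(memory: FakeMemory, topic: str) -> list[str]:
    """Mirror the workflow body: recall first, then store new findings."""
    context = await memory.recall(topic, tenant_id="research")
    await memory.remember(
        f"Research on {topic}: findings here", tenant_id="research"
    )
    return context


async def demo() -> tuple[list[str], list[str]]:
    memory = FakeMemory()
    first = await research(memory, "vector indexes")   # nothing stored yet
    second = await research(memory, "vector indexes")  # sees the first run
    return first, second


first_run, second_run = asyncio.run(demo())
```

The second run receives the episode written by the first, which is the behavior the workflow relies on when it recalls past research before doing new work.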
Workflow Interceptor
The CortexDBInterceptor automatically logs all workflow and activity lifecycle events to CortexDB:
from cortexdb_temporal import CortexDBInterceptor

interceptor = CortexDBInterceptor(
    client=cortex,
    tenant_id="my-app",
    log_activities=True,
    log_workflows=True,
)

worker = Worker(
    client=client,
    task_queue="cortex-queue",
    workflows=[ResearchWorkflow],
    activities=activities.get_activities(),
    interceptors=[interceptor],
)
Events logged automatically:
- workflow_started: when a workflow begins execution
- workflow_completed: when a workflow finishes (includes duration)
- workflow_failed: when a workflow fails (includes error and duration)
- activity_started: when an activity begins
- activity_completed: when an activity finishes
- activity_failed: when an activity fails
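To make the ordering of these events concrete, here is a small sketch of the started/completed/failed pattern using only the standard library. It is not the interceptor's implementation: the `logged` helper, the `events` list, and the record field names (`name`, `error`, `duration_s`) are assumptions chosen for illustration, and the sketch records a duration for activities as well even though the list above only mentions it for workflows.

```python
import time
from contextlib import contextmanager

# Stand-in for episodes that would be written to CortexDB.
events: list[dict] = []


@contextmanager
def logged(kind: str, name: str):
    """Emit <kind>_started, then <kind>_completed or <kind>_failed."""
    start = time.monotonic()
    events.append({"event": f"{kind}_started", "name": name})
    try:
        yield
    except Exception as exc:
        # Record the failure with its error message, then re-raise.
        events.append({
            "event": f"{kind}_failed",
            "name": name,
            "error": str(exc),
            "duration_s": time.monotonic() - start,
        })
        raise
    else:
        events.append({
            "event": f"{kind}_completed",
            "name": name,
            "duration_s": time.monotonic() - start,
        })


# A workflow that runs one successful activity.
with logged("workflow", "ResearchWorkflow"):
    with logged("activity", "cortexdb_recall"):
        pass  # the activity body would run here

# An activity that fails.
try:
    with logged("activity", "cortexdb_remember"):
        raise RuntimeError("CortexDB unreachable")
except RuntimeError:
    pass

names = [e["event"] for e in events]
```

Each unit of work produces exactly one started event followed by exactly one terminal event, so a missing terminal event in the log suggests a run that is still in progress or was interrupted.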
Standalone Activity Functions
For simpler setups, use the standalone activity functions:
from temporalio.worker import Worker
from cortexdb import Cortex
from cortexdb_temporal import cortexdb_remember, cortexdb_recall
from cortexdb_temporal.activities import configure, RememberInput, RecallInput

# Configure the module-level client
configure(Cortex(base_url="http://localhost:3141"))

# Register standalone functions with a worker
# (`client` is a connected Temporal client, as in the Activities example)
worker = Worker(
    client=client,
    task_queue="cortex-queue",
    activities=[cortexdb_remember, cortexdb_recall],
)
Configuration
CortexDBActivities
| Parameter | Default | Description |
|---|---|---|
| client | Required | Initialized CortexDB client |
| tenant_id | "default" | Default tenant identifier |
CortexDBInterceptor
| Parameter | Default | Description |
|---|---|---|
| client | Required | Initialized CortexDB client |
| tenant_id | "default" | Tenant for storing event episodes |
| log_activities | True | Log activity lifecycle events |
| log_workflows | True | Log workflow lifecycle events |
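One way to keep these options out of code is to read them from the environment and pass them to the constructor as keyword arguments. The sketch below is only an illustration: `InterceptorSettings`, `_flag`, and the `CORTEXDB_*` variable names are hypothetical and not defined by CortexDB. The defaults mirror the table above.

```python
import os
from dataclasses import asdict, dataclass


def _flag(name: str, default: bool) -> bool:
    """Parse a boolean environment variable, falling back to a default."""
    raw = os.environ.get(name)
    if raw is None:
        return default
    return raw.strip().lower() in {"1", "true", "yes", "on"}


@dataclass(frozen=True)
class InterceptorSettings:
    # Defaults match the CortexDBInterceptor table.
    tenant_id: str = "default"
    log_activities: bool = True
    log_workflows: bool = True

    @classmethod
    def from_env(cls) -> "InterceptorSettings":
        # Environment variable names are assumptions for this sketch.
        return cls(
            tenant_id=os.environ.get("CORTEXDB_TENANT_ID", "default"),
            log_activities=_flag("CORTEXDB_LOG_ACTIVITIES", True),
            log_workflows=_flag("CORTEXDB_LOG_WORKFLOWS", True),
        )


# Simulate a deployment that overrides two of the three options.
os.environ["CORTEXDB_TENANT_ID"] = "my-app"
os.environ["CORTEXDB_LOG_ACTIVITIES"] = "false"
os.environ.pop("CORTEXDB_LOG_WORKFLOWS", None)

settings = InterceptorSettings.from_env()
kwargs = asdict(settings)
# With the integration installed, these could be passed through as:
# interceptor = CortexDBInterceptor(client=cortex, **kwargs)
```

Unset variables fall back to the documented defaults, so a deployment only needs to set the options it wants to change.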