Capture and recall CortexDB experiences from Temporal workflows + activities.
Temporal Integration
Temporal orchestrates durable workflows. Run CortexDB calls inside activities (never workflows — workflows must be deterministic). Use the workflow's natural retry semantics for resilient ingest.
Install
pip install cortexdbai temporalio
Pattern — activity wrappers
import os
from datetime import datetime, timezone
from temporalio import activity
from cortexdb.v1 import V1Client
client = V1Client(api_url="https://api-v1.cortexdb.ai", actor="service:temporal-worker",
bearer=os.environ["CORTEX_TOKEN"])
@activity.defn
async def capture_experience(scope: str, text: str, role: str, idem: str) -> dict:
return client.experience(scope=scope, text=text, role=role,
observed_at=datetime.now(timezone.utc).isoformat(),
idempotency_key=idem)
@activity.defn
async def recall_context(scope: str, query: str) -> str:
pack = client.recall(scope=scope, view="holistic", query=query,
include=["beliefs", "facts", "episodes"],
budgets={"max_tokens": 3000})
return pack["context_block"]
from temporalio import workflow
@workflow.defn
class HandleCustomerEvent:
@workflow.run
async def run(self, scope: str, message: str) -> str:
context = await workflow.execute_activity(
recall_context, args=[scope, message],
schedule_to_close_timeout=timedelta(seconds=30),
)
# ... call LLM here ...
await workflow.execute_activity(
capture_experience, args=[scope, message, "user", workflow.info().workflow_id],
schedule_to_close_timeout=timedelta(seconds=30),
)
return "done"
Use the Temporal workflow ID as the idempotency_key so retries don't duplicate experiences.
See also
- Python SDK
- Lifecycle — async stages +
?wait=for read-after-write inside a workflow