Capture and recall CortexDB experiences from Temporal workflows by way of activities.

Temporal Integration

Temporal orchestrates durable workflows. Run CortexDB calls inside activities, never directly in workflow code: workflow code must be deterministic, and a network call is not. Retries then come from the activity's retry policy, which makes ingest resilient to transient failures.

Install

pip install cortexdbai temporalio

Pattern — activity wrappers

import asyncio
import os
from datetime import datetime, timedelta, timezone

from cortexdb.v1 import V1Client
from temporalio import activity

# Shared CortexDB client used by the activities below. It is built once, at
# import time, so a missing CORTEX_TOKEN environment variable raises KeyError
# as soon as this module is loaded rather than on the first activity call.
client = V1Client(
    api_url="https://api-v1.cortexdb.ai",
    actor="service:temporal-worker",
    bearer=os.environ["CORTEX_TOKEN"],
)


@activity.defn
async def capture_experience(scope: str, text: str, role: str, idem: str) -> dict:
    """Record one experience in CortexDB and return the client's response.

    Args:
        scope: CortexDB scope the experience is written to.
        text: Content of the experience.
        role: Role attributed to the text (the workflow below passes "user").
        idem: Idempotency key forwarded to CortexDB as ``idempotency_key``.

    Returns:
        Whatever ``client.experience`` returns (annotated as ``dict``).
    """
    # Stamp the observation with the current UTC time. This is taken on every
    # attempt, so a retried activity sends a newer timestamp under the same
    # idempotency key.
    observed_at = datetime.now(timezone.utc).isoformat()

    # The client call is synchronous, so calling it directly from an async
    # activity would stall the worker's event loop for the duration of the
    # request. Run it on a thread instead.
    # NOTE(review): assumes V1Client is safe to call from multiple threads - confirm.
    return await asyncio.to_thread(
        client.experience,
        scope=scope,
        text=text,
        role=role,
        observed_at=observed_at,
        idempotency_key=idem,
    )


@activity.defn
async def recall_context(scope: str, query: str) -> str:
    """Recall context for ``query`` from CortexDB and return its context block.

    Args:
        scope: CortexDB scope to recall from.
        query: Text used as the recall query.

    Returns:
        The ``"context_block"`` entry of the recall result.

    Raises:
        KeyError: If the recall result has no ``"context_block"`` entry
            (assuming the result is a plain mapping).
    """
    # The client call is synchronous (its result is subscripted below without
    # being awaited), so run it on a thread rather than blocking the worker's
    # event loop from inside an async activity.
    # NOTE(review): assumes V1Client is safe to call from multiple threads - confirm.
    pack = await asyncio.to_thread(
        client.recall,
        scope=scope,
        view="holistic",
        query=query,
        include=["beliefs", "facts", "episodes"],
        budgets={"max_tokens": 3000},
    )
    return pack["context_block"]
from temporalio import workflow

@workflow.defn
class HandleCustomerEvent:
    """Workflow that recalls CortexDB context for a message, then records it."""

    @workflow.run
    async def run(self, scope: str, message: str) -> str:
        """Recall context for ``message``, then capture it as a "user" experience.

        Both steps run as activities with a 30-second schedule-to-close
        timeout. Returns the literal string ``"done"``.
        """
        activity_timeout = timedelta(seconds=30)

        # The recalled block is not consumed yet; it is meant to feed the
        # LLM call that belongs at the placeholder below.
        context_block = await workflow.execute_activity(
            recall_context,
            args=[scope, message],
            schedule_to_close_timeout=activity_timeout,
        )
        # ... call LLM here ...

        # The workflow ID doubles as the idempotency key for the capture.
        idem_key = workflow.info().workflow_id
        await workflow.execute_activity(
            capture_experience,
            args=[scope, message, "user", idem_key],
            schedule_to_close_timeout=activity_timeout,
        )
        return "done"

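Use the Temporal workflow ID as the idempotency_key so that activity retries don't duplicate experiences. This is sufficient when a workflow captures a single experience, as above; if a workflow captures more than one, give each capture its own key (for example, the workflow ID plus a step suffix) so they don't share one key.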

See also