Wrap CortexDB ingest in Prefect tasks for scheduled and event-driven memory pipelines.

Prefect Integration

Prefect schedules and runs Python flows. Wrap CortexDB calls as @tasks so retries, logging, and observability come for free.

Install

pip install cortexdbai prefect

Pattern

import os
from datetime import datetime, timezone
from prefect import flow, task
from cortexdb.v1 import V1Client

client = V1Client(api_url="https://api-v1.cortexdb.ai", actor="service:prefect-worker",
                  bearer=os.environ["CORTEX_TOKEN"])


@task(retries=3, retry_delay_seconds=10)
def capture_experience(scope: str, text: str, role: str, idem: str) -> dict:
    return client.experience(
        scope=scope, text=text, role=role,
        observed_at=datetime.now(timezone.utc).isoformat(),
        idempotency_key=idem,
    )


@task
def recall_context(scope: str, query: str) -> str:
    pack = client.recall(scope=scope, view="holistic", query=query,
                         include=["beliefs", "facts", "episodes"],
                         budgets={"max_tokens": 3000})
    return pack["context_block"]


@flow
def daily_summary_flow(scope: str):
    context = recall_context(scope, query="What happened yesterday?")
    summary = summarize_with_llm(context)        # your code
    capture_experience(scope, summary, role="assistant", idem=f"daily-{datetime.utcnow().date()}")

The Prefect task name + execution date make a stable idempotency key — replays of the daily flow write once per day.

See also