Wrap CortexDB ingest in Prefect tasks for scheduled and event-driven memory pipelines.
Prefect Integration
Prefect schedules and runs Python flows. Wrap CortexDB calls as @tasks so retries, logging, and observability come for free.
Install
pip install cortexdbai prefect
Pattern
import os
from datetime import datetime, timezone
from prefect import flow, task
from cortexdb.v1 import V1Client
client = V1Client(api_url="https://api-v1.cortexdb.ai", actor="service:prefect-worker",
bearer=os.environ["CORTEX_TOKEN"])
@task(retries=3, retry_delay_seconds=10)
def capture_experience(scope: str, text: str, role: str, idem: str) -> dict:
return client.experience(
scope=scope, text=text, role=role,
observed_at=datetime.now(timezone.utc).isoformat(),
idempotency_key=idem,
)
@task
def recall_context(scope: str, query: str) -> str:
pack = client.recall(scope=scope, view="holistic", query=query,
include=["beliefs", "facts", "episodes"],
budgets={"max_tokens": 3000})
return pack["context_block"]
@flow
def daily_summary_flow(scope: str):
context = recall_context(scope, query="What happened yesterday?")
summary = summarize_with_llm(context) # your code
capture_experience(scope, summary, role="assistant", idem=f"daily-{datetime.utcnow().date()}")
The Prefect task name + execution date make a stable idempotency key — replays of the daily flow write once per day.
See also
- Python SDK
- ControlFlow — Prefect-native agent workflows