Wrap CortexDB ingest in Airflow PythonOperator tasks.

Airflow Integration

Apache Airflow schedules DAGs of operators. Wrap CortexDB calls in `PythonOperator` tasks, or in `@task`-decorated functions if you use the TaskFlow API.

Install

pip install cortexdbai apache-airflow

Pattern

import os
from datetime import datetime, timezone
from airflow.decorators import dag, task
from cortexdb.v1 import V1Client

def get_client() -> V1Client:
    """Build a CortexDB client for use inside a task.

    Constructed at task run time rather than at module import: the scheduler
    re-imports DAG files on every parse loop, so top-level client construction
    would read ``CORTEX_TOKEN`` there (failing the DAG import if it is unset)
    and do needless work on each parse.
    """
    return V1Client(api_url="https://api-v1.cortexdb.ai", actor="service:airflow",
                    bearer=os.environ["CORTEX_TOKEN"])


@dag(schedule="@daily", catchup=False, start_date=datetime(2026, 5, 1))
def cortex_daily_summary():
    """Daily DAG: recall context from CortexDB, summarize it, write the summary back."""

    @task
    def recall_context(scope: str) -> str:
        """Return the holistic ``context_block`` CortexDB recalls for *scope*."""
        pack = get_client().recall(scope=scope, view="holistic",
                                   query="What happened yesterday?",
                                   include=["beliefs", "facts", "episodes"],
                                   budgets={"max_tokens": 3000})
        return pack["context_block"]

    @task
    def summarize(context: str) -> str:
        """Placeholder for your own task: replace with real summarization.

        Returns *context* unchanged so the example DAG parses and runs end to end.
        """
        return context

    @task
    def capture(scope: str, summary: str, run_id: str) -> dict:
        """Store *summary* in CortexDB as an assistant experience.

        ``run_id`` is passed as the idempotency key so that re-running this
        task within the same DAG run is intended to write only once
        (deduplication is performed by CortexDB).
        """
        return get_client().experience(
            scope=scope, text=summary, role="assistant",
            observed_at=datetime.now(timezone.utc).isoformat(),
            idempotency_key=run_id,
        )

    scope = "org:acme/source:airflow"
    context = recall_context(scope)
    summary = summarize(context)  # your task
    # TaskFlow task arguments (op_args) are Jinja-templated, so this string is
    # rendered to the concrete run id when the task executes.
    capture(scope, summary, "{{ run_id }}")


cortex_daily_summary()

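Use Airflow's `{{ run_id }}` template variable as the idempotency key. Clearing and re-running a DAG run keeps the same `run_id`, so task retries and DAG run replays write the experience only once.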

See also