Add long-term memory to Apache Airflow DAGs.

Airflow Integration

CortexDB provides an Apache Airflow provider package with a Hook for connection management and an Operator for executing memory operations within your DAGs.

Installation

pip install cortexdb-airflow

Setup

Configure the Airflow Connection

In the Airflow UI, go to Admin > Connections and create a new connection:

| Field | Value |
|---|---|
| Connection ID | cortexdb_default |
| Connection Type | CortexDB |
| Host | localhost |
| Port | 3141 |
| Password | Your API key (optional) |
| Schema | Default tenant ID (e.g., my-app) |
| Extra | {"timeout": 30, "max_retries": 3, "scheme": "http"} |
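Airflow can also read connections from environment variables named AIRFLOW_CONN_<CONN_ID_UPPERCASE>, encoded as a URI with the connection type as the scheme and extras passed as a JSON-encoded __extra__ query parameter. A sketch of building that URI with the standard library (the cortexdb scheme name mirrors the connection type above and is an assumption):

```python
import json
from urllib.parse import quote, urlencode

def cortexdb_conn_uri(host, port, schema, password, extra):
    """Build an Airflow connection URI (connection type as the URI scheme)."""
    return "cortexdb://:{pw}@{host}:{port}/{schema}?{qs}".format(
        pw=quote(password, safe=""),
        host=host,
        port=port,
        schema=schema,
        # Airflow accepts extras as a JSON blob under the __extra__ key.
        qs=urlencode({"__extra__": json.dumps(extra)}),
    )

uri = cortexdb_conn_uri(
    host="localhost",
    port=3141,
    schema="my-app",
    password="YOUR_API_KEY",
    extra={"timeout": 30, "max_retries": 3, "scheme": "http"},
)
print(uri)
```

Export the result as AIRFLOW_CONN_CORTEXDB_DEFAULT before starting the scheduler and webserver, and the connection is available without touching the UI.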

Using the Operator

from airflow.decorators import dag
from datetime import datetime
from cortexdb_airflow import CortexDBOperator

@dag(
    schedule=None,
    start_date=datetime(2024, 1, 1),
    catchup=False,
)
def memory_pipeline():
    store_result = CortexDBOperator(
        task_id="store_pipeline_result",
        operation="remember",
        content="Daily ETL pipeline completed. Processed 50,000 records.",
        tenant_id="data-team",
        metadata={"source": "airflow", "dag": "memory_pipeline"},
    )

    recall_context = CortexDBOperator(
        task_id="recall_context",
        operation="recall",
        query="recent pipeline results and anomalies",
        tenant_id="data-team",
        max_tokens=4096,
    )

    search_history = CortexDBOperator(
        task_id="search_history",
        operation="search",
        query="pipeline failures",
        tenant_id="data-team",
        source="airflow",
        limit=10,
    )

    store_result >> recall_context >> search_history

memory_pipeline()

Operations

Remember

Store content in CortexDB:

CortexDBOperator(
    task_id="store",
    operation="remember",
    content="The deployment succeeded at {{ ts }}.",
    tenant_id="ops-team",
)

Recall

Retrieve relevant context (result pushed to XCom):

CortexDBOperator(
    task_id="recall",
    operation="recall",
    query="What were the recent deployment issues?",
    tenant_id="ops-team",
)
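Because the recall result lands in XCom as the task's return value, any downstream task can pull it with ti.xcom_pull. A minimal sketch of a consumer, written as a callable for a PythonOperator (the shape of the recalled payload is an assumption; it is treated as opaque text here):

```python
def build_prompt(ti):
    """Callable for a task that runs downstream of the recall task."""
    # Pull the value the recall operator pushed to XCom (its return value).
    recalled = ti.xcom_pull(task_ids="recall")
    # The payload shape is an assumption; treat it as opaque context text.
    return f"Known context:\n{recalled}\n\nAnswer using only this context."
```

The same value is reachable from Jinja-templated fields via {{ ti.xcom_pull(task_ids='recall') }}.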

Forget

Delete memories matching a query, recording a reason for the removal:

CortexDBOperator(
    task_id="forget",
    operation="forget",
    query="obsolete pipeline data",
    reason="Data retention policy cleanup",
    tenant_id="data-team",
)

Search

Search for episodes with filters:

CortexDBOperator(
    task_id="search",
    operation="search",
    query="deployment events",
    tenant_id="ops-team",
    namespace="production",
    limit=20,
)
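The limit and offset parameters together support paging through large result sets. A small illustration of the paging arithmetic (the helper name is ours, not part of the provider):

```python
def page_params(page: int, page_size: int = 20) -> dict:
    """Compute limit/offset kwargs for a paged search (1-based page numbers)."""
    return {"limit": page_size, "offset": (page - 1) * page_size}
```

For example, page 3 at the default page size yields limit=20, offset=40, which can be splatted into the operator as **page_params(3).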

Using the Hook Directly

For custom operators or PythonOperator tasks:

from airflow.decorators import task
from cortexdb_airflow import CortexDBHook

@task
def custom_memory_task():
    hook = CortexDBHook(cortexdb_conn_id="cortexdb_default")
    client = hook.get_client()
    tenant_id = hook.get_tenant_id()

    result = client.recall(
        query="What do we know about this data source?",
        tenant_id=tenant_id,
    )
    return result

Template Fields

The operator supports Airflow's Jinja templating for these fields:

  • content
  • query
  • tenant_id
  • reason
  • scope
  • namespace

For example:

CortexDBOperator(
    task_id="templated_store",
    operation="remember",
    content="DAG {{ dag.dag_id }} completed at {{ ts }}. Status: {{ ti.state }}",
    tenant_id="{{ var.value.cortex_tenant }}",
)

Configuration

CortexDBHook

| Parameter | Default | Description |
|---|---|---|
| cortexdb_conn_id | cortexdb_default | Airflow connection ID |

CortexDBOperator

| Parameter | Default | Description |
|---|---|---|
| operation | Required | One of: remember, recall, forget, search |
| cortexdb_conn_id | cortexdb_default | Airflow connection ID |
| tenant_id | Connection schema | Tenant identifier |
| content | None | Content to store (remember) |
| query | None | Query string (recall, forget, search) |
| max_tokens | 4096 | Max tokens (recall) |
| min_confidence | 0.0 | Min confidence (recall) |
| limit | 20 | Result limit (search) |
| offset | 0 | Pagination offset (search) |