# Airflow Integration

Add long-term memory to Apache Airflow DAGs.
CortexDB provides an Apache Airflow provider package with a Hook for connection management and an Operator for executing memory operations within your DAGs.
## Installation

```bash
pip install cortexdb-airflow
```
## Setup

### Configure the Airflow Connection

In the Airflow UI, go to **Admin > Connections** and create a new connection:
| Field | Value |
|---|---|
| Connection ID | cortexdb_default |
| Connection Type | CortexDB |
| Host | localhost |
| Port | 3141 |
| Password | Your API key (optional) |
| Schema | Default tenant ID (e.g., my-app) |
| Extra | {"timeout": 30, "max_retries": 3, "scheme": "http"} |
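The `Extra` field is a JSON object merged over the provider's defaults. As a minimal sketch of that merge, assuming the defaults match the values shown in the table above (the provider's actual defaults may differ):

```python
import json

# Assumed defaults, mirroring the Extra keys in the table above.
EXTRA_DEFAULTS = {"timeout": 30, "max_retries": 3, "scheme": "http"}


def parse_extra(extra_json=None):
    """Merge a connection's Extra JSON over the assumed defaults."""
    extra = json.loads(extra_json) if extra_json else {}
    return {**EXTRA_DEFAULTS, **extra}
```

Any key you set in `Extra` overrides the default; unspecified keys fall back, so `{"timeout": 60}` keeps `max_retries` and `scheme` at their defaults.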
## Using the Operator
```python
from datetime import datetime

from airflow.decorators import dag

from cortexdb_airflow import CortexDBOperator


@dag(
    schedule=None,
    start_date=datetime(2024, 1, 1),
    catchup=False,
)
def memory_pipeline():
    store_result = CortexDBOperator(
        task_id="store_pipeline_result",
        operation="remember",
        content="Daily ETL pipeline completed. Processed 50,000 records.",
        tenant_id="data-team",
        metadata={"source": "airflow", "dag": "memory_pipeline"},
    )

    recall_context = CortexDBOperator(
        task_id="recall_context",
        operation="recall",
        query="recent pipeline results and anomalies",
        tenant_id="data-team",
        max_tokens=4096,
    )

    search_history = CortexDBOperator(
        task_id="search_history",
        operation="search",
        query="pipeline failures",
        tenant_id="data-team",
        source="airflow",
        limit=10,
    )

    store_result >> recall_context >> search_history


memory_pipeline()
```
## Operations

### Remember

Store content in CortexDB:
```python
CortexDBOperator(
    task_id="store",
    operation="remember",
    content="The deployment succeeded at {{ ts }}.",
    tenant_id="ops-team",
)
```
### Recall

Retrieve relevant context (the result is pushed to XCom):
```python
CortexDBOperator(
    task_id="recall",
    operation="recall",
    query="What were the recent deployment issues?",
    tenant_id="ops-team",
)
```
### Forget

Remove memories matching a query:
```python
CortexDBOperator(
    task_id="forget",
    operation="forget",
    query="obsolete pipeline data",
    reason="Data retention policy cleanup",
    tenant_id="data-team",
)
```
### Search

Search for episodes with filters:
```python
CortexDBOperator(
    task_id="search",
    operation="search",
    query="deployment events",
    tenant_id="ops-team",
    namespace="production",
    limit=20,
)
```
## Using the Hook Directly

For custom operators or `PythonOperator`/TaskFlow tasks:
```python
from airflow.decorators import task

from cortexdb_airflow import CortexDBHook


@task
def custom_memory_task():
    hook = CortexDBHook(cortexdb_conn_id="cortexdb_default")
    client = hook.get_client()
    tenant_id = hook.get_tenant_id()

    result = client.recall(
        query="What do we know about this data source?",
        tenant_id=tenant_id,
    )
    return result
```
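Direct client calls can fail mid-pipeline if the CortexDB server is briefly unavailable. Whether the hook already retries internally (per the `max_retries` key in the connection's Extra) is not stated here, so as a hedged sketch you could wrap calls yourself — `with_retries` below is a hypothetical helper, not part of the provider:

```python
import time


def with_retries(fn, *args, max_retries=3, backoff=1.0, **kwargs):
    """Call fn, retrying on any exception up to max_retries extra times.

    Mirrors the max_retries key from the connection's Extra field; whether
    CortexDBHook already retries internally is an assumption here.
    """
    for attempt in range(max_retries + 1):
        try:
            return fn(*args, **kwargs)
        except Exception:
            if attempt == max_retries:
                raise
            time.sleep(backoff * 2 ** attempt)  # exponential backoff


# Usage inside a task (client/tenant_id as in the example above):
# result = with_retries(client.recall, query="...", tenant_id=tenant_id)
```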
## Template Fields

The operator supports Airflow's Jinja templating for these fields: `content`, `query`, `tenant_id`, `reason`, `scope`, `namespace`.
```python
CortexDBOperator(
    task_id="templated_store",
    operation="remember",
    content="DAG {{ dag.dag_id }} completed at {{ ts }}. Status: {{ ti.state }}",
    tenant_id="{{ var.value.cortex_tenant }}",
)
```
## Configuration

### CortexDBHook
| Parameter | Default | Description |
|---|---|---|
| cortexdb_conn_id | cortexdb_default | Airflow connection ID |
### CortexDBOperator
| Parameter | Default | Description |
|---|---|---|
| operation | Required | One of: remember, recall, forget, search |
| cortexdb_conn_id | cortexdb_default | Airflow connection ID |
| tenant_id | Connection schema | Tenant identifier |
| content | None | Content to store (remember) |
| query | None | Query string (recall, forget, search) |
| max_tokens | 4096 | Max tokens (recall) |
| min_confidence | 0.0 | Min confidence (recall) |
| limit | 20 | Result limit (search) |
| offset | 0 | Pagination offset (search) |
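The table implies that each operation needs a different subset of parameters (`content` for remember, `query` for the rest). A minimal sketch of that rule, inferred from the table — the operator's actual validation logic may differ:

```python
# Required-parameter mapping inferred from the table above; illustrative only.
REQUIRED = {
    "remember": ["content"],
    "recall": ["query"],
    "forget": ["query"],
    "search": ["query"],
}


def validate_operator_kwargs(operation, **kwargs):
    """Raise ValueError if the operation is unknown or missing a required field."""
    if operation not in REQUIRED:
        raise ValueError(f"operation must be one of {sorted(REQUIRED)}")
    missing = [p for p in REQUIRED[operation] if kwargs.get(p) is None]
    if missing:
        raise ValueError(f"{operation} requires: {', '.join(missing)}")
```

For example, `validate_operator_kwargs("recall")` fails because `query` is missing, while `validate_operator_kwargs("remember", content="...")` passes.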