Add long-term memory to Prefect data pipelines and flows.
Prefect Integration
CortexDB integrates with Prefect to provide long-term memory for data pipelines. Use the Prefect Block for connection management and Prefect tasks for memory operations within your flows.
Installation
pip install "cortexdb[prefect]"
Setup
Register the CortexDB Block
from cortexdb_prefect import CortexDBBlock

block = CortexDBBlock(
    base_url="http://localhost:3141",
    api_key="your-api-key",
    tenant_id="data-team",
)
block.save("production-cortex")
Once saved, the block appears in the Prefect UI and can be referenced by name in any flow.
Using Tasks in Flows
from prefect import flow
from cortexdb_prefect import CortexDBBlock, cortexdb_remember, cortexdb_recall

@flow(name="knowledge-pipeline")
def knowledge_pipeline():
    # Load the saved block
    block = CortexDBBlock.load("production-cortex")
    client = block.get_client()

    # Store pipeline results
    cortexdb_remember(
        client=client,
        content="ETL pipeline processed 50,000 records from source_a.",
        tenant_id="data-team",
        metadata={"source": "etl", "records": "50000"},
    )

    # Recall past context
    context = cortexdb_recall(
        client=client,
        query="recent ETL pipeline results",
        tenant_id="data-team",
    )
    return context

if __name__ == "__main__":
    knowledge_pipeline()
Available Tasks
cortexdb_remember
Store content in CortexDB. The task is configured with 2 retries and a 5-second retry delay.
cortexdb_remember(
    client=client,
    content="Important data point.",
    tenant_id="my-app",
    scope="pipeline",
    metadata={"key": "value"},
)
cortexdb_recall
Retrieve relevant context from CortexDB.
result = cortexdb_recall(
    client=client,
    query="What happened in the last pipeline run?",
    tenant_id="my-app",
    max_tokens=4096,
    min_confidence=0.5,
)
cortexdb_forget
Remove memories from CortexDB.
cortexdb_forget(
    client=client,
    query="old pipeline data",
    reason="Data retention policy",
    tenant_id="my-app",
)
cortexdb_search
Search CortexDB for matching episodes, with optional filters such as source and namespace.
results = cortexdb_search(
    client=client,
    query="deployment events",
    tenant_id="my-app",
    source="ci-cd",
    namespace="production",
    limit=10,
)
CortexDB Block Configuration
| Parameter | Default | Description |
|---|---|---|
| base_url | http://localhost:3141 | CortexDB server URL |
| api_key | None | API key (stored as Prefect secret) |
| tenant_id | "default" | Default tenant identifier |
| timeout | 30.0 | Request timeout in seconds |
| max_retries | 3 | Maximum retry attempts |
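Putting the table together, a block with non-default settings might be registered like this (a configuration sketch; the values and the block name are illustrative):

```python
from cortexdb_prefect import CortexDBBlock

block = CortexDBBlock(
    base_url="http://cortex.internal:3141",  # CortexDB server URL
    api_key="your-api-key",                  # stored as a Prefect secret
    tenant_id="data-team",                   # default tenant identifier
    timeout=60.0,                            # request timeout in seconds
    max_retries=5,                           # maximum retry attempts
)
block.save("tuned-cortex")
```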
Data Pipeline Example
from prefect import flow, task
from cortexdb_prefect import CortexDBBlock, cortexdb_remember, cortexdb_recall

@task
def extract_data():
    return [{"id": 1, "value": "data"}]

@task
def transform_data(raw_data, context):
    # Use recalled context to inform transformations
    return [{"id": d["id"], "processed": True} for d in raw_data]

@flow(name="memory-aware-etl")
def etl_with_memory():
    block = CortexDBBlock.load("production-cortex")
    client = block.get_client()

    # Recall previous run context
    context = cortexdb_recall(
        client=client,
        query="last ETL run parameters and results",
        tenant_id="data-team",
    )

    raw = extract_data()
    processed = transform_data(raw, context)

    # Store this run's results
    cortexdb_remember(
        client=client,
        content=f"ETL completed: processed {len(processed)} records.",
        tenant_id="data-team",
    )
    return processed