Add long-term memory to Prefect data pipelines and flows.

Prefect Integration

CortexDB integrates with Prefect to provide long-term memory for data pipelines. Use the Prefect Block for connection management and Prefect tasks for memory operations within your flows.

Installation

pip install cortexdb[prefect]

Setup

Register the CortexDB Block

from cortexdb_prefect import CortexDBBlock

block = CortexDBBlock(
    base_url="http://localhost:3141",
    api_key="your-api-key",
    tenant_id="data-team",
)
block.save("production-cortex")

Once saved, the block appears in the Prefect UI and can be referenced by name in any flow.

Using Tasks in Flows

from prefect import flow
from cortexdb_prefect import CortexDBBlock, cortexdb_remember, cortexdb_recall

@flow(name="knowledge-pipeline")
def knowledge_pipeline():
    # Load the saved block
    block = CortexDBBlock.load("production-cortex")
    client = block.get_client()

    # Store pipeline results
    cortexdb_remember(
        client=client,
        content="ETL pipeline processed 50,000 records from source_a.",
        tenant_id="data-team",
        metadata={"source": "etl", "records": "50000"},
    )

    # Recall past context
    context = cortexdb_recall(
        client=client,
        query="recent ETL pipeline results",
        tenant_id="data-team",
    )

    return context

if __name__ == "__main__":
    knowledge_pipeline()

Available Tasks

cortexdb_remember

Store content in CortexDB. The task is configured with 2 retries and a 5-second retry delay.

cortexdb_remember(
    client=client,
    content="Important data point.",
    tenant_id="my-app",
    scope="pipeline",
    metadata={"key": "value"},
)
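The retry policy above (2 retries with a 5-second delay) follows the usual attempt-then-retry semantics: one initial call, then up to two more after waiting. A plain-Python sketch of that policy, independent of Prefect itself (`run_with_retries` and `flaky` are illustrative names, not part of the integration):

```python
import time

def run_with_retries(task_fn, retries=2, retry_delay_seconds=5, sleep=time.sleep):
    """Call task_fn; on failure, retry up to `retries` times with a fixed delay."""
    attempts = retries + 1  # one initial attempt plus the retries
    for attempt in range(1, attempts + 1):
        try:
            return task_fn()
        except Exception:
            if attempt == attempts:
                raise  # retries exhausted: surface the final error
            sleep(retry_delay_seconds)

# A task that fails twice before succeeding, like a transient network error.
calls = {"n": 0}
def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient failure")
    return "stored"

print(run_with_retries(flaky, retries=2, retry_delay_seconds=0))  # "stored" on the 3rd attempt
```

With the defaults, a call that fails three times in a row raises the final error after roughly 10 seconds of waiting.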

cortexdb_recall

Retrieve relevant context from CortexDB.

result = cortexdb_recall(
    client=client,
    query="What happened in the last pipeline run?",
    tenant_id="my-app",
    max_tokens=4096,
    min_confidence=0.5,
)
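`min_confidence` discards memories whose relevance score falls below the threshold. A plain-Python sketch of that filtering step (the memory dicts and `confidence` field here are illustrative assumptions, not CortexDB's actual response shape):

```python
def filter_by_confidence(memories, min_confidence=0.5):
    """Keep only memories whose confidence meets the threshold."""
    return [m for m in memories if m["confidence"] >= min_confidence]

memories = [
    {"content": "ETL run succeeded", "confidence": 0.91},
    {"content": "unrelated note", "confidence": 0.22},
]
print(filter_by_confidence(memories, min_confidence=0.5))
# keeps only the 0.91-confidence entry
```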

cortexdb_forget

Remove memories from CortexDB.

cortexdb_forget(
    client=client,
    query="old pipeline data",
    reason="Data retention policy",
    tenant_id="my-app",
)

cortexdb_search

Search CortexDB for matching episodes with filters.

results = cortexdb_search(
    client=client,
    query="deployment events",
    tenant_id="my-app",
    source="ci-cd",
    namespace="production",
    limit=10,
)
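Conceptually, `source` and `namespace` act as equality filters and `limit` caps the result count. A plain-Python sketch of that filtering (the episode dicts are illustrative, not CortexDB's wire format):

```python
def search_episodes(episodes, source=None, namespace=None, limit=10):
    """Filter episodes by optional source/namespace, then cap at `limit`."""
    matches = [
        e for e in episodes
        if (source is None or e["source"] == source)
        and (namespace is None or e["namespace"] == namespace)
    ]
    return matches[:limit]

episodes = [
    {"content": "deploy v1.2", "source": "ci-cd", "namespace": "production"},
    {"content": "deploy v1.3", "source": "ci-cd", "namespace": "staging"},
]
print(search_episodes(episodes, source="ci-cd", namespace="production", limit=10))
# only the production episode matches both filters
```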

CortexDB Block Configuration

| Parameter | Default | Description |
|---|---|---|
| base_url | http://localhost:3141 | CortexDB server URL |
| api_key | None | API key (stored as a Prefect secret) |
| tenant_id | "default" | Default tenant identifier |
| timeout | 30.0 | Request timeout in seconds |
| max_retries | 3 | Maximum retry attempts |
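For reference, the parameters and defaults above can be mirrored in a small dataclass. This is an illustrative stand-in only, not the block's actual implementation (the real block subclasses Prefect's Block and stores api_key as a secret):

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class CortexDBBlockConfig:
    """Mirror of the CortexDB Block parameters and their defaults."""
    base_url: str = "http://localhost:3141"
    api_key: Optional[str] = None   # stored as a Prefect secret in the real block
    tenant_id: str = "default"
    timeout: float = 30.0
    max_retries: int = 3

# Override only what differs from the defaults.
cfg = CortexDBBlockConfig(api_key="your-api-key", tenant_id="data-team")
print(cfg.base_url, cfg.timeout, cfg.max_retries)
```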

Data Pipeline Example

from prefect import flow, task
from cortexdb_prefect import CortexDBBlock, cortexdb_remember, cortexdb_recall

@task
def extract_data():
    return [{"id": 1, "value": "data"}]

@task
def transform_data(raw_data, context):
    # The recalled context can inform transformations here;
    # this minimal example ignores it.
    return [{"id": d["id"], "processed": True} for d in raw_data]

@flow(name="memory-aware-etl")
def etl_with_memory():
    block = CortexDBBlock.load("production-cortex")
    client = block.get_client()

    # Recall previous run context
    context = cortexdb_recall(
        client=client,
        query="last ETL run parameters and results",
        tenant_id="data-team",
    )

    raw = extract_data()
    processed = transform_data(raw, context)

    # Store this run's results
    cortexdb_remember(
        client=client,
        content=f"ETL completed: processed {len(processed)} records.",
        tenant_id="data-team",
    )

    return processed