Skip to content

Persistence API

ninja_persistence

Ninja Persistence — unified polyglot persistence layer for Ninja Stack.

__all__ module-attribute

# Names exported as the public API of the ninja_persistence package.
__all__ = [
    "AdapterRegistry",
    "ChromaVectorAdapter",
    "ConnectionManager",
    "ConnectionProfile",
    "EmbeddingStrategy",
    "GraphAdapter",
    "MilvusVectorAdapter",
    "MongoAdapter",
    "Repository",
    "SQLAdapter",
]

ChromaVectorAdapter

ChromaVectorAdapter(
    entity: EntitySchema, client: Any = None
)

Chroma-backed vector store adapter.

Implements the Repository protocol with native semantic search support.

Requires the chromadb optional dependency: pip install ninja-persistence[chroma]

Source code in libs/ninja-persistence/src/ninja_persistence/adapters/chroma.py
def __init__(self, entity: EntitySchema, client: Any = None) -> None:
    """Bind the adapter to an entity schema and an optional client.

    The collection name is the entity's ``collection_name`` when that value
    is truthy; otherwise the lower-cased entity name is used.
    """
    explicit_name = entity.collection_name
    self._entity, self._client = entity, client
    self._collection_name = explicit_name if explicit_name else entity.name.lower()

find_by_id async

find_by_id(id: str) -> dict[str, Any] | None
Source code in libs/ninja-persistence/src/ninja_persistence/adapters/chroma.py
async def find_by_id(self, id: str) -> dict[str, Any] | None:
    coll = self._get_collection()
    result = coll.get(ids=[id])
    if not result["ids"]:
        return None
    doc: dict[str, Any] = {"id": id}
    if result.get("metadatas"):
        doc.update(result["metadatas"][0])
    if result.get("documents"):
        doc["document"] = result["documents"][0]
    return doc

find_many async

find_many(
    filters: dict[str, Any] | None = None, limit: int = 100
) -> list[dict[str, Any]]
Source code in libs/ninja-persistence/src/ninja_persistence/adapters/chroma.py
async def find_many(self, filters: dict[str, Any] | None = None, limit: int = 100) -> list[dict[str, Any]]:
    coll = self._get_collection()
    kwargs: dict[str, Any] = {"limit": limit}
    if filters and "where" in filters:
        kwargs["where"] = filters["where"]
    result = coll.get(**kwargs)
    docs: list[dict[str, Any]] = []
    for i, doc_id in enumerate(result["ids"]):
        doc: dict[str, Any] = {"id": doc_id}
        if result.get("metadatas") and i < len(result["metadatas"]):
            doc.update(result["metadatas"][i])
        if result.get("documents") and i < len(result["documents"]):
            doc["document"] = result["documents"][i]
        docs.append(doc)
    return docs

create async

create(data: dict[str, Any]) -> dict[str, Any]
Source code in libs/ninja-persistence/src/ninja_persistence/adapters/chroma.py
async def create(self, data: dict[str, Any]) -> dict[str, Any]:
    """Add one record to the collection.

    Args:
        data: Record to store. ``"id"`` and ``"document"`` default to empty
            strings when absent, ``"embedding"`` is optional, and every other
            key is stored as metadata.

    Returns:
        The ``data`` mapping that was passed in, unchanged.
    """
    coll = self._get_collection()
    doc_id = data.get("id", "")
    document = data.get("document", "")
    metadata = {k: v for k, v in data.items() if k not in ("id", "document", "embedding")}
    embedding = data.get("embedding")
    kwargs: dict[str, Any] = {
        "ids": [doc_id],
        "documents": [document],
    }
    if metadata:
        kwargs["metadatas"] = [metadata]
    # Test presence and length explicitly: array-like embeddings (for example
    # numpy arrays) raise on bool(), while an absent or empty embedding must
    # still be skipped.
    if embedding is not None and len(embedding) > 0:
        kwargs["embeddings"] = [embedding]
    coll.add(**kwargs)
    return data

update async

update(
    id: str, patch: dict[str, Any]
) -> dict[str, Any] | None
Source code in libs/ninja-persistence/src/ninja_persistence/adapters/chroma.py
async def update(self, id: str, patch: dict[str, Any]) -> dict[str, Any] | None:
    coll = self._get_collection()
    kwargs: dict[str, Any] = {"ids": [id]}
    if "document" in patch:
        kwargs["documents"] = [patch["document"]]
    metadata = {k: v for k, v in patch.items() if k not in ("id", "document", "embedding")}
    if metadata:
        kwargs["metadatas"] = [metadata]
    if "embedding" in patch:
        kwargs["embeddings"] = [patch["embedding"]]
    coll.update(**kwargs)
    return await self.find_by_id(id)

delete async

delete(id: str) -> bool
Source code in libs/ninja-persistence/src/ninja_persistence/adapters/chroma.py
async def delete(self, id: str) -> bool:
    """Delete the record ``id`` from the collection.

    Args:
        id: Identifier of the record to remove.

    Returns:
        ``True`` if a record with this id existed and was deleted, ``False``
        if the collection held no such record.
    """
    coll = self._get_collection()
    # Probe for the id first so that a missing record is reported as False
    # instead of an unconditional True.
    if not coll.get(ids=[id])["ids"]:
        return False
    coll.delete(ids=[id])
    return True

search_semantic async

search_semantic(
    query: str, limit: int = 10
) -> list[dict[str, Any]]
Source code in libs/ninja-persistence/src/ninja_persistence/adapters/chroma.py
async def search_semantic(self, query: str, limit: int = 10) -> list[dict[str, Any]]:
    """Run a text query against the collection and flatten the first result batch.

    Args:
        query: Text sent to the collection as the single query text.
        limit: Number of results requested.

    Returns:
        One dict per hit with ``"id"``, the hit's metadata keys (if any),
        ``"document"`` when documents were returned and ``"_distance"`` when
        distances were returned. Empty when there are no hits.
    """
    coll = self._get_collection()
    result = coll.query(query_texts=[query], n_results=limit)

    ids = result["ids"][0] if result["ids"] else []
    if not ids:
        return []

    def first_batch(key: str) -> Any:
        # Only one query text is sent, so only the first batch is relevant;
        # a missing or empty batch is treated as "no values".
        batches = result.get(key)
        return batches[0] if batches and batches[0] else []

    metadatas = first_batch("metadatas")
    documents = first_batch("documents")
    distances = first_batch("distances")

    docs: list[dict[str, Any]] = []
    for i, doc_id in enumerate(ids):
        doc: dict[str, Any] = {"id": doc_id}
        # Skip None/empty metadata entries: dict.update(None) raises TypeError.
        if i < len(metadatas) and metadatas[i]:
            doc.update(metadatas[i])
        if i < len(documents):
            doc["document"] = documents[i]
        if i < len(distances):
            doc["_distance"] = distances[i]
        docs.append(doc)
    return docs

upsert_embedding async

upsert_embedding(id: str, embedding: list[float]) -> None
Source code in libs/ninja-persistence/src/ninja_persistence/adapters/chroma.py
async def upsert_embedding(self, id: str, embedding: list[float]) -> None:
    """Send ``embedding`` for record ``id`` to the collection's ``update`` call.

    NOTE(review): this goes through ``update`` rather than an insert-or-update
    call; whether a missing id is created should be confirmed against the
    client library.
    """
    collection = self._get_collection()
    collection.update(embeddings=[embedding], ids=[id])

GraphAdapter

GraphAdapter(entity: EntitySchema, driver: Any = None)

Async Neo4j adapter for graph-backed entities.

Implements the Repository protocol for graph databases.

Requires the neo4j optional dependency: pip install ninja-persistence[graph]

Source code in libs/ninja-persistence/src/ninja_persistence/adapters/graph.py
def __init__(self, entity: EntitySchema, driver: Any = None) -> None:
    """Bind the adapter to an entity schema and an optional driver.

    The node label is the entity's ``collection_name`` when that value is
    truthy; otherwise the entity name is used as-is (case preserved).
    """
    explicit_label = entity.collection_name
    self._entity, self._driver = entity, driver
    self._label = explicit_label if explicit_label else entity.name

find_by_id async

find_by_id(id: str) -> dict[str, Any] | None
Source code in libs/ninja-persistence/src/ninja_persistence/adapters/graph.py
async def find_by_id(self, id: str) -> dict[str, Any] | None:
    raise NotImplementedError("GraphAdapter.find_by_id is not yet implemented.")

find_many async

find_many(
    filters: dict[str, Any] | None = None, limit: int = 100
) -> list[dict[str, Any]]
Source code in libs/ninja-persistence/src/ninja_persistence/adapters/graph.py
async def find_many(self, filters: dict[str, Any] | None = None, limit: int = 100) -> list[dict[str, Any]]:
    raise NotImplementedError("GraphAdapter.find_many is not yet implemented.")

create async

create(data: dict[str, Any]) -> dict[str, Any]
Source code in libs/ninja-persistence/src/ninja_persistence/adapters/graph.py
async def create(self, data: dict[str, Any]) -> dict[str, Any]:
    """Placeholder: record creation is not available for the graph adapter yet.

    Raises:
        NotImplementedError: Always.
    """
    message = "GraphAdapter.create is not yet implemented."
    raise NotImplementedError(message)

update async

update(
    id: str, patch: dict[str, Any]
) -> dict[str, Any] | None
Source code in libs/ninja-persistence/src/ninja_persistence/adapters/graph.py
async def update(self, id: str, patch: dict[str, Any]) -> dict[str, Any] | None:
    raise NotImplementedError("GraphAdapter.update is not yet implemented.")

delete async

delete(id: str) -> bool
Source code in libs/ninja-persistence/src/ninja_persistence/adapters/graph.py
async def delete(self, id: str) -> bool:
    """Placeholder: deletion is not available for the graph adapter yet.

    Raises:
        NotImplementedError: Always.
    """
    message = "GraphAdapter.delete is not yet implemented."
    raise NotImplementedError(message)

search_semantic async

search_semantic(
    query: str, limit: int = 10
) -> list[dict[str, Any]]
Source code in libs/ninja-persistence/src/ninja_persistence/adapters/graph.py
async def search_semantic(self, query: str, limit: int = 10) -> list[dict[str, Any]]:
    """Placeholder: semantic search is not available for the graph adapter yet.

    Raises:
        NotImplementedError: Always.
    """
    message = "GraphAdapter.search_semantic is not yet implemented."
    raise NotImplementedError(message)

upsert_embedding async

upsert_embedding(id: str, embedding: list[float]) -> None
Source code in libs/ninja-persistence/src/ninja_persistence/adapters/graph.py
async def upsert_embedding(self, id: str, embedding: list[float]) -> None:
    """Placeholder: embedding storage is not available for the graph adapter yet.

    Raises:
        NotImplementedError: Always.
    """
    message = "GraphAdapter.upsert_embedding is not yet implemented."
    raise NotImplementedError(message)

MilvusVectorAdapter

MilvusVectorAdapter(
    entity: EntitySchema, client: Any = None
)

Milvus-backed vector store adapter.

Implements the Repository protocol with native semantic search support.

Requires the pymilvus optional dependency: pip install ninja-persistence[milvus]

Source code in libs/ninja-persistence/src/ninja_persistence/adapters/milvus.py
def __init__(self, entity: EntitySchema, client: Any = None) -> None:
    """Bind the adapter to an entity schema and an optional client.

    The collection name is the entity's ``collection_name`` when that value
    is truthy; otherwise the lower-cased entity name is used.
    """
    explicit_name = entity.collection_name
    self._entity, self._client = entity, client
    self._collection_name = explicit_name if explicit_name else entity.name.lower()

find_by_id async

find_by_id(id: str) -> dict[str, Any] | None
Source code in libs/ninja-persistence/src/ninja_persistence/adapters/milvus.py
async def find_by_id(self, id: str) -> dict[str, Any] | None:
    raise NotImplementedError("MilvusVectorAdapter.find_by_id is not yet implemented.")

find_many async

find_many(
    filters: dict[str, Any] | None = None, limit: int = 100
) -> list[dict[str, Any]]
Source code in libs/ninja-persistence/src/ninja_persistence/adapters/milvus.py
async def find_many(self, filters: dict[str, Any] | None = None, limit: int = 100) -> list[dict[str, Any]]:
    raise NotImplementedError("MilvusVectorAdapter.find_many is not yet implemented.")

create async

create(data: dict[str, Any]) -> dict[str, Any]
Source code in libs/ninja-persistence/src/ninja_persistence/adapters/milvus.py
async def create(self, data: dict[str, Any]) -> dict[str, Any]:
    """Placeholder: record creation is not available for the Milvus adapter yet.

    Raises:
        NotImplementedError: Always.
    """
    message = "MilvusVectorAdapter.create is not yet implemented."
    raise NotImplementedError(message)

update async

update(
    id: str, patch: dict[str, Any]
) -> dict[str, Any] | None
Source code in libs/ninja-persistence/src/ninja_persistence/adapters/milvus.py
async def update(self, id: str, patch: dict[str, Any]) -> dict[str, Any] | None:
    raise NotImplementedError("MilvusVectorAdapter.update is not yet implemented.")

delete async

delete(id: str) -> bool
Source code in libs/ninja-persistence/src/ninja_persistence/adapters/milvus.py
async def delete(self, id: str) -> bool:
    """Placeholder: deletion is not available for the Milvus adapter yet.

    Raises:
        NotImplementedError: Always.
    """
    message = "MilvusVectorAdapter.delete is not yet implemented."
    raise NotImplementedError(message)

search_semantic async

search_semantic(
    query: str, limit: int = 10
) -> list[dict[str, Any]]
Source code in libs/ninja-persistence/src/ninja_persistence/adapters/milvus.py
async def search_semantic(self, query: str, limit: int = 10) -> list[dict[str, Any]]:
    """Placeholder: semantic search is not available for the Milvus adapter yet.

    Raises:
        NotImplementedError: Always.
    """
    message = "MilvusVectorAdapter.search_semantic is not yet implemented."
    raise NotImplementedError(message)

upsert_embedding async

upsert_embedding(id: str, embedding: list[float]) -> None
Source code in libs/ninja-persistence/src/ninja_persistence/adapters/milvus.py
async def upsert_embedding(self, id: str, embedding: list[float]) -> None:
    """Placeholder: embedding storage is not available for the Milvus adapter yet.

    Raises:
        NotImplementedError: Always.
    """
    message = "MilvusVectorAdapter.upsert_embedding is not yet implemented."
    raise NotImplementedError(message)

MongoAdapter

MongoAdapter(entity: EntitySchema, database: Any = None)

Async MongoDB adapter backed by Motor.

Implements the Repository protocol for document databases.

Requires the motor optional dependency: pip install ninja-persistence[mongo]

Source code in libs/ninja-persistence/src/ninja_persistence/adapters/mongo.py
def __init__(self, entity: EntitySchema, database: Any = None) -> None:
    """Bind the adapter to an entity schema and an optional database handle.

    The collection name is the entity's ``collection_name`` when that value
    is truthy; otherwise the lower-cased entity name is used.
    """
    explicit_name = entity.collection_name
    self._entity, self._database = entity, database
    self._collection_name = explicit_name if explicit_name else entity.name.lower()

find_by_id async

find_by_id(id: str) -> dict[str, Any] | None
Source code in libs/ninja-persistence/src/ninja_persistence/adapters/mongo.py
async def find_by_id(self, id: str) -> dict[str, Any] | None:
    coll = self._get_collection()
    doc = await coll.find_one({"_id": id})
    return dict(doc) if doc else None

find_many async

find_many(
    filters: dict[str, Any] | None = None, limit: int = 100
) -> list[dict[str, Any]]
Source code in libs/ninja-persistence/src/ninja_persistence/adapters/mongo.py
async def find_many(self, filters: dict[str, Any] | None = None, limit: int = 100) -> list[dict[str, Any]]:
    coll = self._get_collection()
    cursor = coll.find(filters or {}).limit(limit)
    return [dict(doc) async for doc in cursor]

create async

create(data: dict[str, Any]) -> dict[str, Any]
Source code in libs/ninja-persistence/src/ninja_persistence/adapters/mongo.py
async def create(self, data: dict[str, Any]) -> dict[str, Any]:
    """Insert ``data`` as one document and hand the same mapping back.

    The mapping is passed to the collection's ``insert_one`` as-is, so any
    in-place changes the driver makes to it are visible to the caller.
    """
    target = self._get_collection()
    await target.insert_one(data)
    return data

update async

update(
    id: str, patch: dict[str, Any]
) -> dict[str, Any] | None
Source code in libs/ninja-persistence/src/ninja_persistence/adapters/mongo.py
async def update(self, id: str, patch: dict[str, Any]) -> dict[str, Any] | None:
    coll = self._get_collection()
    result = await coll.update_one({"_id": id}, {"$set": patch})
    if result.matched_count == 0:
        return None
    return await self.find_by_id(id)

delete async

delete(id: str) -> bool
Source code in libs/ninja-persistence/src/ninja_persistence/adapters/mongo.py
async def delete(self, id: str) -> bool:
    """Delete the document whose ``_id`` equals ``id``.

    Returns ``True`` when the driver reports at least one deleted document.
    """
    outcome = await self._get_collection().delete_one({"_id": id})
    removed = outcome.deleted_count
    return removed > 0

search_semantic async

search_semantic(
    query: str, limit: int = 10
) -> list[dict[str, Any]]

Semantic search requires a sidecar vector index for Mongo-backed entities; this adapter has none, so the method always returns an empty list.

Source code in libs/ninja-persistence/src/ninja_persistence/adapters/mongo.py
async def search_semantic(self, query: str, limit: int = 10) -> list[dict[str, Any]]:
    """Return no results; Mongo-backed entities need a sidecar vector index for semantic search."""
    no_matches: list[dict[str, Any]] = []
    return no_matches

upsert_embedding async

upsert_embedding(id: str, embedding: list[float]) -> None

Embedding storage requires a sidecar vector adapter for Mongo-backed entities; this method is currently a no-op.

Source code in libs/ninja-persistence/src/ninja_persistence/adapters/mongo.py
async def upsert_embedding(self, id: str, embedding: list[float]) -> None:
    """Do nothing; Mongo-backed entities need a sidecar vector adapter to store embeddings."""
    return None

SQLAdapter

SQLAdapter(engine: AsyncEngine, entity: EntitySchema)

Async SQL adapter backed by SQLAlchemy.

Implements the Repository protocol for relational databases.

Source code in libs/ninja-persistence/src/ninja_persistence/adapters/sql.py
def __init__(self, engine: AsyncEngine, entity: EntitySchema) -> None:
    """Bind the adapter to an async engine and build the entity's table.

    A fresh ``MetaData`` object is created per adapter and the table is
    derived from the entity schema by ``_build_table``.
    """
    metadata = sa.MetaData()
    self._engine, self._entity = engine, entity
    self._metadata = metadata
    self._table = _build_table(entity, metadata)

table property

table: Table

ensure_table async

ensure_table() -> None

Create the table if it does not exist.

Source code in libs/ninja-persistence/src/ninja_persistence/adapters/sql.py
async def ensure_table(self) -> None:
    """Run the metadata's ``create_all`` on a connection opened with ``engine.begin()``."""
    create_all = self._metadata.create_all
    async with self._engine.begin() as connection:
        # create_all is a synchronous callable, hence run_sync.
        await connection.run_sync(create_all)

find_by_id async

find_by_id(id: str) -> dict[str, Any] | None
Source code in libs/ninja-persistence/src/ninja_persistence/adapters/sql.py
async def find_by_id(self, id: str) -> dict[str, Any] | None:
    """Select the row whose primary-key column equals ``id``.

    Returns the first matching row as a plain ``dict``, or ``None`` when the
    query yields no row.
    """
    key_column = _get_pk_column(self._table)
    query = self._table.select().where(key_column == id)
    async with AsyncSession(self._engine) as session:
        first_row = (await session.execute(query)).mappings().first()
        if not first_row:
            return None
        return dict(first_row)

find_many async

find_many(
    filters: dict[str, Any] | None = None, limit: int = 100
) -> list[dict[str, Any]]
Source code in libs/ninja-persistence/src/ninja_persistence/adapters/sql.py
async def find_many(self, filters: dict[str, Any] | None = None, limit: int = 100) -> list[dict[str, Any]]:
    """Select up to ``limit`` rows, optionally narrowed by equality filters.

    Each ``filters`` entry whose key names a column of the table adds a
    ``column == value`` condition; keys that are not columns are skipped.
    Rows are returned as plain dicts.
    """
    columns = self._table.c
    query = self._table.select().limit(limit)
    for column_name, expected in (filters or {}).items():
        if column_name in columns:
            query = query.where(columns[column_name] == expected)
    async with AsyncSession(self._engine) as session:
        rows = (await session.execute(query)).mappings().all()
        return [dict(row) for row in rows]

create async

create(data: dict[str, Any]) -> dict[str, Any]
Source code in libs/ninja-persistence/src/ninja_persistence/adapters/sql.py
async def create(self, data: dict[str, Any]) -> dict[str, Any]:
    """Insert ``data`` as one row, commit, and return the same mapping.

    The keys of ``data`` are used as column names for the insert.
    """
    insertion = self._table.insert().values(**data)
    async with AsyncSession(self._engine) as session:
        await session.execute(insertion)
        await session.commit()
    return data

update async

update(
    id: str, patch: dict[str, Any]
) -> dict[str, Any] | None
Source code in libs/ninja-persistence/src/ninja_persistence/adapters/sql.py
async def update(self, id: str, patch: dict[str, Any]) -> dict[str, Any] | None:
    """Update the row whose primary key equals ``id`` with the values in ``patch``.

    The change is committed before the result is inspected. Returns ``None``
    when the statement reports zero affected rows; otherwise returns the row
    as re-read by ``find_by_id``.
    """
    key_column = _get_pk_column(self._table)
    change = self._table.update().where(key_column == id).values(**patch)
    async with AsyncSession(self._engine) as session:
        outcome = await session.execute(change)
        await session.commit()
        touched = outcome.rowcount
    if touched == 0:
        return None
    return await self.find_by_id(id)

delete async

delete(id: str) -> bool
Source code in libs/ninja-persistence/src/ninja_persistence/adapters/sql.py
async def delete(self, id: str) -> bool:
    """Delete the row whose primary key equals ``id`` and commit.

    Returns ``True`` when the statement reports at least one affected row.
    """
    key_column = _get_pk_column(self._table)
    removal = self._table.delete().where(key_column == id)
    async with AsyncSession(self._engine) as session:
        outcome = await session.execute(removal)
        await session.commit()
        return outcome.rowcount > 0

search_semantic async

search_semantic(
    query: str, limit: int = 10
) -> list[dict[str, Any]]

Semantic search is not natively supported in SQL.

For SQL-backed entities, this requires a sidecar vector index. Returns an empty list when no sidecar is configured.

Source code in libs/ninja-persistence/src/ninja_persistence/adapters/sql.py
async def search_semantic(self, query: str, limit: int = 10) -> list[dict[str, Any]]:
    """Return no results; SQL has no native semantic search.

    SQL-backed entities need a sidecar vector index for this operation.
    This adapter does not consult one, so the result is always empty.
    """
    no_matches: list[dict[str, Any]] = []
    return no_matches

upsert_embedding async

upsert_embedding(id: str, embedding: list[float]) -> None

Embedding storage is not natively supported in SQL.

For SQL-backed entities, embeddings are managed by a sidecar vector adapter. This is a no-op when no sidecar is configured.

Source code in libs/ninja-persistence/src/ninja_persistence/adapters/sql.py
async def upsert_embedding(self, id: str, embedding: list[float]) -> None:
    """Do nothing; SQL has no native embedding storage.

    Embeddings for SQL-backed entities are expected to be handled by a
    sidecar vector adapter. This method performs no work.
    """
    return None

ConnectionManager

ConnectionManager(
    profiles: dict[str, ConnectionProfile] | None = None,
)

Manages connection pools for all configured engines.

Reads connection profiles from .ninjastack/connections.json and lazily creates engine-specific connection objects on first access.

Source code in libs/ninja-persistence/src/ninja_persistence/connections.py
def __init__(self, profiles: dict[str, ConnectionProfile] | None = None) -> None:
    """Store the given profiles and start with an empty engine cache.

    A missing or empty ``profiles`` mapping is replaced with a fresh dict.
    """
    self._profiles: dict[str, ConnectionProfile] = profiles if profiles else {}
    # SQL engines created so far, keyed by profile name.
    self._sql_engines: dict[str, AsyncEngine] = dict()

from_file classmethod

from_file(
    path: str | Path = ".ninjastack/connections.json",
) -> ConnectionManager

Load connection profiles from a JSON file.

Source code in libs/ninja-persistence/src/ninja_persistence/connections.py
@classmethod
def from_file(cls, path: str | Path = ".ninjastack/connections.json") -> ConnectionManager:
    """Load connection profiles from a JSON file.

    Args:
        path: Location of the JSON file. Each top-level key is a profile
            name whose value supplies the keyword arguments for
            ``ConnectionProfile``.

    Returns:
        A manager holding the parsed profiles, or a manager with no
        profiles when the file does not exist.
    """
    filepath = Path(path)
    if not filepath.exists():
        return cls(profiles={})
    # Decode explicitly as UTF-8 (the standard JSON encoding) instead of
    # relying on the platform's default locale encoding.
    raw = json.loads(filepath.read_text(encoding="utf-8"))
    profiles = {name: ConnectionProfile(**cfg) for name, cfg in raw.items()}
    return cls(profiles=profiles)

get_profile

get_profile(name: str) -> ConnectionProfile

Get a connection profile by name.

Source code in libs/ninja-persistence/src/ninja_persistence/connections.py
def get_profile(self, name: str) -> ConnectionProfile:
    """Return the connection profile registered under ``name``.

    Raises:
        KeyError: If no profile has that name; the message lists the
            available profile names.
    """
    known = self._profiles
    if name in known:
        return known[name]
    raise KeyError(f"Connection profile '{name}' not found. Available: {list(known.keys())}")

get_sql_engine

get_sql_engine(
    profile_name: str = "default",
) -> AsyncEngine

Get or create an async SQLAlchemy engine for the given profile.

Source code in libs/ninja-persistence/src/ninja_persistence/connections.py
def get_sql_engine(self, profile_name: str = "default") -> AsyncEngine:
    """Return the cached async engine for ``profile_name``, creating it on first use.

    The engine is built from the profile's URL. ``echo`` is read from the
    profile options (default ``False``); ``pool_size`` (default 5) and
    ``max_overflow`` (default 10) are passed only for non-SQLite URLs.
    """
    engines = self._sql_engines
    if profile_name in engines:
        return engines[profile_name]

    profile = self.get_profile(profile_name)
    options = profile.options
    engine_kwargs: dict[str, Any] = {"echo": options.get("echo", False)}
    # pool_size/max_overflow are not supported by SQLite's StaticPool
    if not profile.url.startswith("sqlite"):
        engine_kwargs.update(
            pool_size=options.get("pool_size", 5),
            max_overflow=options.get("max_overflow", 10),
        )
    engines[profile_name] = create_async_engine(profile.url, **engine_kwargs)
    return engines[profile_name]

close_all async

close_all() -> None

Dispose all managed connection pools.

Source code in libs/ninja-persistence/src/ninja_persistence/connections.py
async def close_all(self) -> None:
    """Dispose every cached SQL engine in turn, then empty the cache."""
    engines = self._sql_engines
    for engine in engines.values():
        await engine.dispose()
    engines.clear()

ConnectionProfile

Bases: BaseModel

A single database connection configuration.

engine class-attribute instance-attribute

# Declared without a default value.
engine: str = Field(
    description="Storage engine type: sql, mongo, graph, vector."
)

url class-attribute instance-attribute

# Declared without a default value. ConnectionManager.get_sql_engine passes
# this string to create_async_engine and checks it for a "sqlite" prefix.
url: str = Field(description='Connection URL / DSN.')

options class-attribute instance-attribute

# Defaults to a new empty dict. ConnectionManager.get_sql_engine reads the
# "echo", "pool_size" and "max_overflow" keys from it.
options: dict[str, Any] = Field(
    default_factory=dict,
    description="Engine-specific options.",
)

EmbeddingStrategy

Bases: BaseModel

Configuration for how to generate embeddings for an entity.

Examines the entity schema to determine which fields have embedding configs and produces the text payload to send to the embedding model.

model_name class-attribute instance-attribute

# Returned by get_model_for_field when the field has no embedding config.
model_name: str = Field(
    default="text-embedding-3-small",
    description="Default embedding model.",
)

dimensions class-attribute instance-attribute

# Returned by get_dimensions_for_field when the field has no embedding config.
dimensions: int = Field(
    default=1536,
    description="Default vector dimensionality.",
)

separator class-attribute instance-attribute

# Used by build_text_for_embedding to join the selected field values.
separator: str = Field(
    default=" ",
    description="Separator when concatenating multiple fields.",
)

get_embeddable_fields

get_embeddable_fields(
    entity: EntitySchema,
) -> list[FieldSchema]

Return all fields in the entity that have embedding configuration.

Source code in libs/ninja-persistence/src/ninja_persistence/embedding/strategy.py
def get_embeddable_fields(self, entity: EntitySchema) -> list[FieldSchema]:
    """Collect the entity's fields whose ``embedding`` attribute is not ``None``.

    Field order follows ``entity.fields``.
    """
    selected: list[FieldSchema] = []
    for candidate in entity.fields:
        if candidate.embedding is None:
            continue
        selected.append(candidate)
    return selected

build_text_for_embedding

build_text_for_embedding(
    entity: EntitySchema, record: dict[str, Any]
) -> str

Build the text payload to embed from a record's embeddable fields.

If no fields have explicit embedding config, falls back to concatenating all string/text fields.

Source code in libs/ninja-persistence/src/ninja_persistence/embedding/strategy.py
def build_text_for_embedding(self, entity: EntitySchema, record: dict[str, Any]) -> str:
    """Assemble the text to embed for ``record``.

    Fields with an explicit embedding config are used when the entity has
    any; otherwise every STRING/TEXT field is used. Values that are missing
    or falsy in ``record`` are skipped; the rest are converted with ``str``
    and joined with ``self.separator``.
    """
    chosen = self.get_embeddable_fields(entity)
    if not chosen:
        # Fallback: use all string-like fields of the entity.
        from ninja_core.schema.entity import FieldType

        textual = {FieldType.STRING, FieldType.TEXT}
        chosen = [f for f in entity.fields if f.field_type in textual]

    return self.separator.join(str(value) for f in chosen if (value := record.get(f.name)))

get_model_for_field

get_model_for_field(field: FieldSchema) -> str

Return the embedding model to use for a specific field.

Source code in libs/ninja-persistence/src/ninja_persistence/embedding/strategy.py
def get_model_for_field(self, field: FieldSchema) -> str:
    """Pick the embedding model for ``field``.

    The field's own ``embedding.model`` wins when the field has a truthy
    embedding config; otherwise the strategy-wide ``model_name`` is used.
    """
    override = field.embedding
    return override.model if override else self.model_name

get_dimensions_for_field

get_dimensions_for_field(field: FieldSchema) -> int

Return the vector dimensions to use for a specific field.

Source code in libs/ninja-persistence/src/ninja_persistence/embedding/strategy.py
def get_dimensions_for_field(self, field: FieldSchema) -> int:
    """Pick the vector dimensionality for ``field``.

    The field's own ``embedding.dimensions`` wins when the field has a truthy
    embedding config; otherwise the strategy-wide ``dimensions`` is used.
    """
    override = field.embedding
    return override.dimensions if override else self.dimensions

Repository

Bases: Protocol[T]

Unified persistence interface for all storage engines.

Every adapter (SQL, Mongo, Graph, Vector) implements this protocol so that Data Agents can perform CRUD and semantic search without knowing the backend.

find_by_id async

find_by_id(id: str) -> dict[str, Any] | None

Retrieve a single record by primary key.

Source code in libs/ninja-persistence/src/ninja_persistence/protocols.py
async def find_by_id(self, id: str) -> dict[str, Any] | None:
    """Retrieve a single record by primary key."""
    ...

find_many async

find_many(
    filters: dict[str, Any] | None = None, limit: int = 100
) -> list[dict[str, Any]]

Retrieve multiple records matching the given filters.

Source code in libs/ninja-persistence/src/ninja_persistence/protocols.py
async def find_many(self, filters: dict[str, Any] | None = None, limit: int = 100) -> list[dict[str, Any]]:
    """Retrieve multiple records matching the given filters."""
    ...

create async

create(data: dict[str, Any]) -> dict[str, Any]

Insert a new record and return the created entity.

Source code in libs/ninja-persistence/src/ninja_persistence/protocols.py
async def create(self, data: dict[str, Any]) -> dict[str, Any]:
    """Insert a new record and return the created entity.

    Args:
        data: Field values of the record to insert.

    Returns:
        The created record as a dict.
    """
    ...

update async

update(
    id: str, patch: dict[str, Any]
) -> dict[str, Any] | None

Apply a partial update to an existing record.

Source code in libs/ninja-persistence/src/ninja_persistence/protocols.py
async def update(self, id: str, patch: dict[str, Any]) -> dict[str, Any] | None:
    """Apply a partial update to an existing record."""
    ...

delete async

delete(id: str) -> bool

Delete a record by primary key. Returns True if deleted.

Source code in libs/ninja-persistence/src/ninja_persistence/protocols.py
async def delete(self, id: str) -> bool:
    """Delete a record by its primary key.

    Args:
        id: Primary-key value of the record to remove.

    Returns:
        ``True`` if a record was deleted.
    """
    ...

search_semantic async

search_semantic(
    query: str, limit: int = 10
) -> list[dict[str, Any]]

Perform semantic (vector similarity) search.

Source code in libs/ninja-persistence/src/ninja_persistence/protocols.py
async def search_semantic(self, query: str, limit: int = 10) -> list[dict[str, Any]]:
    """Perform a semantic (vector similarity) search.

    Args:
        query: Query text to search for.
        limit: Maximum number of results to return.

    Returns:
        The matching records as dicts.
    """
    ...

upsert_embedding async

upsert_embedding(id: str, embedding: list[float]) -> None

Insert or update the embedding vector for a record.

Source code in libs/ninja-persistence/src/ninja_persistence/protocols.py
async def upsert_embedding(self, id: str, embedding: list[float]) -> None:
    """Insert or update the embedding vector stored for a record.

    Args:
        id: Primary-key value of the record the vector belongs to.
        embedding: The embedding vector.
    """
    ...

AdapterRegistry

AdapterRegistry(connection_manager: ConnectionManager)

Routes entity schemas to the correct persistence adapter.

Given an entity's StorageEngine from the ASD, the registry returns a configured Repository instance backed by the appropriate adapter.

Source code in libs/ninja-persistence/src/ninja_persistence/registry.py
def __init__(self, connection_manager: ConnectionManager) -> None:
    """Keep the connection manager and start with no repository overrides."""
    # Per-entity repositories registered via register(), keyed by entity name.
    self._overrides: dict[str, Repository[Any]] = dict()
    self._connection_manager = connection_manager

register

register(
    entity_name: str, repository: Repository[Any]
) -> None

Register a custom repository override for an entity.

Source code in libs/ninja-persistence/src/ninja_persistence/registry.py
def register(self, entity_name: str, repository: Repository[Any]) -> None:
    """Install ``repository`` as the override for ``entity_name``.

    A later registration for the same name replaces the earlier one.
    """
    overrides = self._overrides
    overrides[entity_name] = repository

get_repository

get_repository(
    entity: EntitySchema, profile_name: str = "default"
) -> Repository[Any]

Resolve the correct repository adapter for an entity.

Checks overrides first, then falls back to engine-based routing.

Source code in libs/ninja-persistence/src/ninja_persistence/registry.py
def get_repository(self, entity: EntitySchema, profile_name: str = "default") -> Repository[Any]:
    """Pick the repository that should serve ``entity``.

    A repository registered for the entity's name takes precedence.
    Otherwise the entity's ``storage_engine`` selects the adapter class,
    which is imported lazily. ``profile_name`` is used only for SQL
    entities, to look up the engine; the Mongo, graph and vector adapters
    are constructed with the entity alone.

    Raises:
        ValueError: If the storage engine matches none of the known engines.
    """
    entity_name = entity.name
    if entity_name in self._overrides:
        return self._overrides[entity_name]

    backend = entity.storage_engine

    if backend == StorageEngine.SQL:
        from ninja_persistence.adapters.sql import SQLAdapter

        return SQLAdapter(
            engine=self._connection_manager.get_sql_engine(profile_name),
            entity=entity,
        )
    elif backend == StorageEngine.MONGO:
        from ninja_persistence.adapters.mongo import MongoAdapter

        return MongoAdapter(entity=entity)
    elif backend == StorageEngine.GRAPH:
        from ninja_persistence.adapters.graph import GraphAdapter

        return GraphAdapter(entity=entity)
    elif backend == StorageEngine.VECTOR:
        from ninja_persistence.adapters.chroma import ChromaVectorAdapter

        return ChromaVectorAdapter(entity=entity)

    raise ValueError(f"Unsupported storage engine: {backend}")