Coverage for src / dataknobs_llm / conversations / storage.py: 53%
185 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-15 10:29 -0700
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-15 10:29 -0700
1"""Conversation storage with tree-based branching support.
3This module provides:
4- ConversationNode: Data stored in each tree node
5- ConversationState: Tree-based conversation state
6- ConversationStorage: Abstract storage interface
7- DataknobsConversationStorage: Storage adapter for dataknobs backends
8- Helper functions for node ID management and tree navigation
10Schema Versioning:
11 The storage format uses semantic versioning (MAJOR.MINOR.PATCH):
12 - MAJOR: Incompatible changes requiring migration
13 - MINOR: Backward-compatible additions
14 - PATCH: Bug fixes, no schema changes
16 Current schema version: 1.0.0
18Storage Architecture:
19 Conversations are stored as trees where each node represents a message.
20 The tree structure is serialized as:
21 - **Nodes**: List of ConversationNode objects (messages with metadata)
22 - **Edges**: List of [parent_id, child_id] relationships
23 - **Current Position**: Node ID showing where you are in the conversation
25 This format supports:
26 - Full conversation history with branching
27 - Efficient deserialization and tree reconstruction
28 - Schema evolution with automatic migration
29 - Backend-agnostic storage (works with any dataknobs backend)
31Example:
32 ```python
33 from dataknobs_data import database_factory
34 from dataknobs_llm.conversations import (
35 ConversationState,
36 ConversationNode,
37 DataknobsConversationStorage
38 )
39 from dataknobs_llm.llm.base import LLMMessage
40 from dataknobs_structures.tree import Tree
42 # Create storage backend
43 db = database_factory.create(backend="memory")
44 storage = DataknobsConversationStorage(db)
46 # Create conversation state
47 root_node = ConversationNode(
48 message=LLMMessage(role="system", content="You are helpful"),
49 node_id=""
50 )
51 tree = Tree(root_node)
52 state = ConversationState(
53 conversation_id="conv-123",
54 message_tree=tree,
55 current_node_id="",
56 metadata={"user_id": "alice"}
57 )
59 # Save conversation
60 await storage.save_conversation(state)
62 # Load conversation
63 loaded = await storage.load_conversation("conv-123")
64 messages = loaded.get_current_messages()
66 # List all conversations for user
67 user_convos = await storage.list_conversations(
68 filter_metadata={"user_id": "alice"}
69 )
70 ```
72Serialization Format:
73 The serialized format (from `ConversationState.to_dict()`) looks like:
75 ```python
76 {
77 "schema_version": "1.0.0",
78 "conversation_id": "conv-123",
79 "current_node_id": "0.1",
80 "metadata": {"user_id": "alice"},
81 "created_at": "2024-01-01T00:00:00",
82 "updated_at": "2024-01-01T00:05:00",
83 "nodes": [
84 {
85 "node_id": "",
86 "message": {
87 "role": "system",
88 "content": "You are helpful",
89 "name": None,
90 "metadata": {}
91 },
92 "timestamp": "2024-01-01T00:00:00",
93 "prompt_name": None,
94 "branch_name": None,
95 "metadata": {}
96 },
97 {
98 "node_id": "0",
99 "message": {"role": "user", "content": "Hello", ...},
100 ...
101 },
102 {
103 "node_id": "0.0",
104 "message": {"role": "assistant", "content": "Hi!", ...},
105 "metadata": {"usage": {...}, "cost_usd": 0.0001}
106 },
107 {
108 "node_id": "0.1", # Alternative response branch
109 "message": {"role": "assistant", "content": "Greetings!", ...},
110 "branch_name": "polite-variant"
111 }
112 ],
113 "edges": [
114 ["", "0"], # Root -> user message
115 ["0", "0.0"], # User -> assistant (first branch)
116 ["0", "0.1"] # User -> assistant (alternative branch)
117 ]
118 }
119 ```
121See Also:
122 ConversationManager: High-level conversation orchestration
123 AsyncPromptBuilder: Prompt rendering with RAG integration
124"""
126from abc import ABC, abstractmethod
127from dataclasses import dataclass, field
128from datetime import datetime
129from typing import Any, Dict, List
130import logging
132from dataknobs_structures.tree import Tree
133from dataknobs_llm.llm.base import LLMMessage
134from dataknobs_llm.exceptions import StorageError, SchemaVersionError
# Version of the serialized conversation layout (MAJOR.MINOR.PATCH).
# Bump it whenever the dictionary produced by ConversationState.to_dict
# changes shape, and teach ConversationState._migrate_schema the new step.
SCHEMA_VERSION = "1.0.0"

# Module-scoped logger, named after this module.
logger = logging.getLogger(name=__name__)
@dataclass
class ConversationNode:
    """Data stored in each conversation tree node.

    Each node represents a single message (system, user, or assistant) in the
    conversation. The tree structure allows for branching conversations where
    multiple alternative messages can be explored.

    Attributes:
        message: The LLM message (role + content)
        node_id: Dot-delimited child positions from root (e.g., "0.1.2")
        timestamp: When this message was created
        prompt_name: Optional name of prompt template used to generate this
        branch_name: Optional human-readable label for this branch
        metadata: Additional metadata (usage stats, model info, etc.)

    Example:
        >>> node = ConversationNode(
        ...     message=LLMMessage(role="user", content="Hello"),
        ...     node_id="0.1",
        ...     timestamp=datetime.now(),
        ...     prompt_name="greeting",
        ...     branch_name="polite-variant"
        ... )
    """
    message: LLMMessage
    node_id: str
    timestamp: datetime = field(default_factory=datetime.now)
    prompt_name: str | None = None
    branch_name: str | None = None
    metadata: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        """Convert node to a dictionary for storage.

        The metadata dicts are shallow-copied so that mutating the returned
        dictionary (for example, by a storage backend that keeps it in memory)
        cannot silently change this node, and vice versa. A ``None`` metadata
        value is serialized as ``{}``.

        Returns:
            Dict with ``message`` (role/content/name/metadata), ``node_id``,
            ISO-8601 ``timestamp``, ``prompt_name``, ``branch_name`` and
            ``metadata`` keys.
        """
        return {
            "message": {
                "role": self.message.role,
                "content": self.message.content,
                "name": self.message.name,
                "metadata": dict(self.message.metadata or {}),
            },
            "node_id": self.node_id,
            "timestamp": self.timestamp.isoformat(),
            "prompt_name": self.prompt_name,
            "branch_name": self.branch_name,
            "metadata": dict(self.metadata or {}),
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "ConversationNode":
        """Create a node from a dictionary produced by ``to_dict``.

        Args:
            data: Serialized node. ``message``, ``node_id`` and ``timestamp``
                are required; the other keys are optional.

        Returns:
            The reconstructed node. Metadata dicts are copied, and a missing
            or ``null`` metadata entry becomes ``{}``.

        Raises:
            KeyError: If a required key is missing.
            ValueError: If ``timestamp`` is not a valid ISO-8601 string.
        """
        msg_data = data["message"]
        return cls(
            message=LLMMessage(
                role=msg_data["role"],
                content=msg_data["content"],
                name=msg_data.get("name"),
                # ``or {}`` also tolerates an explicit null in stored data.
                metadata=dict(msg_data.get("metadata") or {}),
            ),
            node_id=data["node_id"],
            timestamp=datetime.fromisoformat(data["timestamp"]),
            prompt_name=data.get("prompt_name"),
            branch_name=data.get("branch_name"),
            metadata=dict(data.get("metadata") or {}),
        )
def calculate_node_id(node: Tree) -> str:
    """Calculate the dot-delimited node ID by walking up to the root.

    The ID lists the child index taken at each level on the way from the root
    down to ``node``. For example, "0.1.2" means: root's child 0, then that
    node's child 1, then that node's child 2. The root itself gets the empty
    ID "" so that its children have short IDs ("0", "1", ...).

    Args:
        node: Tree node to calculate the ID for. Only its ``parent`` link and
            ``sibnum`` (its child index under the parent) are read.

    Returns:
        "" for the root, otherwise a dot-delimited ID (e.g. "0", "0.1", "0.1.2").

    Example:
        >>> root = Tree(data)
        >>> child = root.add_child(data2)
        >>> grandchild = child.add_child(data3)
        >>> calculate_node_id(grandchild)
        '0.0'
    """
    indexes: List[str] = []
    current = node
    while current.parent is not None:
        indexes.append(str(current.sibnum))
        current = current.parent
    # Indexes were collected leaf-to-root; the ID reads root-to-leaf.
    # For the root the list is empty and the result is "".
    return ".".join(reversed(indexes))
def get_node_by_id(tree: Tree, node_id: str) -> Tree | None:
    """Retrieve a tree node by its dot-delimited ID.

    Args:
        tree: Root of the tree.
        node_id: Dot-delimited node ID (e.g. "0.1.2"). An empty ID (or None)
            refers to the root.

    Returns:
        The node at that path, or None if the ID is malformed (a non-integer
        or negative component) or points past the existing children.

    Example:
        >>> node = get_node_by_id(tree, "0.1.2")
        >>> # Equivalent to: tree.children[0].children[1].children[2]
    """
    if not node_id:
        return tree  # Root node

    try:
        indexes = [int(part) for part in node_id.split(".")]
    except ValueError:
        return None  # Invalid node_id format

    current = tree
    for idx in indexes:
        children = current.children
        # Reject negative indexes explicitly: Python list indexing would
        # otherwise wrap "-1" around to the last child and return a wrong node.
        if not children or idx < 0 or idx >= len(children):
            return None  # Invalid path
        current = children[idx]

    return current
def get_messages_for_llm(tree: Tree, node_id: str) -> List[LLMMessage]:
    """Return the linear message history from the root down to ``node_id``.

    This is the sequence sent to the LLM: the single path through the
    conversation tree that ends at the current position. Tree nodes whose
    data is not a ConversationNode are skipped.

    Args:
        tree: Root of the conversation tree.
        node_id: Dot-delimited ID of the current position.

    Returns:
        Messages ordered root-first; an empty list when ``node_id`` does not
        resolve to a node.

    Example:
        >>> messages = get_messages_for_llm(tree, "0.1.2")
        >>> # Returns: [root_msg, child_0_msg, child_1_msg, child_2_msg]
    """
    target = get_node_by_id(tree, node_id)
    if target is None:
        return []

    return [
        step.data.message
        for step in target.get_path()
        if isinstance(step.data, ConversationNode)
    ]
@dataclass
class ConversationState:
    """State of a conversation with tree-based branching support.

    This replaces the linear message history with a tree structure that
    supports multiple branches (alternative conversation paths).

    Attributes:
        conversation_id: Unique conversation identifier
        message_tree: Root of conversation tree (Tree[ConversationNode])
        current_node_id: ID of current position in tree (dot-delimited)
        metadata: Additional conversation metadata
        created_at: Conversation creation timestamp
        updated_at: Last update timestamp
        schema_version: Version of the storage schema used

    Example:
        >>> # Create conversation with system message
        >>> root_node = ConversationNode(
        ...     message=LLMMessage(role="system", content="You are helpful"),
        ...     node_id=""
        ... )
        >>> tree = Tree(root_node)
        >>> state = ConversationState(
        ...     conversation_id="conv-123",
        ...     message_tree=tree,
        ...     current_node_id="",
        ...     metadata={"user_id": "alice"}
        ... )
        >>>
        >>> # Add user message
        >>> user_node = ConversationNode(
        ...     message=LLMMessage(role="user", content="Hello"),
        ...     node_id="0"
        ... )
        >>> tree.add_child(Tree(user_node))
        >>> state.current_node_id = "0"
    """
    conversation_id: str
    message_tree: Tree  # Tree[ConversationNode]
    current_node_id: str = ""
    metadata: Dict[str, Any] = field(default_factory=dict)
    created_at: datetime = field(default_factory=datetime.now)
    updated_at: datetime = field(default_factory=datetime.now)
    schema_version: str = SCHEMA_VERSION

    def get_current_node(self) -> Tree | None:
        """Get the current tree node (None if current_node_id does not resolve)."""
        return get_node_by_id(self.message_tree, self.current_node_id)

    def get_current_messages(self) -> List[LLMMessage]:
        """Get messages from root to current position (for LLM)."""
        return get_messages_for_llm(self.message_tree, self.current_node_id)

    def to_dict(self) -> Dict[str, Any]:
        """Convert state to a dictionary for storage.

        The tree is flattened into two lists:

        - ``nodes``: one ``ConversationNode.to_dict()`` entry per tree node
        - ``edges``: one ``[parent_id, child_id]`` pair per non-root node

        Nodes are visited breadth-first, so each edge's parent is emitted
        before its children. ``schema_version`` is included so that
        ``from_dict`` can migrate older data.

        Returns:
            Dict with schema_version, conversation_id, nodes, edges,
            current_node_id, metadata (shallow copy), created_at and
            updated_at (ISO-8601 strings).
        """
        nodes: List[Dict[str, Any]] = []
        edges: List[List[str]] = []

        all_nodes = self.message_tree.find_nodes(lambda n: True, traversal="bfs")  # noqa: ARG005
        for tree_node in all_nodes:
            if not isinstance(tree_node.data, ConversationNode):
                continue
            nodes.append(tree_node.data.to_dict())

            parent = tree_node.parent
            if parent is None:
                continue  # Root has no incoming edge.

            # Reference the parent by its *stored* node_id, which is the key
            # from_dict() indexes nodes by, so every edge resolves on load.
            # Fall back to the positional ID for non-ConversationNode parents.
            if isinstance(parent.data, ConversationNode):
                parent_id = parent.data.node_id
            else:
                parent_id = calculate_node_id(parent)
            edges.append([parent_id, tree_node.data.node_id])

        return {
            "schema_version": self.schema_version,
            "conversation_id": self.conversation_id,
            "nodes": nodes,
            "edges": edges,
            "current_node_id": self.current_node_id,
            "metadata": dict(self.metadata),
            "created_at": self.created_at.isoformat(),
            "updated_at": self.updated_at.isoformat()
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "ConversationState":
        """Create state from a dictionary produced by ``to_dict``.

        Reconstructs the tree from nodes and edges, applying schema migration
        first when the stored version differs from SCHEMA_VERSION. The input
        dictionary is not modified.

        Children are attached in the order their edges appear, and edges may
        be listed in any order (a child's edge may precede its parent's).

        Args:
            data: Serialized conversation state.

        Returns:
            The reconstructed state, stamped with the current SCHEMA_VERSION.

        Raises:
            ValueError: If ``data`` contains no nodes.
            KeyError: If a required key is missing or an edge references an
                unknown child node.
            SchemaVersionError: If the stored schema cannot be migrated.
        """
        from collections import deque

        # Shallow copy so that migrations never mutate the caller's dict.
        data = dict(data)

        # Check schema version; data written before versioning has none.
        stored_version = data.get("schema_version", "0.0.0")
        if stored_version != SCHEMA_VERSION:
            logger.info(
                "Migrating conversation %s from schema %s to %s",
                data["conversation_id"], stored_version, SCHEMA_VERSION,
            )
            data = cls._migrate_schema(data, stored_version, SCHEMA_VERSION)

        # Create nodes indexed by their stored ID.
        nodes_by_id: Dict[str, ConversationNode] = {}
        for node_data in data["nodes"]:
            node = ConversationNode.from_dict(node_data)
            nodes_by_id[node.node_id] = node
        if not nodes_by_id:
            raise ValueError(
                f"Conversation {data['conversation_id']} has no nodes"
            )

        edges = [(parent_id, child_id) for parent_id, child_id in data["edges"]]
        root_node = cls._find_root(nodes_by_id, edges)

        # Group children by parent, preserving edge order (= sibling order).
        children_by_parent: Dict[str, List[str]] = {}
        for parent_id, child_id in edges:
            children_by_parent.setdefault(parent_id, []).append(child_id)

        # Attach children breadth-first from the root. The root is keyed by
        # its real ID (not always ""), so fallback-found roots keep children.
        tree = Tree(root_node)
        tree_nodes_by_id = {root_node.node_id: tree}
        pending = deque([root_node.node_id])
        while pending:
            parent_id = pending.popleft()
            parent_tree_node = tree_nodes_by_id[parent_id]
            for child_id in children_by_parent.get(parent_id, ()):
                if child_id in tree_nodes_by_id:
                    continue  # Duplicate edge or cycle in corrupt data.
                child_tree_node = parent_tree_node.add_child(
                    Tree(nodes_by_id[child_id])
                )
                tree_nodes_by_id[child_id] = child_tree_node
                pending.append(child_id)

        return cls(
            conversation_id=data["conversation_id"],
            message_tree=tree,
            current_node_id=data["current_node_id"],
            metadata=dict(data.get("metadata") or {}),
            created_at=datetime.fromisoformat(data["created_at"]),
            updated_at=datetime.fromisoformat(data["updated_at"]),
            schema_version=SCHEMA_VERSION  # Always use current version after migration
        )

    @staticmethod
    def _find_root(
        nodes_by_id: Dict[str, ConversationNode],
        edges: List[Any],
    ) -> ConversationNode:
        """Pick the root node for tree reconstruction.

        Preference order: the node with the conventional empty ID; otherwise
        the first parent (in edge order) that never appears as a child and
        exists among the nodes; otherwise the first serialized node.

        Args:
            nodes_by_id: Non-empty mapping of stored node ID to node.
            edges: ``(parent_id, child_id)`` pairs.

        Returns:
            The chosen root node.
        """
        if "" in nodes_by_id:
            return nodes_by_id[""]

        child_ids = {child_id for _, child_id in edges}
        for parent_id, _ in edges:
            if parent_id not in child_ids and parent_id in nodes_by_id:
                return nodes_by_id[parent_id]

        return next(iter(nodes_by_id.values()))

    @staticmethod
    def _migrate_schema(
        data: Dict[str, Any],
        from_version: str,
        to_version: str
    ) -> Dict[str, Any]:
        """Migrate data from one schema version to another.

        This method applies migrations sequentially to transform data from
        an older schema version to the current version. Migrations are applied
        in order (e.g., 0.0.0 → 1.0.0 → 1.1.0 → 1.2.0).

        Args:
            data: Data in old schema format (updated in place and returned)
            from_version: Source schema version ("MAJOR.MINOR.PATCH")
            to_version: Target schema version ("MAJOR.MINOR.PATCH")

        Returns:
            Data in new schema format

        Raises:
            SchemaVersionError: If the source MAJOR version is newer than the
                target (downgrade).
            ValueError: If a version string is not three dot-separated integers.

        Note:
            **Adding New Migration Paths**:

            When introducing schema changes, add a migration method:

            ```python
            @staticmethod
            def _migrate_1_0_to_1_1(data: Dict[str, Any]) -> Dict[str, Any]:
                '''Migrate from schema 1.0 to 1.1.'''
                data["tags"] = []
                data["schema_version"] = "1.1.0"
                return data
            ```

            Then call it from this method. Compare versions as integer
            tuples, not strings ("1.10.0" < "1.9.0" lexicographically):

            ```python
            if from_tuple < (1, 1, 0) <= to_tuple:
                data = ConversationState._migrate_1_0_to_1_1(data)
            ```
        """
        # Parse version strings (also validates their format).
        from_major, _from_minor, _from_patch = map(int, from_version.split("."))
        to_major, _to_minor, _to_patch = map(int, to_version.split("."))

        # No migration needed if versions match
        if from_version == to_version:
            return data

        # Version 0.0.0 (no version field) to 1.0.0 is a no-op: the schema
        # did not change when versioning was introduced.
        if from_version == "0.0.0":
            logger.debug("Migrating from unversioned schema to 1.0.0 (no changes needed)")
            data["schema_version"] = "1.0.0"
            return data

        # If we get here and versions still don't match, it's unsupported
        if from_major > to_major:
            raise SchemaVersionError(
                f"Cannot downgrade from schema {from_version} to {to_version}"
            )

        logger.warning(
            "No migration path defined from %s to %s. Using data as-is.",
            from_version, to_version,
        )
        data["schema_version"] = to_version
        return data
class ConversationStorage(ABC):
    """Abstract persistence contract for conversation state.

    Concrete subclasses decide where conversations live (SQL, NoSQL, files,
    ...); callers interact only through the four async operations below.
    """

    @abstractmethod
    async def save_conversation(self, state: ConversationState) -> None:
        """Persist ``state``, inserting it or replacing an existing copy.

        Args:
            state: The conversation state to write.
        """
        ...

    @abstractmethod
    async def load_conversation(
        self,
        conversation_id: str
    ) -> ConversationState | None:
        """Fetch a stored conversation.

        Args:
            conversation_id: Identifier of the conversation to fetch.

        Returns:
            The stored state, or None when no such conversation exists.
        """
        ...

    @abstractmethod
    async def delete_conversation(self, conversation_id: str) -> bool:
        """Remove a stored conversation.

        Args:
            conversation_id: Identifier of the conversation to remove.

        Returns:
            True when something was removed, False when it did not exist.
        """
        ...

    @abstractmethod
    async def list_conversations(
        self,
        filter_metadata: Dict[str, Any] | None = None,
        limit: int = 100,
        offset: int = 0
    ) -> List[ConversationState]:
        """Enumerate stored conversations, optionally filtered by metadata.

        Args:
            filter_metadata: Metadata key/value pairs results must match.
            limit: Upper bound on the number of results.
            offset: Number of results to skip (pagination).

        Returns:
            The matching conversation states.
        """
        ...
class DataknobsConversationStorage(ConversationStorage):
    """Conversation storage using dataknobs_data backends.

    Stores conversations as Records with the tree serialized as nodes + edges.
    Works with any dataknobs backend (Memory, File, S3, Postgres, etc.).

    The storage layer handles:
    - Automatic serialization/deserialization of conversation trees
    - Schema version migration when loading old conversations
    - Metadata-based filtering for listing conversations
    - Upsert operations (insert or update)

    Attributes:
        backend: Dataknobs async database backend instance

    Example:
        ```python
        from dataknobs_data import database_factory
        from dataknobs_llm.conversations import DataknobsConversationStorage

        # Memory backend (development/testing)
        db = database_factory.create(backend="memory")
        storage = DataknobsConversationStorage(db)

        # File backend (local persistence)
        db = database_factory.create(
            backend="file",
            file_path="./conversations.jsonl"
        )
        storage = DataknobsConversationStorage(db)

        # S3 backend (cloud storage)
        db = database_factory.create(
            backend="s3",
            bucket="my-conversations",
            region="us-west-2"
        )
        storage = DataknobsConversationStorage(db)

        # Postgres backend (production)
        db = database_factory.create(
            backend="postgres",
            host="db.example.com",
            database="conversations",
            user="app",
            password="secret"
        )
        storage = DataknobsConversationStorage(db)

        # Save conversation
        await storage.save_conversation(state)

        # Load conversation
        state = await storage.load_conversation("conv-123")

        # List user's conversations
        user_convos = await storage.list_conversations(
            filter_metadata={"user_id": "alice"},
            limit=50
        )

        # Delete conversation
        deleted = await storage.delete_conversation("conv-123")
        ```

    Note:
        **Backend Selection**:

        - **Memory**: Fast, no persistence. Use for testing or ephemeral conversations.
        - **File**: Simple local persistence. Good for single-server deployments.
        - **S3**: Scalable cloud storage. Best for serverless or distributed systems.
        - **Postgres**: Full ACID guarantees. Best for production multi-server setups.

        All backends support the same API, so you can switch between them
        by changing the database_factory configuration.

    See Also:
        ConversationStorage: Abstract interface
        ConversationState: State structure being stored
        dataknobs_data.database_factory: Backend creation utilities
    """

    def __init__(self, backend: Any):
        """Initialize storage with dataknobs backend.

        Args:
            backend: Dataknobs async database backend (AsyncMemoryDatabase, etc.)
        """
        self.backend = backend

    def _state_to_record(self, state: ConversationState) -> Any:
        """Convert ConversationState to Record.

        Args:
            state: Conversation state to convert

        Returns:
            Record object whose storage_id is the conversation ID

        Raises:
            StorageError: If dataknobs_data is not installed.
        """
        # Import here to avoid circular dependency
        try:
            from dataknobs_data.records import Record
        except ImportError:
            raise StorageError(
                "dataknobs_data package not available. "
                "Install it to use DataknobsConversationStorage."
            ) from None

        return Record(
            data=state.to_dict(),
            storage_id=state.conversation_id
        )

    def _record_to_state(self, record: Any) -> ConversationState:
        """Convert Record to ConversationState.

        Args:
            record: Record object from storage; each of its fields becomes a
                top-level key of the serialized state.

        Returns:
            Conversation state
        """
        data = {
            field_name: field_obj.value
            for field_name, field_obj in record.fields.items()
        }
        return ConversationState.from_dict(data)

    async def save_conversation(self, state: ConversationState) -> None:
        """Save (upsert) conversation to backend.

        Raises:
            StorageError: If dataknobs_data is unavailable or the write fails.
        """
        try:
            record = self._state_to_record(state)
            # Use upsert to insert or update
            await self.backend.upsert(state.conversation_id, record)
        except StorageError:
            # Already descriptive (e.g. missing dataknobs_data); don't re-wrap.
            raise
        except Exception as e:
            raise StorageError(f"Failed to save conversation: {e}") from e

    async def load_conversation(
        self,
        conversation_id: str
    ) -> ConversationState | None:
        """Load conversation from backend.

        Returns:
            The conversation state, or None if the backend has no record.

        Raises:
            StorageError: If reading or deserializing the record fails.
        """
        try:
            record = await self.backend.read(conversation_id)
            if record is None:
                return None
            return self._record_to_state(record)
        except StorageError:
            raise
        except Exception as e:
            raise StorageError(f"Failed to load conversation: {e}") from e

    async def delete_conversation(self, conversation_id: str) -> bool:
        """Delete conversation from backend.

        Returns:
            Whatever the backend's ``delete`` reports.

        Raises:
            StorageError: If the backend delete fails.
        """
        try:
            return await self.backend.delete(conversation_id)
        except StorageError:
            raise
        except Exception as e:
            raise StorageError(f"Failed to delete conversation: {e}") from e

    async def update_metadata(
        self,
        conversation_id: str,
        metadata: Dict[str, Any]
    ) -> None:
        """Update conversation metadata.

        Loads the conversation, updates its metadata, and saves it back.

        Args:
            conversation_id: ID of conversation to update
            metadata: New metadata dict (replaces existing metadata)

        Raises:
            StorageError: If conversation not found or update fails
        """
        try:
            state = await self.load_conversation(conversation_id)
            if state is None:
                raise StorageError(f"Conversation not found: {conversation_id}")

            state.metadata = metadata
            await self.save_conversation(state)

        except StorageError:
            raise
        except Exception as e:
            raise StorageError(f"Failed to update metadata: {e}") from e

    async def list_conversations(
        self,
        filter_metadata: Dict[str, Any] | None = None,
        limit: int = 100,
        offset: int = 0
    ) -> List[ConversationState]:
        """List conversations from backend.

        Args:
            filter_metadata: Each key/value pair becomes an equality filter
                on ``metadata.<key>``.
            limit: Maximum number of results
            offset: Offset for pagination

        Returns:
            List of conversation states

        Raises:
            StorageError: If dataknobs_data is unavailable or the search fails.
        """
        try:
            try:
                from dataknobs_data.query import Query
            except ImportError:
                raise StorageError(
                    "dataknobs_data package not available. "
                    "Install it to use DataknobsConversationStorage."
                ) from None

            # Keep the value returned by each fluent call so this works
            # whether Query methods mutate-and-return-self or return a copy.
            query = Query().limit(limit).offset(offset)
            for key, value in (filter_metadata or {}).items():
                query = query.filter(f"metadata.{key}", "=", value)

            results = await self.backend.search(query)
            return [self._record_to_state(record) for record in results]

        except StorageError:
            raise
        except Exception as e:
            raise StorageError(f"Failed to list conversations: {e}") from e