Coverage for src/dataknobs_llm/conversations/storage.py: 89%

177 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-08 13:51 -0700

1"""Conversation storage with tree-based branching support. 

2 

3This module provides: 

4- ConversationNode: Data stored in each tree node 

5- ConversationState: Tree-based conversation state 

6- ConversationStorage: Abstract storage interface 

7- DataknobsConversationStorage: Storage adapter for dataknobs backends 

8- Helper functions for node ID management and tree navigation 

9 

10Schema Versioning: 

11 The storage format uses semantic versioning (MAJOR.MINOR.PATCH): 

12 - MAJOR: Incompatible changes requiring migration 

13 - MINOR: Backward-compatible additions 

14 - PATCH: Bug fixes, no schema changes 

15 

16 Current schema version: 1.0.0 

17 

18Storage Architecture: 

19 Conversations are stored as trees where each node represents a message. 

20 The tree structure is serialized as: 

21 - **Nodes**: List of ConversationNode objects (messages with metadata) 

22 - **Edges**: List of [parent_id, child_id] relationships 

23 - **Current Position**: Node ID showing where you are in the conversation 

24 

25 This format supports: 

26 - Full conversation history with branching 

27 - Efficient deserialization and tree reconstruction 

28 - Schema evolution with automatic migration 

29 - Backend-agnostic storage (works with any dataknobs backend) 

30 

31Example: 

32 ```python 

33 from dataknobs_data import database_factory 

34 from dataknobs_llm.conversations import ( 

35 ConversationState, 

36 ConversationNode, 

37 DataknobsConversationStorage 

38 ) 

39 from dataknobs_llm.llm.base import LLMMessage 

40 from dataknobs_structures.tree import Tree 

41 

42 # Create storage backend 

43 db = database_factory.create(backend="memory") 

44 storage = DataknobsConversationStorage(db) 

45 

46 # Create conversation state 

47 root_node = ConversationNode( 

48 message=LLMMessage(role="system", content="You are helpful"), 

49 node_id="" 

50 ) 

51 tree = Tree(root_node) 

52 state = ConversationState( 

53 conversation_id="conv-123", 

54 message_tree=tree, 

55 current_node_id="", 

56 metadata={"user_id": "alice"} 

57 ) 

58 

59 # Save conversation 

60 await storage.save_conversation(state) 

61 

62 # Load conversation 

63 loaded = await storage.load_conversation("conv-123") 

64 messages = loaded.get_current_messages() 

65 

66 # List all conversations for user 

67 user_convos = await storage.list_conversations( 

68 filter_metadata={"user_id": "alice"} 

69 ) 

70 ``` 

71 

72Serialization Format: 

73 The serialized format (from `ConversationState.to_dict()`) looks like: 

74 

75 ```python 

76 { 

77 "schema_version": "1.0.0", 

78 "conversation_id": "conv-123", 

79 "current_node_id": "0.1", 

80 "metadata": {"user_id": "alice"}, 

81 "created_at": "2024-01-01T00:00:00", 

82 "updated_at": "2024-01-01T00:05:00", 

83 "nodes": [ 

84 { 

85 "node_id": "", 

86 "message": { 

87 "role": "system", 

88 "content": "You are helpful", 

89 "name": None, 

90 "metadata": {} 

91 }, 

92 "timestamp": "2024-01-01T00:00:00", 

93 "prompt_name": None, 

94 "branch_name": None, 

95 "metadata": {} 

96 }, 

97 { 

98 "node_id": "0", 

99 "message": {"role": "user", "content": "Hello", ...}, 

100 ... 

101 }, 

102 { 

103 "node_id": "0.0", 

104 "message": {"role": "assistant", "content": "Hi!", ...}, 

105 "metadata": {"usage": {...}, "cost_usd": 0.0001} 

106 }, 

107 { 

108 "node_id": "0.1", # Alternative response branch 

109 "message": {"role": "assistant", "content": "Greetings!", ...}, 

110 "branch_name": "polite-variant" 

111 } 

112 ], 

113 "edges": [ 

114 ["", "0"], # Root -> user message 

115 ["0", "0.0"], # User -> assistant (first branch) 

116 ["0", "0.1"] # User -> assistant (alternative branch) 

117 ] 

118 } 

119 ``` 

120 

121See Also: 

122 ConversationManager: High-level conversation orchestration 

123 AsyncPromptBuilder: Prompt rendering with RAG integration 

124""" 

125 

126from abc import ABC, abstractmethod 

127from dataclasses import dataclass, field 

128from datetime import datetime 

129from typing import Any, Dict, List 

130import logging 

131 

132from dataknobs_structures.tree import Tree 

133from dataknobs_llm.llm.base import LLMMessage 

134 

logger = logging.getLogger(__name__)

# Default schema version for new ConversationState objects and the target
# version that ConversationState.from_dict migrates stored records to.
# Bump it (MAJOR.MINOR.PATCH) whenever the serialized layout changes.
SCHEMA_VERSION = "1.0.0"

139 

140 

@dataclass
class ConversationNode:
    """A single message stored at one position of a conversation tree.

    Every node wraps one LLM message (system, user or assistant). Because the
    conversation is a tree, a node may have several children, each being an
    alternative continuation (a "branch") of the conversation.

    Attributes:
        message: The wrapped LLM message (role + content).
        node_id: Dot-delimited child indexes leading from the root to this
            node (e.g. "0.1.2").
        timestamp: When the message was created (defaults to ``datetime.now()``).
        prompt_name: Name of the prompt template that produced the message, if any.
        branch_name: Optional human-readable label for this branch.
        metadata: Free-form extra data (usage stats, model info, etc.).

    Example:
        >>> node = ConversationNode(
        ...     message=LLMMessage(role="user", content="Hello"),
        ...     node_id="0.1",
        ...     timestamp=datetime.now(),
        ...     prompt_name="greeting",
        ...     branch_name="polite-variant"
        ... )
    """

    message: LLMMessage
    node_id: str
    timestamp: datetime = field(default_factory=datetime.now)
    prompt_name: str | None = None
    branch_name: str | None = None
    metadata: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        """Serialize this node into a plain dictionary suitable for storage.

        The timestamp is written as an ISO-8601 string, and a missing message
        metadata value is normalized to an empty dict.
        """
        msg = self.message
        serialized_message = {
            "role": msg.role,
            "content": msg.content,
            "name": msg.name,
            "metadata": msg.metadata or {},
        }
        serialized: Dict[str, Any] = {"message": serialized_message}
        serialized["node_id"] = self.node_id
        serialized["timestamp"] = self.timestamp.isoformat()
        serialized["prompt_name"] = self.prompt_name
        serialized["branch_name"] = self.branch_name
        serialized["metadata"] = self.metadata
        return serialized

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "ConversationNode":
        """Rebuild a node from the dictionary layout written by ``to_dict``.

        ``message.name``, ``message.metadata``, ``prompt_name``, ``branch_name``
        and ``metadata`` are optional keys; ``message.role``,
        ``message.content``, ``node_id`` and ``timestamp`` are required.
        """
        raw_message = data["message"]
        message = LLMMessage(
            role=raw_message["role"],
            content=raw_message["content"],
            name=raw_message.get("name"),
            metadata=raw_message.get("metadata", {}),
        )
        return cls(
            message=message,
            node_id=data["node_id"],
            timestamp=datetime.fromisoformat(data["timestamp"]),
            prompt_name=data.get("prompt_name"),
            branch_name=data.get("branch_name"),
            metadata=data.get("metadata", {}),
        )

206 

207 

def calculate_node_id(node: Tree) -> str:
    """Calculate the dot-delimited node ID of ``node`` by walking up to the root.

    The node ID represents the path from the root to this node as a series of
    child indexes. For example, "0.1.2" means: root's child 0, then that
    node's child 1, then that node's child 2.

    Args:
        node: Tree node to calculate the ID for.

    Returns:
        "" for the root node (the node with no parent); otherwise the
        dot-joined child indexes from the root down to ``node``
        (e.g. "0", "0.1", "0.1.2").

    Example:
        >>> root = Tree(data)
        >>> child = root.add_child(data2)
        >>> grandchild = child.add_child(data3)
        >>> calculate_node_id(grandchild)
        '0.0'
    """
    # Collect each ancestor's sibling index bottom-up. The root has no parent
    # and contributes nothing, so it naturally maps to "" and its children get
    # IDs like "0", "1", ... without a leading root segment.
    indexes: List[str] = []
    current = node
    while current.parent is not None:
        indexes.append(str(current.sibnum))
        current = current.parent

    # Reverse into root-to-node order.
    return ".".join(reversed(indexes))

244 

245 

def get_node_by_id(tree: Tree, node_id: str) -> Tree | None:
    """Retrieve a tree node by its dot-delimited ID.

    Args:
        tree: Root of the tree.
        node_id: Dot-delimited node ID (e.g. "0.1.2"); "" (or None) addresses
            the root.

    Returns:
        The tree node with that ID, or None if ``node_id`` is malformed
        (non-numeric, negative or empty segments) or points past an
        existing child.

    Example:
        >>> node = get_node_by_id(tree, "0.1.2")
        >>> # Equivalent to: tree.children[0].children[1].children[2]
    """
    if not node_id:
        return tree  # Root node

    parts = node_id.split(".")
    # Accept only plain ASCII digit segments. In particular this rejects
    # "-1": int() would parse it and Python's negative indexing would then
    # silently resolve it to the LAST child instead of reporting "not found".
    if not all(part.isascii() and part.isdigit() for part in parts):
        return None  # Invalid node_id format

    # Navigate down the tree one child index at a time.
    current = tree
    for idx in map(int, parts):
        children = current.children
        if not children or idx >= len(children):
            return None  # Invalid path
        current = children[idx]

    return current

277 

278 

def get_messages_for_llm(tree: Tree, node_id: str) -> List[LLMMessage]:
    """Collect the linear message history that leads to ``node_id``.

    This is what gets sent to the LLM: the single path through the tree from
    the root down to the current position.

    Args:
        tree: Root of the conversation tree.
        node_id: Dot-delimited ID of the current position.

    Returns:
        The messages along the root-to-node path, or an empty list when
        ``node_id`` does not resolve to a node. Path entries whose data is not
        a ConversationNode are skipped.

    Example:
        >>> messages = get_messages_for_llm(tree, "0.1.2")
        >>> # Returns: [root_msg, child_0_msg, child_1_msg, child_2_msg]
    """
    target = get_node_by_id(tree, node_id)
    if target is None:
        return []

    return [
        step.data.message
        for step in target.get_path()
        if isinstance(step.data, ConversationNode)
    ]

310 

311 

@dataclass
class ConversationState:
    """State of a conversation with tree-based branching support.

    This replaces the linear message history with a tree structure that
    supports multiple branches (alternative conversation paths).

    Attributes:
        conversation_id: Unique conversation identifier
        message_tree: Root of conversation tree (Tree[ConversationNode])
        current_node_id: ID of current position in tree (dot-delimited)
        metadata: Additional conversation metadata
        created_at: Conversation creation timestamp
        updated_at: Last update timestamp
        schema_version: Version of the storage schema used

    Example:
        >>> # Create conversation with system message
        >>> root_node = ConversationNode(
        ...     message=LLMMessage(role="system", content="You are helpful"),
        ...     node_id=""
        ... )
        >>> tree = Tree(root_node)
        >>> state = ConversationState(
        ...     conversation_id="conv-123",
        ...     message_tree=tree,
        ...     current_node_id="",
        ...     metadata={"user_id": "alice"}
        ... )
        >>>
        >>> # Add user message
        >>> user_node = ConversationNode(
        ...     message=LLMMessage(role="user", content="Hello"),
        ...     node_id="0"
        ... )
        >>> tree.add_child(Tree(user_node))
        >>> state.current_node_id = "0"
    """

    conversation_id: str
    message_tree: Tree  # Tree[ConversationNode]
    current_node_id: str = ""
    metadata: Dict[str, Any] = field(default_factory=dict)
    created_at: datetime = field(default_factory=datetime.now)
    updated_at: datetime = field(default_factory=datetime.now)
    schema_version: str = SCHEMA_VERSION

    def get_current_node(self) -> Tree | None:
        """Get the current tree node, or None if current_node_id does not resolve."""
        return get_node_by_id(self.message_tree, self.current_node_id)

    def get_current_messages(self) -> List[LLMMessage]:
        """Get messages from root to current position (for LLM)."""
        return get_messages_for_llm(self.message_tree, self.current_node_id)

    def to_dict(self) -> Dict[str, Any]:
        """Convert state to a dictionary for storage.

        The tree is flattened into a breadth-first list of serialized nodes
        plus a list of ``[parent_id, child_id]`` edges. Both edge endpoints use
        the same ID source (the node IDs stored on each ConversationNode), so
        ``from_dict`` can match every edge back to its nodes. Includes
        ``schema_version`` for backward compatibility.
        """
        nodes: List[Dict[str, Any]] = []
        edges: List[List[str]] = []

        # BFS emits every parent before its children.
        all_nodes = self.message_tree.find_nodes(lambda n: True, traversal="bfs")  # noqa: ARG005
        for tree_node in all_nodes:
            if not isinstance(tree_node.data, ConversationNode):
                continue
            nodes.append(tree_node.data.to_dict())

            # Add edge to parent (if not root)
            if tree_node.parent is not None:
                edges.append([self._edge_id(tree_node.parent), tree_node.data.node_id])

        return {
            "schema_version": self.schema_version,
            "conversation_id": self.conversation_id,
            "nodes": nodes,
            "edges": edges,
            "current_node_id": self.current_node_id,
            "metadata": self.metadata,
            "created_at": self.created_at.isoformat(),
            "updated_at": self.updated_at.isoformat()
        }

    @staticmethod
    def _edge_id(tree_node: Tree) -> str:
        """ID used for ``tree_node`` in the serialized edge list.

        Prefers the node's stored ID (the same value written into "nodes"),
        falling back to the structural ID for non-ConversationNode data.
        """
        if isinstance(tree_node.data, ConversationNode):
            return tree_node.data.node_id
        return calculate_node_id(tree_node)

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "ConversationState":
        """Create state from a dictionary produced by ``to_dict``.

        Reconstructs the tree from nodes and edges and migrates older schema
        versions first. Edges may appear in any order; sibling order follows
        the order of the edges.

        Raises:
            SchemaVersionError: If the stored schema version cannot be migrated.
            ValueError: If the data contains no nodes.
            KeyError: If a required key is missing or an edge references an
                unknown child node.
        """
        # Records without a version field predate versioning.
        stored_version = data.get("schema_version", "0.0.0")

        if stored_version != SCHEMA_VERSION:
            logger.info(
                "Migrating conversation %s from schema %s to %s",
                data.get("conversation_id"),
                stored_version,
                SCHEMA_VERSION,
            )
            data = cls._migrate_schema(data, stored_version, SCHEMA_VERSION)

        # Create nodes indexed by their stored ID
        nodes_by_id: Dict[str, ConversationNode] = {}
        for node_data in data["nodes"]:
            node = ConversationNode.from_dict(node_data)
            nodes_by_id[node.node_id] = node

        # to_dict writes each edge as a 2-element [parent_id, child_id] list.
        edges = [(parent_id, child_id) for parent_id, child_id in data["edges"]]

        root_node = cls._find_root(nodes_by_id, edges)
        tree = cls._build_tree(root_node, nodes_by_id, edges)

        return cls(
            conversation_id=data["conversation_id"],
            message_tree=tree,
            current_node_id=data["current_node_id"],
            metadata=data["metadata"],
            created_at=datetime.fromisoformat(data["created_at"]),
            updated_at=datetime.fromisoformat(data["updated_at"]),
            schema_version=SCHEMA_VERSION  # Always use current version after migration
        )

    @staticmethod
    def _find_root(
        nodes_by_id: Dict[str, ConversationNode],
        edges: List[tuple[str, str]],
    ) -> ConversationNode:
        """Pick the root node of a serialized tree.

        Preference order:
        1. The node whose ID is "".
        2. The first edge parent (in edge order) that is never a child and that
           exists among the stored nodes.
        3. The first stored node.

        Raises:
            ValueError: If there are no nodes at all.
        """
        if not nodes_by_id:
            raise ValueError("Cannot rebuild conversation tree: no nodes stored")

        root_node = nodes_by_id.get("")
        if root_node is not None:
            return root_node

        child_ids = {child_id for _, child_id in edges}
        for parent_id, _ in edges:
            if parent_id not in child_ids and parent_id in nodes_by_id:
                return nodes_by_id[parent_id]

        # Fallback: first node (to_dict writes nodes in BFS order, root first)
        return next(iter(nodes_by_id.values()))

    @staticmethod
    def _build_tree(
        root_node: ConversationNode,
        nodes_by_id: Dict[str, ConversationNode],
        edges: List[tuple[str, str]],
    ) -> Tree:
        """Attach every node reachable from ``root_node`` via ``edges``.

        Children are attached in edge order under their actual parent ID, so
        the result does not depend on the order parents were listed. Each node
        is placed at most once, which also guards against cycles and
        duplicate edges.
        """
        children_by_parent: Dict[str, List[str]] = {}
        for parent_id, child_id in edges:
            children_by_parent.setdefault(parent_id, []).append(child_id)

        tree = Tree(root_node)
        placed = {root_node.node_id, ""}
        pending = [(root_node.node_id, tree)]
        if root_node.node_id != "":
            # Edges may label the root with its structural ID "" rather than
            # its stored ID; treat "" as an alias for the root in that case.
            pending.append(("", tree))

        while pending:
            parent_id, parent_tree_node = pending.pop()
            for child_id in children_by_parent.get(parent_id, []):
                if child_id in placed:
                    continue
                placed.add(child_id)
                child_tree_node = parent_tree_node.add_child(Tree(nodes_by_id[child_id]))
                pending.append((child_id, child_tree_node))

        return tree

    @staticmethod
    def _parse_version(version: str) -> tuple[int, int, int]:
        """Parse a "MAJOR.MINOR.PATCH" string into an integer tuple.

        Raises:
            SchemaVersionError: If ``version`` is not three dot-separated integers.
        """
        try:
            major, minor, patch = (int(part) for part in version.split("."))
        except (AttributeError, TypeError, ValueError) as err:
            raise SchemaVersionError(
                f"Invalid schema version {version!r}: expected MAJOR.MINOR.PATCH"
            ) from err
        return major, minor, patch

    @staticmethod
    def _migrate_schema(
        data: Dict[str, Any],
        from_version: str,
        to_version: str
    ) -> Dict[str, Any]:
        """Migrate data from one schema version to another.

        Migrations are applied sequentially (e.g. 0.0.0 -> 1.0.0 -> 1.1.0).
        The input dictionary is not modified; a shallow copy is migrated and
        returned.

        Args:
            data: Data in old schema format
            from_version: Source schema version ("MAJOR.MINOR.PATCH")
            to_version: Target schema version ("MAJOR.MINOR.PATCH")

        Returns:
            Data in the target schema format with ``schema_version`` updated
            (or ``data`` itself when the versions are identical).

        Raises:
            SchemaVersionError: If a version string is malformed, or the
                migration would downgrade to an older MAJOR version.

        Note:
            **Adding New Migration Paths**:

            When introducing schema changes, add a migration method::

                @staticmethod
                def _migrate_1_0_to_1_1(data: Dict[str, Any]) -> Dict[str, Any]:
                    '''Migrate from schema 1.0 to 1.1.'''
                    # Example: Add new optional field
                    data["tags"] = []
                    data["schema_version"] = "1.1.0"
                    return data

            Then call it from this method. Compare parsed version tuples, never
            raw strings: string comparison misorders versions such as
            "1.10.0" < "1.9.0"::

                if from_parsed < (1, 1, 0) <= to_parsed:
                    data = ConversationState._migrate_1_0_to_1_1(data)
                    from_parsed = (1, 1, 0)
        """
        # No migration needed if versions match
        if from_version == to_version:
            return data

        from_parsed = ConversationState._parse_version(from_version)
        to_parsed = ConversationState._parse_version(to_version)

        # Work on a copy so the caller's dictionary is left untouched.
        data = dict(data)

        # 0.0.0 marks records written before versioning existed. Version 1.0.0
        # only added the schema_version field, so this step just stamps it and
        # lets any later steps continue from 1.0.0.
        if from_parsed == (0, 0, 0):
            logger.debug("Migrating from unversioned schema to 1.0.0 (no changes needed)")
            data["schema_version"] = "1.0.0"
            from_parsed = (1, 0, 0)
            if from_parsed == to_parsed:
                return data

        # Future migration steps go here, in ascending version order.

        # If we get here and versions still don't match, it's unsupported
        if from_parsed[0] > to_parsed[0]:
            raise SchemaVersionError(
                f"Cannot downgrade from schema {from_version} to {to_version}"
            )

        logger.warning(
            "No migration path defined from %s to %s. Using data as-is.",
            from_version,
            to_version,
        )
        data["schema_version"] = to_version
        return data

564 

565 

class ConversationStorage(ABC):
    """Backend-agnostic persistence contract for conversation state.

    Concrete subclasses decide where conversations live (SQL, NoSQL, files,
    ...); callers only depend on the four async operations declared here.
    """

    @abstractmethod
    async def save_conversation(self, state: ConversationState) -> None:
        """Persist ``state``, inserting it or replacing an existing copy.

        Args:
            state: Conversation state to save
        """
        ...

    @abstractmethod
    async def load_conversation(
        self,
        conversation_id: str
    ) -> ConversationState | None:
        """Fetch the stored state for ``conversation_id``.

        Args:
            conversation_id: Conversation identifier

        Returns:
            The conversation state, or None when nothing is stored under that ID.
        """
        ...

    @abstractmethod
    async def delete_conversation(self, conversation_id: str) -> bool:
        """Remove the conversation stored under ``conversation_id``.

        Args:
            conversation_id: Conversation identifier

        Returns:
            True when a conversation was deleted, False when none was found.
        """
        ...

    @abstractmethod
    async def list_conversations(
        self,
        filter_metadata: Dict[str, Any] | None = None,
        limit: int = 100,
        offset: int = 0
    ) -> List[ConversationState]:
        """Return a page of stored conversations, optionally filtered.

        Args:
            filter_metadata: Optional metadata key/value filters
            limit: Maximum number of results to return
            offset: Number of results to skip (for pagination)

        Returns:
            The matching conversation states.
        """
        ...

627 

628 

class DataknobsConversationStorage(ConversationStorage):
    """Conversation storage using dataknobs_data backends.

    Stores conversations as Records with the tree serialized as nodes + edges.
    Works with any dataknobs backend (Memory, File, S3, Postgres, etc.).

    The storage layer handles:
    - Automatic serialization/deserialization of conversation trees
    - Schema version migration when loading old conversations
    - Metadata-based filtering for listing conversations
    - Upsert operations (insert or update)

    Every failure is surfaced as a single StorageError. Errors that are
    already StorageErrors (e.g. dataknobs_data not installed) propagate
    unchanged rather than being wrapped a second time.

    Attributes:
        backend: Dataknobs async database backend instance

    Example:
        ```python
        from dataknobs_data import database_factory
        from dataknobs_llm.conversations import DataknobsConversationStorage

        # Memory backend (development/testing)
        db = database_factory.create(backend="memory")
        storage = DataknobsConversationStorage(db)

        # File backend (local persistence)
        db = database_factory.create(
            backend="file",
            file_path="./conversations.jsonl"
        )
        storage = DataknobsConversationStorage(db)

        # S3 backend (cloud storage)
        db = database_factory.create(
            backend="s3",
            bucket="my-conversations",
            region="us-west-2"
        )
        storage = DataknobsConversationStorage(db)

        # Postgres backend (production)
        db = database_factory.create(
            backend="postgres",
            host="db.example.com",
            database="conversations",
            user="app",
            password="secret"
        )
        storage = DataknobsConversationStorage(db)

        # Save conversation
        await storage.save_conversation(state)

        # Load conversation
        state = await storage.load_conversation("conv-123")

        # List user's conversations
        user_convos = await storage.list_conversations(
            filter_metadata={"user_id": "alice"},
            limit=50
        )

        # Delete conversation
        deleted = await storage.delete_conversation("conv-123")
        ```

    Note:
        **Backend Selection**:

        - **Memory**: Fast, no persistence. Use for testing or ephemeral conversations.
        - **File**: Simple local persistence. Good for single-server deployments.
        - **S3**: Scalable cloud storage. Best for serverless or distributed systems.
        - **Postgres**: Full ACID guarantees. Best for production multi-server setups.

        All backends support the same API, so you can switch between them
        by changing the database_factory configuration.

    See Also:
        ConversationStorage: Abstract interface
        ConversationState: State structure being stored
        dataknobs_data.database_factory: Backend creation utilities
    """

    def __init__(self, backend: Any):
        """Initialize storage with dataknobs backend.

        Args:
            backend: Dataknobs async database backend (AsyncMemoryDatabase, etc.)
        """
        self.backend = backend

    def _state_to_record(self, state: ConversationState) -> Any:
        """Convert ConversationState to Record.

        Args:
            state: Conversation state to convert

        Returns:
            Record object for storage, keyed by ``state.conversation_id``

        Raises:
            StorageError: If the dataknobs_data package is not installed.
        """
        # Import here to avoid circular dependency
        try:
            from dataknobs_data.records import Record
        except ImportError:
            raise StorageError(
                "dataknobs_data package not available. "
                "Install it to use DataknobsConversationStorage."
            ) from None

        # Create Record with conversation_id as storage_id
        return Record(
            data=state.to_dict(),
            storage_id=state.conversation_id
        )

    def _record_to_state(self, record: Any) -> ConversationState:
        """Convert Record to ConversationState.

        Args:
            record: Record object from storage

        Returns:
            Conversation state (migrated to the current schema if needed)
        """
        # Flatten the record's fields back into the to_dict() layout.
        data = {
            field_name: field_obj.value
            for field_name, field_obj in record.fields.items()
        }
        return ConversationState.from_dict(data)

    async def save_conversation(self, state: ConversationState) -> None:
        """Save conversation to backend (upsert keyed by conversation_id).

        Raises:
            StorageError: If serialization or the backend write fails.
        """
        try:
            record = self._state_to_record(state)
            await self.backend.upsert(state.conversation_id, record)
        except StorageError:
            raise  # Already descriptive; don't wrap it a second time.
        except Exception as e:
            raise StorageError(f"Failed to save conversation: {e}") from e

    async def load_conversation(
        self,
        conversation_id: str
    ) -> ConversationState | None:
        """Load conversation from backend.

        Returns:
            The conversation state, or None if the backend has no record for
            ``conversation_id``.

        Raises:
            StorageError: If the backend read or deserialization fails.
        """
        try:
            record = await self.backend.read(conversation_id)
            if record is None:
                return None
            return self._record_to_state(record)
        except StorageError:
            raise
        except Exception as e:
            raise StorageError(f"Failed to load conversation: {e}") from e

    async def delete_conversation(self, conversation_id: str) -> bool:
        """Delete conversation from backend.

        Returns:
            Whatever the backend's ``delete`` reports (True if deleted).

        Raises:
            StorageError: If the backend delete fails.
        """
        try:
            return await self.backend.delete(conversation_id)
        except Exception as e:
            raise StorageError(f"Failed to delete conversation: {e}") from e

    async def list_conversations(
        self,
        filter_metadata: Dict[str, Any] | None = None,
        limit: int = 100,
        offset: int = 0
    ) -> List[ConversationState]:
        """List conversations from backend.

        Args:
            filter_metadata: Optional equality filters applied to
                ``metadata.<key>`` fields
            limit: Maximum number of results
            offset: Offset for pagination

        Raises:
            StorageError: If dataknobs_data is missing, or the search or
                deserialization fails.
        """
        try:
            try:
                from dataknobs_data.query import Query
            except ImportError:
                raise StorageError(
                    "dataknobs_data package not available. "
                    "Install it to use DataknobsConversationStorage."
                ) from None

            # Rebind after every builder call so the result is correct whether
            # the builder mutates in place or returns a new Query.
            query = Query().limit(limit).offset(offset)
            for key, value in (filter_metadata or {}).items():
                query = query.filter(f"metadata.{key}", "=", value)

            results = await self.backend.search(query)
            return [self._record_to_state(record) for record in results]
        except StorageError:
            raise  # Already descriptive; don't wrap it a second time.
        except Exception as e:
            raise StorageError(f"Failed to list conversations: {e}") from e

829 

830 

class StorageError(Exception):
    """Raised when a conversation storage operation (save/load/delete/list) fails."""

834 

835 

class SchemaVersionError(Exception):
    """Raised when stored data uses a schema version that cannot be migrated."""