Coverage for src / dataknobs_llm / conversations / storage.py: 53%

185 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-15 10:29 -0700

1"""Conversation storage with tree-based branching support. 

2 

3This module provides: 

4- ConversationNode: Data stored in each tree node 

5- ConversationState: Tree-based conversation state 

6- ConversationStorage: Abstract storage interface 

7- DataknobsConversationStorage: Storage adapter for dataknobs backends 

8- Helper functions for node ID management and tree navigation 

9 

10Schema Versioning: 

11 The storage format uses semantic versioning (MAJOR.MINOR.PATCH): 

12 - MAJOR: Incompatible changes requiring migration 

13 - MINOR: Backward-compatible additions 

14 - PATCH: Bug fixes, no schema changes 

15 

16 Current schema version: 1.0.0 

17 

18Storage Architecture: 

19 Conversations are stored as trees where each node represents a message. 

20 The tree structure is serialized as: 

21 - **Nodes**: List of ConversationNode objects (messages with metadata) 

22 - **Edges**: List of [parent_id, child_id] relationships 

23 - **Current Position**: Node ID showing where you are in the conversation 

24 

25 This format supports: 

26 - Full conversation history with branching 

27 - Efficient deserialization and tree reconstruction 

28 - Schema evolution with automatic migration 

29 - Backend-agnostic storage (works with any dataknobs backend) 

30 

31Example: 

32 ```python 

33 from dataknobs_data import database_factory 

34 from dataknobs_llm.conversations import ( 

35 ConversationState, 

36 ConversationNode, 

37 DataknobsConversationStorage 

38 ) 

39 from dataknobs_llm.llm.base import LLMMessage 

40 from dataknobs_structures.tree import Tree 

41 

42 # Create storage backend 

43 db = database_factory.create(backend="memory") 

44 storage = DataknobsConversationStorage(db) 

45 

46 # Create conversation state 

47 root_node = ConversationNode( 

48 message=LLMMessage(role="system", content="You are helpful"), 

49 node_id="" 

50 ) 

51 tree = Tree(root_node) 

52 state = ConversationState( 

53 conversation_id="conv-123", 

54 message_tree=tree, 

55 current_node_id="", 

56 metadata={"user_id": "alice"} 

57 ) 

58 

59 # Save conversation 

60 await storage.save_conversation(state) 

61 

62 # Load conversation 

63 loaded = await storage.load_conversation("conv-123") 

64 messages = loaded.get_current_messages() 

65 

66 # List all conversations for user 

67 user_convos = await storage.list_conversations( 

68 filter_metadata={"user_id": "alice"} 

69 ) 

70 ``` 

71 

72Serialization Format: 

73 The serialized format (from `ConversationState.to_dict()`) looks like: 

74 

75 ```python 

76 { 

77 "schema_version": "1.0.0", 

78 "conversation_id": "conv-123", 

79 "current_node_id": "0.1", 

80 "metadata": {"user_id": "alice"}, 

81 "created_at": "2024-01-01T00:00:00", 

82 "updated_at": "2024-01-01T00:05:00", 

83 "nodes": [ 

84 { 

85 "node_id": "", 

86 "message": { 

87 "role": "system", 

88 "content": "You are helpful", 

89 "name": None, 

90 "metadata": {} 

91 }, 

92 "timestamp": "2024-01-01T00:00:00", 

93 "prompt_name": None, 

94 "branch_name": None, 

95 "metadata": {} 

96 }, 

97 { 

98 "node_id": "0", 

99 "message": {"role": "user", "content": "Hello", ...}, 

100 ... 

101 }, 

102 { 

103 "node_id": "0.0", 

104 "message": {"role": "assistant", "content": "Hi!", ...}, 

105 "metadata": {"usage": {...}, "cost_usd": 0.0001} 

106 }, 

107 { 

108 "node_id": "0.1", # Alternative response branch 

109 "message": {"role": "assistant", "content": "Greetings!", ...}, 

110 "branch_name": "polite-variant" 

111 } 

112 ], 

113 "edges": [ 

114 ["", "0"], # Root -> user message 

115 ["0", "0.0"], # User -> assistant (first branch) 

116 ["0", "0.1"] # User -> assistant (alternative branch) 

117 ] 

118 } 

119 ``` 

120 

121See Also: 

122 ConversationManager: High-level conversation orchestration 

123 AsyncPromptBuilder: Prompt rendering with RAG integration 

124""" 

125 

126from abc import ABC, abstractmethod 

127from dataclasses import dataclass, field 

128from datetime import datetime 

129from typing import Any, Dict, List 

130import logging 

131 

132from dataknobs_structures.tree import Tree 

133from dataknobs_llm.llm.base import LLMMessage 

134from dataknobs_llm.exceptions import StorageError, SchemaVersionError 

135 

# Version stamped into every serialized conversation (MAJOR.MINOR.PATCH).
# Bump it whenever the stored format changes, and add a matching migration
# step in ConversationState._migrate_schema.
SCHEMA_VERSION: str = "1.0.0"

# Module-level logger named after this module.
logger = logging.getLogger(__name__)

140 

141 

@dataclass
class ConversationNode:
    """Data stored in each conversation tree node.

    Each node represents a single message (system, user, or assistant) in the
    conversation. The tree structure allows for branching conversations where
    multiple alternative messages can be explored.

    Attributes:
        message: The LLM message (role + content)
        node_id: Dot-delimited child positions from root (e.g., "0.1.2");
            the root uses the empty string
        timestamp: When this message was created (defaults to ``datetime.now()``)
        prompt_name: Optional name of the prompt template used to generate
            this message
        branch_name: Optional human-readable label for this branch
        metadata: Additional metadata (usage stats, model info, etc.)

    Example:
        >>> node = ConversationNode(
        ...     message=LLMMessage(role="user", content="Hello"),
        ...     node_id="0.1",
        ...     timestamp=datetime.now(),
        ...     prompt_name="greeting",
        ...     branch_name="polite-variant"
        ... )
    """
    message: LLMMessage
    node_id: str
    timestamp: datetime = field(default_factory=datetime.now)
    prompt_name: str | None = None
    branch_name: str | None = None
    metadata: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        """Convert node to a JSON-friendly dictionary for storage.

        The timestamp is written as an ISO-8601 string and a missing message
        metadata value is written as ``{}``.
        """
        return {
            "message": {
                "role": self.message.role,
                "content": self.message.content,
                "name": self.message.name,
                "metadata": self.message.metadata or {}
            },
            "node_id": self.node_id,
            "timestamp": self.timestamp.isoformat(),
            "prompt_name": self.prompt_name,
            "branch_name": self.branch_name,
            "metadata": self.metadata
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "ConversationNode":
        """Create a node from a dictionary produced by :meth:`to_dict`.

        Args:
            data: Serialized node. ``message``, ``node_id`` and ``timestamp``
                are required; the remaining keys are optional.

        Returns:
            The reconstructed node.

        Raises:
            KeyError: If a required key is missing.
            ValueError: If ``timestamp`` is not a valid ISO-8601 string.
        """
        msg_data = data["message"]
        return cls(
            message=LLMMessage(
                role=msg_data["role"],
                content=msg_data["content"],
                name=msg_data.get("name"),
                # Mirror to_dict's ``metadata or {}`` so an explicit null in
                # stored data still yields a dict.
                metadata=msg_data.get("metadata") or {}
            ),
            node_id=data["node_id"],
            timestamp=datetime.fromisoformat(data["timestamp"]),
            prompt_name=data.get("prompt_name"),
            branch_name=data.get("branch_name"),
            # Same normalization for node-level metadata (declared as a dict).
            metadata=data.get("metadata") or {}
        )

207 

208 

def calculate_node_id(node: Tree) -> str:
    """Calculate a node's dot-delimited ID by walking up to the root.

    The ID is the sequence of child indexes taken from the root down to
    *node*. For example, "0.1.2" means: root's child 0, then that node's
    child 1, then that node's child 2. The root itself gets the empty ID
    ``""``, so its children are simply "0", "1", ...

    Args:
        node: Tree node to calculate ID for

    Returns:
        ``""`` for the root, otherwise a dot-delimited node ID
        (e.g., "0", "0.1", "0.1.2")

    Example:
        >>> root = Tree(data)
        >>> child = root.add_child(data2)
        >>> grandchild = child.add_child(data3)
        >>> calculate_node_id(grandchild)
        '0.0'
    """
    indexes: List[str] = []
    current = node
    # Record this node's position among its siblings, then step up a level.
    while current.parent is not None:
        indexes.append(str(current.sibnum))
        current = current.parent

    # Indexes were collected leaf-to-root; the ID reads root-to-leaf.
    # For the root nothing was collected and the join yields "".
    return ".".join(reversed(indexes))

245 

246 

def get_node_by_id(tree: Tree, node_id: str) -> Tree | None:
    """Retrieve tree node by its dot-delimited ID.

    Args:
        tree: Root of the tree
        node_id: Dot-delimited node ID (e.g., "0.1.2"); an empty ID
            addresses the root

    Returns:
        Tree node with that ID, or None if the ID is malformed (non-integer
        or negative segment) or points past the existing children

    Example:
        >>> node = get_node_by_id(tree, "0.1.2")
        >>> # Equivalent to: tree.children[0].children[1].children[2]
    """
    if not node_id:
        return tree  # Root node

    # Split into child indexes; any non-integer segment ("a", "") is invalid.
    try:
        indexes = [int(part) for part in node_id.split(".")]
    except ValueError:
        return None

    # Navigate down the tree
    current = tree
    for idx in indexes:
        children = current.children
        # Reject negatives explicitly: int("-1") parses, and children[-1]
        # would silently resolve to the *last* child instead of failing.
        if idx < 0 or not children or idx >= len(children):
            return None
        current = children[idx]

    return current

278 

279 

def get_messages_for_llm(tree: Tree, node_id: str) -> List[LLMMessage]:
    """Return the linear message history ending at *node_id*.

    This is the payload sent to the LLM: the messages along the path
    through the tree from the root to the current position.

    Args:
        tree: Root of conversation tree
        node_id: ID of current position

    Returns:
        Messages from root to the addressed node, or an empty list when
        *node_id* does not resolve to a node

    Example:
        >>> messages = get_messages_for_llm(tree, "0.1.2")
        >>> # Returns: [root_msg, child_0_msg, child_1_msg, child_2_msg]
    """
    target = get_node_by_id(tree, node_id)
    if target is None:
        return []

    # Walk the root-to-target path; only ConversationNode payloads carry a
    # message, any other payload type is skipped.
    return [
        step.data.message
        for step in target.get_path()
        if isinstance(step.data, ConversationNode)
    ]

311 

312 

@dataclass
class ConversationState:
    """State of a conversation with tree-based branching support.

    This replaces the linear message history with a tree structure that
    supports multiple branches (alternative conversation paths).

    Attributes:
        conversation_id: Unique conversation identifier
        message_tree: Root of conversation tree (Tree[ConversationNode])
        current_node_id: ID of current position in tree (dot-delimited)
        metadata: Additional conversation metadata
        created_at: Conversation creation timestamp
        updated_at: Last update timestamp
        schema_version: Version of the storage schema used

    Example:
        >>> # Create conversation with system message
        >>> root_node = ConversationNode(
        ...     message=LLMMessage(role="system", content="You are helpful"),
        ...     node_id=""
        ... )
        >>> tree = Tree(root_node)
        >>> state = ConversationState(
        ...     conversation_id="conv-123",
        ...     message_tree=tree,
        ...     current_node_id="",
        ...     metadata={"user_id": "alice"}
        ... )
        >>>
        >>> # Add user message
        >>> user_node = ConversationNode(
        ...     message=LLMMessage(role="user", content="Hello"),
        ...     node_id="0"
        ... )
        >>> tree.add_child(Tree(user_node))
        >>> state.current_node_id = "0"
    """
    conversation_id: str
    message_tree: Tree  # Tree[ConversationNode]
    current_node_id: str = ""
    metadata: Dict[str, Any] = field(default_factory=dict)
    created_at: datetime = field(default_factory=datetime.now)
    updated_at: datetime = field(default_factory=datetime.now)
    schema_version: str = SCHEMA_VERSION

    def get_current_node(self) -> Tree | None:
        """Get the current tree node (None if current_node_id does not resolve)."""
        return get_node_by_id(self.message_tree, self.current_node_id)

    def get_current_messages(self) -> List[LLMMessage]:
        """Get messages from root to current position (for LLM)."""
        return get_messages_for_llm(self.message_tree, self.current_node_id)

    def to_dict(self) -> Dict[str, Any]:
        """Convert state to dictionary for storage.

        The tree is flattened into two lists:

        - ``nodes``: ``ConversationNode.to_dict()`` for every node, in
          breadth-first order (parents precede their children).
        - ``edges``: ``[parent_id, child_id]`` pairs, one per non-root node.

        ``schema_version`` is included so older payloads can be migrated on
        load.
        """
        nodes: List[Dict[str, Any]] = []
        edges: List[List[str]] = []

        all_nodes = self.message_tree.find_nodes(lambda n: True, traversal="bfs")  # noqa: ARG005
        for tree_node in all_nodes:
            if not isinstance(tree_node.data, ConversationNode):
                continue
            nodes.append(tree_node.data.to_dict())

            # Add edge to parent (if not root). The parent ID is positional
            # (root -> ""), the child ID is the stored node_id; from_dict
            # accepts root edges keyed either way.
            if tree_node.parent is not None:
                parent_id = calculate_node_id(tree_node.parent)
                edges.append([parent_id, tree_node.data.node_id])

        return {
            "schema_version": self.schema_version,
            "conversation_id": self.conversation_id,
            "nodes": nodes,
            "edges": edges,
            "current_node_id": self.current_node_id,
            "metadata": self.metadata,
            "created_at": self.created_at.isoformat(),
            "updated_at": self.updated_at.isoformat()
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "ConversationState":
        """Create state from a dictionary produced by :meth:`to_dict`.

        Migrates older schema versions first (without mutating *data*), then
        rebuilds the tree from the ``nodes`` and ``edges`` lists. Children
        are attached in the order their edges appear, so sibling order (and
        therefore positional node IDs) round-trips.

        Args:
            data: Serialized conversation state

        Returns:
            The reconstructed state, stamped with the current schema version

        Raises:
            ValueError: If there are no nodes, or a reachable edge names a
                node that is not in ``nodes``
            SchemaVersionError: If the stored schema cannot be migrated
            KeyError: If a required top-level key is missing
        """
        # Payloads without a version field predate versioning: treat as 0.0.0.
        stored_version = data.get("schema_version", "0.0.0")

        if stored_version != SCHEMA_VERSION:
            logger.info(
                "Migrating conversation %s from schema %s to %s",
                data.get("conversation_id"), stored_version, SCHEMA_VERSION,
            )
            data = cls._migrate_schema(data, stored_version, SCHEMA_VERSION)

        # Create nodes indexed by ID
        nodes_by_id: Dict[str, ConversationNode] = {}
        for node_data in data["nodes"]:
            node = ConversationNode.from_dict(node_data)
            nodes_by_id[node.node_id] = node
        if not nodes_by_id:
            raise ValueError(
                f"Conversation {data.get('conversation_id')!r} has no nodes to rebuild"
            )

        edges = data["edges"]
        root_node = cls._find_root(nodes_by_id, edges)
        tree = cls._build_tree(root_node, nodes_by_id, edges)

        return cls(
            conversation_id=data["conversation_id"],
            message_tree=tree,
            current_node_id=data["current_node_id"],
            metadata=data["metadata"],
            created_at=datetime.fromisoformat(data["created_at"]),
            updated_at=datetime.fromisoformat(data["updated_at"]),
            schema_version=SCHEMA_VERSION  # Always use current version after migration
        )

    @staticmethod
    def _find_root(
        nodes_by_id: Dict[str, ConversationNode],
        edges: List[List[str]],
    ) -> ConversationNode:
        """Pick the root node for reconstruction.

        Preference order: the node whose ID is ``""``; else the first node
        (in ``nodes`` order) that is a parent but never a child in *edges*;
        else the first node. Only IDs that exist in *nodes_by_id* are ever
        returned. *nodes_by_id* must be non-empty.
        """
        root = nodes_by_id.get("")
        if root is not None:
            return root

        child_ids = {child_id for _, child_id in edges}
        parent_ids = {parent_id for parent_id, _ in edges}
        for node_id, node in nodes_by_id.items():
            if node_id in parent_ids and node_id not in child_ids:
                return node

        # Fallback: first node (to_dict writes nodes breadth-first, root first)
        return next(iter(nodes_by_id.values()))

    @staticmethod
    def _build_tree(
        root_node: ConversationNode,
        nodes_by_id: Dict[str, ConversationNode],
        edges: List[List[str]],
    ) -> Tree:
        """Build the Tree by attaching every node reachable from *root_node*.

        Edges may appear in any order. Duplicate edges and cycles are ignored
        (a node is placed at most once). Edges not reachable from the root are
        skipped.
        """
        from collections import deque

        # Group child IDs per parent, preserving edge order (= sibling order).
        children_by_parent: Dict[str, List[str]] = {}
        for parent_id, child_id in edges:
            children_by_parent.setdefault(parent_id, []).append(child_id)

        tree = Tree(root_node)
        # to_dict keys root edges by the positional ID "", but the root's
        # stored node_id may differ; accept edges keyed either way.
        tree_nodes_by_id: Dict[str, Tree] = {"": tree, root_node.node_id: tree}

        pending = deque(tree_nodes_by_id)
        while pending:
            parent_id = pending.popleft()
            parent_tree_node = tree_nodes_by_id[parent_id]
            for child_id in children_by_parent.get(parent_id, []):
                if child_id in tree_nodes_by_id:
                    continue  # duplicate edge or cycle: keep first placement
                child_node = nodes_by_id.get(child_id)
                if child_node is None:
                    raise ValueError(f"Edge references unknown node {child_id!r}")
                tree_nodes_by_id[child_id] = parent_tree_node.add_child(Tree(child_node))
                pending.append(child_id)

        return tree

    @staticmethod
    def _parse_version(version: str) -> tuple[int, int, int]:
        """Parse "MAJOR.MINOR.PATCH" into an int tuple that compares in order.

        Raises:
            SchemaVersionError: If *version* is not three dot-separated integers
        """
        try:
            major, minor, patch = (int(part) for part in version.split("."))
        except (ValueError, AttributeError):
            raise SchemaVersionError(f"Invalid schema version: {version!r}") from None
        return major, minor, patch

    @staticmethod
    def _migrate_schema(
        data: Dict[str, Any],
        from_version: str,
        to_version: str
    ) -> Dict[str, Any]:
        """Migrate data from one schema version to another.

        This method applies migrations sequentially to transform data from
        an older schema version to the current version. Migrations are applied
        in order (e.g., 0.0.0 → 1.0.0 → 1.1.0 → 1.2.0). The caller's dict is
        never modified; a shallow copy is migrated and returned.

        Args:
            data: Data in old schema format
            from_version: Source schema version
            to_version: Target schema version

        Returns:
            Data in new schema format

        Raises:
            SchemaVersionError: If a version string is malformed or the
                migration would downgrade the MAJOR version

        Example:
            ```python
            # Example migration from 1.0.0 to 1.1.0 might add a new field
            old_data = {
                "schema_version": "1.0.0",
                "conversation_id": "conv-123",
                "nodes": [...],
                "edges": [...]
            }

            # After migration to 1.1.0
            new_data = ConversationState._migrate_schema(
                old_data,
                from_version="1.0.0",
                to_version="1.1.0"
            )
            # new_data might now include: {"tags": [], ...}
            ```

        Note:
            **Adding New Migration Paths**:

            When introducing schema changes, add a migration method:

            ```python
            @staticmethod
            def _migrate_1_0_to_1_1(data: Dict[str, Any]) -> Dict[str, Any]:
                '''Migrate from schema 1.0 to 1.1.'''
                # Example: Add new optional field
                data["tags"] = []
                data["schema_version"] = "1.1.0"
                return data
            ```

            Then update this method to call it, comparing parsed version
            tuples (plain string comparison mis-orders e.g. "1.10.0" vs
            "1.9.0"):

            ```python
            if from_parts < (1, 1, 0) <= to_parts:
                data = ConversationState._migrate_1_0_to_1_1(data)
            ```

            Migrations must only replace top-level keys of ``data``; copy
            any nested structure before modifying it.
        """
        # Shallow copy so the caller's dict is never mutated.
        data = dict(data)

        # No migration needed if versions match
        if from_version == to_version:
            return data

        from_parts = ConversationState._parse_version(from_version)
        to_parts = ConversationState._parse_version(to_version)

        # Version 0.0.0 (no version field) to 1.0.0 is a no-op because the
        # schema didn't change; versioning was simply added.
        if from_parts == (0, 0, 0):
            logger.debug("Migrating from unversioned schema to 1.0.0 (no changes needed)")
            data["schema_version"] = "1.0.0"
            return data

        # If we get here and versions still don't match, it's unsupported
        if from_parts[0] > to_parts[0]:
            raise SchemaVersionError(
                f"Cannot downgrade from schema {from_version} to {to_version}"
            )

        logger.warning(
            "No migration path defined from %s to %s. Using data as-is.",
            from_version, to_version,
        )
        data["schema_version"] = to_version
        return data

565 

566 

class ConversationStorage(ABC):
    """Persistence contract for :class:`ConversationState` objects.

    Concrete subclasses decide where conversations live (SQL, NoSQL, files,
    ...). Callers depend only on the four async operations declared here.
    """

    @abstractmethod
    async def save_conversation(self, state: ConversationState) -> None:
        """Persist *state*, overwriting any stored copy (upsert).

        Args:
            state: Conversation state to persist
        """
        ...

    @abstractmethod
    async def load_conversation(
        self,
        conversation_id: str
    ) -> ConversationState | None:
        """Fetch a previously saved conversation.

        Args:
            conversation_id: Identifier of the conversation to fetch

        Returns:
            The stored state, or None when no conversation has that ID
        """
        ...

    @abstractmethod
    async def delete_conversation(self, conversation_id: str) -> bool:
        """Remove a stored conversation.

        Args:
            conversation_id: Identifier of the conversation to remove

        Returns:
            True when something was deleted, False when the ID was unknown
        """
        ...

    @abstractmethod
    async def list_conversations(
        self,
        filter_metadata: Dict[str, Any] | None = None,
        limit: int = 100,
        offset: int = 0
    ) -> List[ConversationState]:
        """Enumerate stored conversations, optionally filtered by metadata.

        Args:
            filter_metadata: Optional metadata key/value constraints
            limit: Upper bound on the number of returned states
            offset: Number of matches to skip (pagination)

        Returns:
            The matching conversation states
        """
        ...

628 

629 

class DataknobsConversationStorage(ConversationStorage):
    """Conversation storage using dataknobs_data backends.

    Stores conversations as Records with the tree serialized as nodes + edges.
    Works with any dataknobs backend (Memory, File, S3, Postgres, etc.).

    The storage layer handles:
    - Automatic serialization/deserialization of conversation trees
    - Schema version migration when loading old conversations
    - Metadata-based filtering for listing conversations
    - Upsert operations (insert or update)

    Every public operation raises ``StorageError`` on failure. Errors that
    are already ``StorageError`` are propagated unchanged, and any other
    exception is wrapped with the original as its cause.

    Attributes:
        backend: Dataknobs async database backend instance

    Example:
        ```python
        from dataknobs_data import database_factory
        from dataknobs_llm.conversations import DataknobsConversationStorage

        # Memory backend (development/testing)
        db = database_factory.create(backend="memory")
        storage = DataknobsConversationStorage(db)

        # File backend (local persistence)
        db = database_factory.create(
            backend="file",
            file_path="./conversations.jsonl"
        )
        storage = DataknobsConversationStorage(db)

        # S3 backend (cloud storage)
        db = database_factory.create(
            backend="s3",
            bucket="my-conversations",
            region="us-west-2"
        )
        storage = DataknobsConversationStorage(db)

        # Postgres backend (production)
        db = database_factory.create(
            backend="postgres",
            host="db.example.com",
            database="conversations",
            user="app",
            password="secret"
        )
        storage = DataknobsConversationStorage(db)

        # Save conversation
        await storage.save_conversation(state)

        # Load conversation
        state = await storage.load_conversation("conv-123")

        # List user's conversations
        user_convos = await storage.list_conversations(
            filter_metadata={"user_id": "alice"},
            limit=50
        )

        # Delete conversation
        deleted = await storage.delete_conversation("conv-123")
        ```

    Note:
        **Backend Selection**:

        - **Memory**: Fast, no persistence. Use for testing or ephemeral conversations.
        - **File**: Simple local persistence. Good for single-server deployments.
        - **S3**: Scalable cloud storage. Best for serverless or distributed systems.
        - **Postgres**: Full ACID guarantees. Best for production multi-server setups.

        All backends support the same API, so you can switch between them
        by changing the database_factory configuration.

    See Also:
        ConversationStorage: Abstract interface
        ConversationState: State structure being stored
        dataknobs_data.database_factory: Backend creation utilities
    """

    def __init__(self, backend: Any):
        """Initialize storage with dataknobs backend.

        Args:
            backend: Dataknobs async database backend (AsyncMemoryDatabase, etc.)
        """
        self.backend = backend

    def _state_to_record(self, state: ConversationState) -> Any:
        """Convert ConversationState to Record.

        Args:
            state: Conversation state to convert

        Returns:
            Record whose data is ``state.to_dict()`` and whose storage_id is
            the conversation ID

        Raises:
            StorageError: If the dataknobs_data package is not installed
        """
        # Import here to avoid circular dependency
        try:
            from dataknobs_data.records import Record
        except ImportError:
            raise StorageError(
                "dataknobs_data package not available. "
                "Install it to use DataknobsConversationStorage."
            ) from None

        return Record(
            data=state.to_dict(),
            storage_id=state.conversation_id
        )

    def _record_to_state(self, record: Any) -> ConversationState:
        """Convert Record to ConversationState.

        Args:
            record: Record object from storage; each entry of
                ``record.fields`` contributes ``name -> field.value``

        Returns:
            Conversation state (schema-migrated by ``ConversationState.from_dict``)
        """
        data = {name: field_obj.value for name, field_obj in record.fields.items()}
        return ConversationState.from_dict(data)

    async def save_conversation(self, state: ConversationState) -> None:
        """Save conversation to backend (upsert keyed by conversation_id).

        Raises:
            StorageError: If serialization or the backend write fails
        """
        try:
            record = self._state_to_record(state)
            await self.backend.upsert(state.conversation_id, record)
        except StorageError:
            raise  # already descriptive; don't double-wrap
        except Exception as e:
            raise StorageError(f"Failed to save conversation: {e}") from e

    async def load_conversation(
        self,
        conversation_id: str
    ) -> ConversationState | None:
        """Load conversation from backend.

        Returns:
            The conversation state, or None if the backend has no record
            for *conversation_id*

        Raises:
            StorageError: If the backend read or deserialization fails
        """
        try:
            record = await self.backend.read(conversation_id)
            if record is None:
                return None
            return self._record_to_state(record)
        except StorageError:
            raise  # already descriptive; don't double-wrap
        except Exception as e:
            raise StorageError(f"Failed to load conversation: {e}") from e

    async def delete_conversation(self, conversation_id: str) -> bool:
        """Delete conversation from backend.

        Returns:
            Whatever the backend's ``delete`` returns (expected: True if a
            record was removed, False if it was not found)

        Raises:
            StorageError: If the backend delete fails
        """
        try:
            return await self.backend.delete(conversation_id)
        except StorageError:
            raise  # already descriptive; don't double-wrap
        except Exception as e:
            raise StorageError(f"Failed to delete conversation: {e}") from e

    async def update_metadata(
        self,
        conversation_id: str,
        metadata: Dict[str, Any]
    ) -> None:
        """Update conversation metadata.

        Loads the conversation, replaces its metadata, and saves it back.

        Args:
            conversation_id: ID of conversation to update
            metadata: New metadata dict (replaces existing metadata)

        Raises:
            StorageError: If conversation not found or update fails
        """
        try:
            state = await self.load_conversation(conversation_id)
            if state is None:
                raise StorageError(f"Conversation not found: {conversation_id}")

            state.metadata = metadata
            await self.save_conversation(state)

        except StorageError:
            raise
        except Exception as e:
            raise StorageError(f"Failed to update metadata: {e}") from e

    async def list_conversations(
        self,
        filter_metadata: Dict[str, Any] | None = None,
        limit: int = 100,
        offset: int = 0
    ) -> List[ConversationState]:
        """List conversations from backend.

        Args:
            filter_metadata: Optional equality filters; each ``key: value``
                becomes a ``metadata.<key> = value`` query condition
            limit: Maximum number of results
            offset: Number of results to skip (pagination)

        Returns:
            Matching conversation states

        Raises:
            StorageError: If dataknobs_data is missing, or the search or
                deserialization of any record fails
        """
        try:
            # Import Query for filtering
            try:
                from dataknobs_data.query import Query
            except ImportError:
                raise StorageError(
                    "dataknobs_data package not available. "
                    "Install it to use DataknobsConversationStorage."
                ) from None

            # Build query with metadata filters using fluent interface
            query = Query()
            query.limit(limit).offset(offset)

            if filter_metadata:
                for key, value in filter_metadata.items():
                    query.filter(f"metadata.{key}", "=", value)

            results = await self.backend.search(query)
            return [self._record_to_state(record) for record in results]

        except StorageError:
            raise  # already descriptive; don't double-wrap
        except Exception as e:
            raise StorageError(f"Failed to list conversations: {e}") from e