Coverage for src/dataknobs_llm/conversations/storage.py: 89%
177 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-08 13:51 -0700
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-08 13:51 -0700
1"""Conversation storage with tree-based branching support.
3This module provides:
4- ConversationNode: Data stored in each tree node
5- ConversationState: Tree-based conversation state
6- ConversationStorage: Abstract storage interface
7- DataknobsConversationStorage: Storage adapter for dataknobs backends
8- Helper functions for node ID management and tree navigation
10Schema Versioning:
11 The storage format uses semantic versioning (MAJOR.MINOR.PATCH):
12 - MAJOR: Incompatible changes requiring migration
13 - MINOR: Backward-compatible additions
14 - PATCH: Bug fixes, no schema changes
16 Current schema version: 1.0.0
18Storage Architecture:
19 Conversations are stored as trees where each node represents a message.
20 The tree structure is serialized as:
21 - **Nodes**: List of ConversationNode objects (messages with metadata)
22 - **Edges**: List of [parent_id, child_id] relationships
23 - **Current Position**: Node ID showing where you are in the conversation
25 This format supports:
26 - Full conversation history with branching
27 - Efficient deserialization and tree reconstruction
28 - Schema evolution with automatic migration
29 - Backend-agnostic storage (works with any dataknobs backend)
31Example:
32 ```python
33 from dataknobs_data import database_factory
34 from dataknobs_llm.conversations import (
35 ConversationState,
36 ConversationNode,
37 DataknobsConversationStorage
38 )
39 from dataknobs_llm.llm.base import LLMMessage
40 from dataknobs_structures.tree import Tree
42 # Create storage backend
43 db = database_factory.create(backend="memory")
44 storage = DataknobsConversationStorage(db)
46 # Create conversation state
47 root_node = ConversationNode(
48 message=LLMMessage(role="system", content="You are helpful"),
49 node_id=""
50 )
51 tree = Tree(root_node)
52 state = ConversationState(
53 conversation_id="conv-123",
54 message_tree=tree,
55 current_node_id="",
56 metadata={"user_id": "alice"}
57 )
59 # Save conversation
60 await storage.save_conversation(state)
62 # Load conversation
63 loaded = await storage.load_conversation("conv-123")
64 messages = loaded.get_current_messages()
66 # List all conversations for user
67 user_convos = await storage.list_conversations(
68 filter_metadata={"user_id": "alice"}
69 )
70 ```
72Serialization Format:
73 The serialized format (from `ConversationState.to_dict()`) looks like:
75 ```python
76 {
77 "schema_version": "1.0.0",
78 "conversation_id": "conv-123",
79 "current_node_id": "0.1",
80 "metadata": {"user_id": "alice"},
81 "created_at": "2024-01-01T00:00:00",
82 "updated_at": "2024-01-01T00:05:00",
83 "nodes": [
84 {
85 "node_id": "",
86 "message": {
87 "role": "system",
88 "content": "You are helpful",
89 "name": None,
90 "metadata": {}
91 },
92 "timestamp": "2024-01-01T00:00:00",
93 "prompt_name": None,
94 "branch_name": None,
95 "metadata": {}
96 },
97 {
98 "node_id": "0",
99 "message": {"role": "user", "content": "Hello", ...},
100 ...
101 },
102 {
103 "node_id": "0.0",
104 "message": {"role": "assistant", "content": "Hi!", ...},
105 "metadata": {"usage": {...}, "cost_usd": 0.0001}
106 },
107 {
108 "node_id": "0.1", # Alternative response branch
109 "message": {"role": "assistant", "content": "Greetings!", ...},
110 "branch_name": "polite-variant"
111 }
112 ],
113 "edges": [
114 ["", "0"], # Root -> user message
115 ["0", "0.0"], # User -> assistant (first branch)
116 ["0", "0.1"] # User -> assistant (alternative branch)
117 ]
118 }
119 ```
121See Also:
122 ConversationManager: High-level conversation orchestration
123 AsyncPromptBuilder: Prompt rendering with RAG integration
124"""
126from abc import ABC, abstractmethod
127from dataclasses import dataclass, field
128from datetime import datetime
129from typing import Any, Dict, List
130import logging
132from dataknobs_structures.tree import Tree
133from dataknobs_llm.llm.base import LLMMessage
135# Current schema version - increment when making schema changes
136SCHEMA_VERSION = "1.0.0"
138logger = logging.getLogger(__name__)
141@dataclass
142class ConversationNode:
143 """Data stored in each conversation tree node.
145 Each node represents a single message (system, user, or assistant) in the
146 conversation. The tree structure allows for branching conversations where
147 multiple alternative messages can be explored.
149 Attributes:
150 message: The LLM message (role + content)
151 node_id: Dot-delimited child positions from root (e.g., "0.1.2")
152 timestamp: When this message was created
153 prompt_name: Optional name of prompt template used to generate this
154 branch_name: Optional human-readable label for this branch
155 metadata: Additional metadata (usage stats, model info, etc.)
157 Example:
158 >>> node = ConversationNode(
159 ... message=LLMMessage(role="user", content="Hello"),
160 ... node_id="0.1",
161 ... timestamp=datetime.now(),
162 ... prompt_name="greeting",
163 ... branch_name="polite-variant"
164 ... )
165 """
166 message: LLMMessage
167 node_id: str
168 timestamp: datetime = field(default_factory=datetime.now)
169 prompt_name: str | None = None
170 branch_name: str | None = None
171 metadata: Dict[str, Any] = field(default_factory=dict)
173 def to_dict(self) -> Dict[str, Any]:
174 """Convert node to dictionary for storage."""
175 return {
176 "message": {
177 "role": self.message.role,
178 "content": self.message.content,
179 "name": self.message.name,
180 "metadata": self.message.metadata or {}
181 },
182 "node_id": self.node_id,
183 "timestamp": self.timestamp.isoformat(),
184 "prompt_name": self.prompt_name,
185 "branch_name": self.branch_name,
186 "metadata": self.metadata
187 }
189 @classmethod
190 def from_dict(cls, data: Dict[str, Any]) -> "ConversationNode":
191 """Create node from dictionary."""
192 msg_data = data["message"]
193 return cls(
194 message=LLMMessage(
195 role=msg_data["role"],
196 content=msg_data["content"],
197 name=msg_data.get("name"),
198 metadata=msg_data.get("metadata", {})
199 ),
200 node_id=data["node_id"],
201 timestamp=datetime.fromisoformat(data["timestamp"]),
202 prompt_name=data.get("prompt_name"),
203 branch_name=data.get("branch_name"),
204 metadata=data.get("metadata", {})
205 )
208def calculate_node_id(node: Tree) -> str:
209 """Calculate dot-delimited node ID by walking up to root.
211 The node ID represents the path from root to this node as a series of
212 child indexes. For example, "0.1.2" means: root's child 0, then that
213 node's child 1, then that node's child 2.
215 Args:
216 node: Tree node to calculate ID for
218 Returns:
219 Dot-delimited node ID (e.g., "0", "0.1", "0.1.2")
221 Example:
222 >>> root = Tree(data)
223 >>> child = root.add_child(data2)
224 >>> grandchild = child.add_child(data3)
225 >>> calculate_node_id(grandchild)
226 '0.0'
227 """
228 if node.parent is None:
229 # Root node has no parent, so it's just "0" or we could use ""
230 # Let's use "" for root to make child IDs cleaner
231 return ""
233 # Walk up to root, collecting child indexes
234 indexes = []
235 current = node
236 while current.parent is not None:
237 indexes.append(str(current.sibnum))
238 current = current.parent
240 # Reverse to get root-to-node order
241 indexes.reverse()
243 return ".".join(indexes) if indexes else "0"
246def get_node_by_id(tree: Tree, node_id: str) -> Tree | None:
247 """Retrieve tree node by its dot-delimited ID.
249 Args:
250 tree: Root of the tree
251 node_id: Dot-delimited node ID (e.g., "0.1.2")
253 Returns:
254 Tree node with that ID, or None if not found
256 Example:
257 >>> node = get_node_by_id(tree, "0.1.2")
258 >>> # Equivalent to: tree.children[0].children[1].children[2]
259 """
260 if not node_id or node_id == "":
261 return tree # Root node
263 # Split into child indexes
264 try:
265 indexes = [int(i) for i in node_id.split(".")]
266 except ValueError:
267 return None # Invalid node_id format
269 # Navigate down the tree
270 current = tree
271 for idx in indexes:
272 if not current.children or idx >= len(current.children):
273 return None # Invalid path
274 current = current.children[idx]
276 return current
279def get_messages_for_llm(tree: Tree, node_id: str) -> List[LLMMessage]:
280 """Get linear message sequence from root to specified node.
282 This is what gets sent to the LLM - the path through the tree from
283 root to current position.
285 Args:
286 tree: Root of conversation tree
287 node_id: ID of current position
289 Returns:
290 List of messages from root to current node
292 Example:
293 >>> messages = get_messages_for_llm(tree, "0.1.2")
294 >>> # Returns: [root_msg, child_0_msg, child_1_msg, child_2_msg]
295 """
296 node = get_node_by_id(tree, node_id)
297 if node is None:
298 return []
300 # Get path from root to node
301 path = node.get_path()
303 # Extract messages from each node's data
304 messages = []
305 for tree_node in path:
306 if isinstance(tree_node.data, ConversationNode):
307 messages.append(tree_node.data.message)
309 return messages
312@dataclass
313class ConversationState:
314 """State of a conversation with tree-based branching support.
316 This replaces the linear message history with a tree structure that
317 supports multiple branches (alternative conversation paths).
319 Attributes:
320 conversation_id: Unique conversation identifier
321 message_tree: Root of conversation tree (Tree[ConversationNode])
322 current_node_id: ID of current position in tree (dot-delimited)
323 metadata: Additional conversation metadata
324 created_at: Conversation creation timestamp
325 updated_at: Last update timestamp
326 schema_version: Version of the storage schema used
328 Example:
329 >>> # Create conversation with system message
330 >>> root_node = ConversationNode(
331 ... message=LLMMessage(role="system", content="You are helpful"),
332 ... node_id=""
333 ... )
334 >>> tree = Tree(root_node)
335 >>> state = ConversationState(
336 ... conversation_id="conv-123",
337 ... message_tree=tree,
338 ... current_node_id="",
339 ... metadata={"user_id": "alice"}
340 ... )
341 >>>
342 >>> # Add user message
343 >>> user_node = ConversationNode(
344 ... message=LLMMessage(role="user", content="Hello"),
345 ... node_id="0"
346 ... )
347 >>> tree.add_child(Tree(user_node))
348 >>> state.current_node_id = "0"
349 """
350 conversation_id: str
351 message_tree: Tree # Tree[ConversationNode]
352 current_node_id: str = ""
353 metadata: Dict[str, Any] = field(default_factory=dict)
354 created_at: datetime = field(default_factory=datetime.now)
355 updated_at: datetime = field(default_factory=datetime.now)
356 schema_version: str = SCHEMA_VERSION
358 def get_current_node(self) -> Tree | None:
359 """Get the current tree node."""
360 return get_node_by_id(self.message_tree, self.current_node_id)
362 def get_current_messages(self) -> List[LLMMessage]:
363 """Get messages from root to current position (for LLM)."""
364 return get_messages_for_llm(self.message_tree, self.current_node_id)
366 def to_dict(self) -> Dict[str, Any]:
367 """Convert state to dictionary for storage.
369 The tree is serialized as a list of edges (parent_id, child_id, node_data).
370 Includes schema_version for backward compatibility.
371 """
372 # Collect all nodes and their data
373 nodes = []
374 edges = []
376 all_nodes = self.message_tree.find_nodes(lambda n: True, traversal="bfs") # noqa: ARG005
377 for tree_node in all_nodes:
378 if isinstance(tree_node.data, ConversationNode):
379 nodes.append(tree_node.data.to_dict())
381 # Add edge to parent (if not root)
382 if tree_node.parent is not None:
383 parent_id = calculate_node_id(tree_node.parent)
384 child_id = tree_node.data.node_id
385 edges.append([parent_id, child_id])
387 return {
388 "schema_version": self.schema_version,
389 "conversation_id": self.conversation_id,
390 "nodes": nodes,
391 "edges": edges,
392 "current_node_id": self.current_node_id,
393 "metadata": self.metadata,
394 "created_at": self.created_at.isoformat(),
395 "updated_at": self.updated_at.isoformat()
396 }
398 @classmethod
399 def from_dict(cls, data: Dict[str, Any]) -> "ConversationState":
400 """Create state from dictionary.
402 Reconstructs the tree from nodes and edges.
403 Handles schema version migration if needed.
404 """
405 # Check schema version
406 stored_version = data.get("schema_version", "0.0.0") # Default to 0.0.0 if missing
408 # Apply migrations if needed
409 if stored_version != SCHEMA_VERSION:
410 logger.info(
411 f"Migrating conversation {data['conversation_id']} "
412 f"from schema {stored_version} to {SCHEMA_VERSION}"
413 )
414 data = cls._migrate_schema(data, stored_version, SCHEMA_VERSION)
416 # Create nodes indexed by ID
417 nodes_by_id: Dict[str, ConversationNode] = {}
418 for node_data in data["nodes"]:
419 node = ConversationNode.from_dict(node_data)
420 nodes_by_id[node.node_id] = node
422 # Find root (node with empty ID)
423 root_node = nodes_by_id.get("")
424 if root_node is None:
425 # Try to find node with no parent in edges
426 child_ids = {edge[1] for edge in data["edges"]}
427 parent_ids = {edge[0] for edge in data["edges"]}
428 root_ids = parent_ids - child_ids
429 if root_ids:
430 root_node = nodes_by_id[root_ids.pop()]
431 else:
432 # Fallback: first node
433 root_node = next(iter(nodes_by_id.values()))
435 tree = Tree(root_node)
436 tree_nodes_by_id = {"": tree} # Map node_id -> Tree node
438 # Build tree by adding edges
439 for parent_id, child_id in data["edges"]:
440 if parent_id in tree_nodes_by_id:
441 parent_tree_node = tree_nodes_by_id[parent_id]
442 child_node = nodes_by_id[child_id]
443 child_tree_node = parent_tree_node.add_child(Tree(child_node))
444 tree_nodes_by_id[child_id] = child_tree_node
446 return cls(
447 conversation_id=data["conversation_id"],
448 message_tree=tree,
449 current_node_id=data["current_node_id"],
450 metadata=data["metadata"],
451 created_at=datetime.fromisoformat(data["created_at"]),
452 updated_at=datetime.fromisoformat(data["updated_at"]),
453 schema_version=SCHEMA_VERSION # Always use current version after migration
454 )
456 @staticmethod
457 def _migrate_schema(
458 data: Dict[str, Any],
459 from_version: str,
460 to_version: str
461 ) -> Dict[str, Any]:
462 """Migrate data from one schema version to another.
464 This method applies migrations sequentially to transform data from
465 an older schema version to the current version. Migrations are applied
466 in order (e.g., 0.0.0 → 1.0.0 → 1.1.0 → 1.2.0).
468 Args:
469 data: Data in old schema format
470 from_version: Source schema version
471 to_version: Target schema version
473 Returns:
474 Data in new schema format
476 Raises:
477 SchemaVersionError: If migration path is not supported
479 Example:
480 ```python
481 # Example migration from 1.0.0 to 1.1.0 might add a new field
482 old_data = {
483 "schema_version": "1.0.0",
484 "conversation_id": "conv-123",
485 "nodes": [...],
486 "edges": [...]
487 }
489 # After migration to 1.1.0
490 new_data = ConversationState._migrate_schema(
491 old_data,
492 from_version="1.0.0",
493 to_version="1.1.0"
494 )
495 # new_data might now include: {"tags": [], ...}
496 ```
498 Note:
499 **Adding New Migration Paths**:
501 When introducing schema changes, add a migration method:
503 ```python
504 @staticmethod
505 def _migrate_1_0_to_1_1(data: Dict[str, Any]) -> Dict[str, Any]:
506 '''Migrate from schema 1.0 to 1.1.'''
507 # Example: Add new optional field
508 data["tags"] = []
509 data["schema_version"] = "1.1.0"
510 return data
511 ```
513 Then update this method to call it:
515 ```python
516 if from_version == "1.0.0" and to_version >= "1.1.0":
517 data = cls._migrate_1_0_to_1_1(data)
518 ```
519 """
520 # Parse version strings
521 from_major, _from_minor, _from_patch = map(int, from_version.split("."))
522 to_major, _to_minor, _to_patch = map(int, to_version.split("."))
524 # No migration needed if versions match
525 if from_version == to_version:
526 return data
528 # Apply migrations based on version transitions
529 # Future migrations will be added here as needed
531 # Example migration patterns:
532 # if from_version == "0.0.0" and to_version >= "1.0.0":
533 # data = cls._migrate_0_to_1(data)
534 # if from_version < "1.1.0" and to_version >= "1.1.0":
535 # data = cls._migrate_1_0_to_1_1(data)
537 # For now, version 0.0.0 (no version field) to 1.0.0 is a no-op
538 # because the schema didn't change, we just added versioning
539 if from_version == "0.0.0":
540 logger.debug("Migrating from unversioned schema to 1.0.0 (no changes needed)")
541 data["schema_version"] = "1.0.0"
542 return data
544 # If we get here and versions still don't match, it's unsupported
545 if from_major > to_major:
546 raise SchemaVersionError(
547 f"Cannot downgrade from schema {from_version} to {to_version}"
548 )
550 logger.warning(
551 f"No migration path defined from {from_version} to {to_version}. "
552 "Using data as-is."
553 )
554 data["schema_version"] = to_version
555 return data
557 # Future migration methods will be added here as needed:
558 # @staticmethod
559 # def _migrate_1_0_to_1_1(data: Dict[str, Any]) -> Dict[str, Any]:
560 # """Migrate from schema 1.0 to 1.1."""
561 # # Add new field with default value
562 # data["new_field"] = "default_value"
563 # return data
566class ConversationStorage(ABC):
567 """Abstract storage interface for conversations.
569 This interface defines the contract for persisting conversation state.
570 Implementations can use any backend (SQL, NoSQL, file, etc.).
571 """
573 @abstractmethod
574 async def save_conversation(self, state: ConversationState) -> None:
575 """Save conversation state (upsert).
577 Args:
578 state: Conversation state to save
579 """
580 pass
582 @abstractmethod
583 async def load_conversation(
584 self,
585 conversation_id: str
586 ) -> ConversationState | None:
587 """Load conversation state.
589 Args:
590 conversation_id: Conversation identifier
592 Returns:
593 Conversation state or None if not found
594 """
595 pass
597 @abstractmethod
598 async def delete_conversation(self, conversation_id: str) -> bool:
599 """Delete conversation.
601 Args:
602 conversation_id: Conversation identifier
604 Returns:
605 True if deleted, False if not found
606 """
607 pass
609 @abstractmethod
610 async def list_conversations(
611 self,
612 filter_metadata: Dict[str, Any] | None = None,
613 limit: int = 100,
614 offset: int = 0
615 ) -> List[ConversationState]:
616 """List conversations with optional filtering.
618 Args:
619 filter_metadata: Optional metadata filters
620 limit: Maximum number of results
621 offset: Offset for pagination
623 Returns:
624 List of conversation states
625 """
626 pass
629class DataknobsConversationStorage(ConversationStorage):
630 """Conversation storage using dataknobs_data backends.
632 Stores conversations as Records with the tree serialized as nodes + edges.
633 Works with any dataknobs backend (Memory, File, S3, Postgres, etc.).
635 The storage layer handles:
636 - Automatic serialization/deserialization of conversation trees
637 - Schema version migration when loading old conversations
638 - Metadata-based filtering for listing conversations
639 - Upsert operations (insert or update)
641 Attributes:
642 backend: Dataknobs async database backend instance
644 Example:
645 ```python
646 from dataknobs_data import database_factory
647 from dataknobs_llm.conversations import DataknobsConversationStorage
649 # Memory backend (development/testing)
650 db = database_factory.create(backend="memory")
651 storage = DataknobsConversationStorage(db)
653 # File backend (local persistence)
654 db = database_factory.create(
655 backend="file",
656 file_path="./conversations.jsonl"
657 )
658 storage = DataknobsConversationStorage(db)
660 # S3 backend (cloud storage)
661 db = database_factory.create(
662 backend="s3",
663 bucket="my-conversations",
664 region="us-west-2"
665 )
666 storage = DataknobsConversationStorage(db)
668 # Postgres backend (production)
669 db = database_factory.create(
670 backend="postgres",
671 host="db.example.com",
672 database="conversations",
673 user="app",
674 password="secret"
675 )
676 storage = DataknobsConversationStorage(db)
678 # Save conversation
679 await storage.save_conversation(state)
681 # Load conversation
682 state = await storage.load_conversation("conv-123")
684 # List user's conversations
685 user_convos = await storage.list_conversations(
686 filter_metadata={"user_id": "alice"},
687 limit=50
688 )
690 # Delete conversation
691 deleted = await storage.delete_conversation("conv-123")
692 ```
694 Note:
695 **Backend Selection**:
697 - **Memory**: Fast, no persistence. Use for testing or ephemeral conversations.
698 - **File**: Simple local persistence. Good for single-server deployments.
699 - **S3**: Scalable cloud storage. Best for serverless or distributed systems.
700 - **Postgres**: Full ACID guarantees. Best for production multi-server setups.
702 All backends support the same API, so you can switch between them
703 by changing the database_factory configuration.
705 See Also:
706 ConversationStorage: Abstract interface
707 ConversationState: State structure being stored
708 dataknobs_data.database_factory: Backend creation utilities
709 """
711 def __init__(self, backend: Any):
712 """Initialize storage with dataknobs backend.
714 Args:
715 backend: Dataknobs async database backend (AsyncMemoryDatabase, etc.)
716 """
717 self.backend = backend
719 def _state_to_record(self, state: ConversationState) -> Any:
720 """Convert ConversationState to Record.
722 Args:
723 state: Conversation state to convert
725 Returns:
726 Record object for storage
727 """
728 # Import here to avoid circular dependency
729 try:
730 from dataknobs_data.records import Record
731 except ImportError:
732 raise StorageError(
733 "dataknobs_data package not available. "
734 "Install it to use DataknobsConversationStorage."
735 ) from None
737 # Convert state to dict
738 data = state.to_dict()
740 # Create Record with conversation_id as storage_id
741 return Record(
742 data=data,
743 storage_id=state.conversation_id
744 )
746 def _record_to_state(self, record: Any) -> ConversationState:
747 """Convert Record to ConversationState.
749 Args:
750 record: Record object from storage
752 Returns:
753 Conversation state
754 """
755 # Extract data from record
756 data = {}
757 for field_name, field_obj in record.fields.items():
758 data[field_name] = field_obj.value
760 # Reconstruct conversation state
761 return ConversationState.from_dict(data)
763 async def save_conversation(self, state: ConversationState) -> None:
764 """Save conversation to backend."""
765 try:
766 record = self._state_to_record(state)
767 # Use upsert to insert or update
768 await self.backend.upsert(state.conversation_id, record)
769 except Exception as e:
770 raise StorageError(f"Failed to save conversation: {e}") from e
772 async def load_conversation(
773 self,
774 conversation_id: str
775 ) -> ConversationState | None:
776 """Load conversation from backend."""
777 try:
778 # Read record by ID
779 record = await self.backend.read(conversation_id)
780 if record is None:
781 return None
783 return self._record_to_state(record)
785 except Exception as e:
786 raise StorageError(f"Failed to load conversation: {e}") from e
788 async def delete_conversation(self, conversation_id: str) -> bool:
789 """Delete conversation from backend."""
790 try:
791 return await self.backend.delete(conversation_id)
792 except Exception as e:
793 raise StorageError(f"Failed to delete conversation: {e}") from e
795 async def list_conversations(
796 self,
797 filter_metadata: Dict[str, Any] | None = None,
798 limit: int = 100,
799 offset: int = 0
800 ) -> List[ConversationState]:
801 """List conversations from backend."""
802 try:
803 # Import Query for filtering
804 try:
805 from dataknobs_data.query import Query
806 except ImportError:
807 raise StorageError(
808 "dataknobs_data package not available. "
809 "Install it to use DataknobsConversationStorage."
810 ) from None
812 # Build query with metadata filters using fluent interface
813 query = Query()
814 query.limit(limit).offset(offset)
816 if filter_metadata:
817 for key, value in filter_metadata.items():
818 # Add filter for metadata.key = value
819 query.filter(f"metadata.{key}", "=", value)
821 # Search with query
822 results = await self.backend.search(query)
824 # Convert records to conversation states
825 return [self._record_to_state(record) for record in results]
827 except Exception as e:
828 raise StorageError(f"Failed to list conversations: {e}") from e
831class StorageError(Exception):
832 """Exception raised for storage operation errors."""
833 pass
836class SchemaVersionError(Exception):
837 """Exception raised for schema version incompatibilities."""
838 pass