Coverage for llm_dataset_engine/orchestration/state_manager.py: 32%
50 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-15 18:04 +0200
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-15 18:04 +0200
1"""State management for checkpointing and recovery."""
3from pathlib import Path
4from typing import List, Optional
5from uuid import UUID
7from llm_dataset_engine.adapters.checkpoint_storage import CheckpointStorage
8from llm_dataset_engine.core.models import CheckpointInfo
9from llm_dataset_engine.orchestration.execution_context import ExecutionContext
10from llm_dataset_engine.utils import get_logger
12logger = get_logger(__name__)
class StateManager:
    """
    Manages execution state persistence and recovery.

    Follows Single Responsibility: only handles state management.
    Uses Strategy pattern for pluggable storage backends.

    The manager remembers the row of the most recent checkpoint it saved
    or loaded, so that ``should_checkpoint`` spaces checkpoints
    ``checkpoint_interval`` rows apart, including after a resume.
    """

    def __init__(
        self, storage: CheckpointStorage, checkpoint_interval: int = 500
    ):
        """
        Initialize state manager.

        Args:
            storage: Checkpoint storage backend
            checkpoint_interval: Rows between checkpoints. A value of 0
                (or less) makes ``should_checkpoint`` return True for
                every row at or beyond the last checkpoint row.
        """
        self.storage = storage
        self.checkpoint_interval = checkpoint_interval
        # Row of the most recent successful save or load; 0 until then.
        self._last_checkpoint_row = 0

    def should_checkpoint(self, current_row: int) -> bool:
        """
        Check if checkpoint should be saved.

        Args:
            current_row: Current row index

        Returns:
            True if at least ``checkpoint_interval`` rows have passed
            since the last saved or loaded checkpoint
        """
        return (
            current_row - self._last_checkpoint_row
        ) >= self.checkpoint_interval

    def save_checkpoint(self, context: ExecutionContext) -> bool:
        """
        Save checkpoint for execution context.

        Best-effort: any exception raised while serializing the context
        or writing it to storage is logged and reported as ``False``.

        Args:
            context: Execution context to save

        Returns:
            True if successful
        """
        try:
            checkpoint_data = context.to_checkpoint()
            success = self.storage.save(context.session_id, checkpoint_data)

            if success:
                # Only advance the interval baseline once storage has
                # confirmed the write.
                self._last_checkpoint_row = context.last_processed_row
                logger.info(
                    f"Checkpoint saved at row {context.last_processed_row}"
                )

            return success
        except Exception as e:
            logger.error(f"Failed to save checkpoint: {e}")
            return False

    def load_checkpoint(
        self, session_id: UUID
    ) -> Optional[ExecutionContext]:
        """
        Load checkpoint for session.

        Best-effort: any exception raised while loading or deserializing
        is logged and reported as ``None``. On success, the checkpoint
        interval baseline is moved to the restored row.

        Args:
            session_id: Session identifier

        Returns:
            Restored execution context, or None if no checkpoint exists
            or loading failed
        """
        try:
            checkpoint_data = self.storage.load(session_id)

            if checkpoint_data is None:
                return None

            context = ExecutionContext.from_checkpoint(checkpoint_data)
            # Sync the baseline with the restored position. Otherwise
            # should_checkpoint() keeps measuring from row 0 and fires on
            # the first check after a resume, re-saving a checkpoint at
            # essentially the position that was just loaded.
            self._last_checkpoint_row = context.last_processed_row
            logger.info(
                f"Checkpoint loaded from row {context.last_processed_row}"
            )

            return context
        except Exception as e:
            logger.error(f"Failed to load checkpoint: {e}")
            return None

    def can_resume(self, session_id: UUID) -> bool:
        """
        Check if session can be resumed.

        Unlike save/load/cleanup, storage errors are not caught here and
        propagate to the caller.

        Args:
            session_id: Session identifier

        Returns:
            True if checkpoint exists
        """
        return self.storage.exists(session_id)

    def cleanup_checkpoints(self, session_id: UUID) -> bool:
        """
        Delete checkpoints for session.

        Best-effort: any exception raised by the storage backend is
        logged and reported as ``False``.

        Args:
            session_id: Session identifier

        Returns:
            True if deleted
        """
        try:
            success = self.storage.delete(session_id)
            if success:
                logger.info(f"Checkpoints cleaned up for session {session_id}")
            return success
        except Exception as e:
            logger.error(f"Failed to cleanup checkpoints: {e}")
            return False

    def list_checkpoints(self) -> List[CheckpointInfo]:
        """
        List all available checkpoints.

        Storage errors are not caught here and propagate to the caller.

        Returns:
            List of checkpoint information
        """
        return self.storage.list_checkpoints()