Coverage for llm_dataset_engine/orchestration/state_manager.py: 32%

50 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-10-15 18:04 +0200

1"""State management for checkpointing and recovery.""" 

2 

3from pathlib import Path 

4from typing import List, Optional 

5from uuid import UUID 

6 

7from llm_dataset_engine.adapters.checkpoint_storage import CheckpointStorage 

8from llm_dataset_engine.core.models import CheckpointInfo 

9from llm_dataset_engine.orchestration.execution_context import ExecutionContext 

10from llm_dataset_engine.utils import get_logger 

11 

# Module-level logger, named after this module via ``__name__``.
logger = get_logger(__name__)

13 

14 

class StateManager:
    """
    Manages execution state persistence and recovery.

    Follows Single Responsibility: only handles state management.
    Uses Strategy pattern for pluggable storage backends.

    The manager remembers the row at which the most recent checkpoint was
    saved or restored (``_last_checkpoint_row``). ``should_checkpoint``
    measures distance from that row, so checkpoints are spaced
    ``checkpoint_interval`` rows apart.
    """

    def __init__(
        self, storage: CheckpointStorage, checkpoint_interval: int = 500
    ) -> None:
        """
        Initialize state manager.

        Args:
            storage: Checkpoint storage backend
            checkpoint_interval: Rows between checkpoints
        """
        self.storage = storage
        self.checkpoint_interval = checkpoint_interval
        # Row of the last successfully saved or restored checkpoint.
        self._last_checkpoint_row = 0

    def should_checkpoint(self, current_row: int) -> bool:
        """
        Check if checkpoint should be saved.

        Args:
            current_row: Current row index

        Returns:
            True if at least ``checkpoint_interval`` rows have passed since
            the last saved or restored checkpoint
        """
        return (
            current_row - self._last_checkpoint_row
        ) >= self.checkpoint_interval

    def save_checkpoint(self, context: ExecutionContext) -> bool:
        """
        Save checkpoint for execution context.

        Serializes ``context`` via ``to_checkpoint()`` and hands the result
        to the storage backend under ``context.session_id``. Any exception
        is logged and reported as ``False`` (best-effort save).

        Args:
            context: Execution context to save

        Returns:
            True if successful
        """
        try:
            checkpoint_data = context.to_checkpoint()
            success = self.storage.save(context.session_id, checkpoint_data)

            if success:
                # Advance the marker only on success. After a failed save
                # the marker is unchanged, so should_checkpoint still fires
                # on the next check and the save is retried.
                self._last_checkpoint_row = context.last_processed_row
                logger.info(
                    f"Checkpoint saved at row {context.last_processed_row}"
                )

            return success
        except Exception as e:
            logger.error(f"Failed to save checkpoint: {e}")
            return False

    def load_checkpoint(
        self, session_id: UUID
    ) -> Optional[ExecutionContext]:
        """
        Load checkpoint for session.

        On success the internal checkpoint marker is moved to the restored
        context's ``last_processed_row``. Without this, a resumed run would
        measure the checkpoint interval from row 0 and immediately write a
        redundant checkpoint.

        Args:
            session_id: Session identifier

        Returns:
            Restored execution context, or None if no checkpoint exists or
            loading/deserialization failed (the failure is logged)
        """
        try:
            checkpoint_data = self.storage.load(session_id)

            if checkpoint_data is None:
                return None

            context = ExecutionContext.from_checkpoint(checkpoint_data)
            # Resume interval tracking from the restored position.
            self._last_checkpoint_row = context.last_processed_row
            logger.info(
                f"Checkpoint loaded from row {context.last_processed_row}"
            )

            return context
        except Exception as e:
            logger.error(f"Failed to load checkpoint: {e}")
            return None

    def can_resume(self, session_id: UUID) -> bool:
        """
        Check if session can be resumed.

        Unlike the save/load/cleanup methods, exceptions raised by the
        storage backend are not caught here and propagate to the caller.

        Args:
            session_id: Session identifier

        Returns:
            True if checkpoint exists
        """
        return self.storage.exists(session_id)

    def cleanup_checkpoints(self, session_id: UUID) -> bool:
        """
        Delete checkpoints for session.

        Any exception from the storage backend is logged and reported as
        ``False``.

        Args:
            session_id: Session identifier

        Returns:
            True if deleted
        """
        try:
            success = self.storage.delete(session_id)
            if success:
                logger.info(f"Checkpoints cleaned up for session {session_id}")
            return success
        except Exception as e:
            logger.error(f"Failed to cleanup checkpoints: {e}")
            return False

    def list_checkpoints(self) -> List[CheckpointInfo]:
        """
        List all available checkpoints.

        Delegates directly to the storage backend. Exceptions are not
        caught and propagate to the caller.

        Returns:
            List of checkpoint information
        """
        return self.storage.list_checkpoints()

143