Coverage for little_loops / state.py: 40%

82 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-03-18 16:18 -0500

1"""State persistence for little-loops automation. 

2 

3Provides state management for resume capability during automated processing. 

4""" 

5 

6from __future__ import annotations 

7 

8import json 

9from dataclasses import dataclass, field 

10from datetime import datetime 

11from pathlib import Path 

12from typing import Any 

13 

14from little_loops.logger import Logger 

15 

16 

@dataclass
class ProcessingState:
    """Persistent state for automated issue processing.

    Enables resume capability after interruption by tracking:
    - Currently processing issue
    - Completed issues
    - Failed issues with reasons
    - Timing information
    - Auto-corrections made during validation

    Attributes:
        current_issue: Path to currently processing issue file
        phase: Current processing phase
        timestamp: Last update timestamp
        completed_issues: List of completed issue IDs
        failed_issues: Mapping of issue ID to failure reason
        attempted_issues: Set of issues already attempted
        timing: Per-issue timing breakdown
        corrections: Mapping of issue ID to list of corrections made
    """

    current_issue: str = ""
    phase: str = "idle"
    timestamp: str = ""
    completed_issues: list[str] = field(default_factory=list)
    failed_issues: dict[str, str] = field(default_factory=dict)
    attempted_issues: set[str] = field(default_factory=set)
    timing: dict[str, dict[str, float]] = field(default_factory=dict)
    corrections: dict[str, list[str]] = field(default_factory=dict)

    def to_dict(self) -> dict[str, Any]:
        """Convert state to a dictionary suitable for JSON serialization.

        Returns:
            A mapping with one key per attribute. ``attempted_issues`` is
            emitted as a sorted list because JSON has no set type.
        """
        return {
            "current_issue": self.current_issue,
            "phase": self.phase,
            "timestamp": self.timestamp,
            "completed_issues": self.completed_issues,
            "failed_issues": self.failed_issues,
            # Sets iterate in arbitrary order; sorting keeps the written file
            # stable between runs. key=str means a stray non-string entry
            # cannot make the sort (and therefore the save) fail.
            "attempted_issues": sorted(self.attempted_issues, key=str),
            "timing": self.timing,
            "corrections": self.corrections,
        }

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> ProcessingState:
        """Create state from a dictionary (JSON deserialization).

        Keys that are missing, or present with an explicit ``None`` (JSON
        ``null``), fall back to the same defaults as a fresh instance.
        Container values are copied so the new state does not alias ``data``.

        Args:
            data: Mapping as produced by ``to_dict``.

        Returns:
            A new ``ProcessingState`` populated from ``data``.
        """

        def pick(key: str, default: Any) -> Any:
            # Only None triggers the default so that legitimate falsy values
            # (such as an empty string) survive a round trip unchanged.
            raw = data.get(key)
            return default if raw is None else raw

        return cls(
            current_issue=pick("current_issue", ""),
            phase=pick("phase", "idle"),
            timestamp=pick("timestamp", ""),
            completed_issues=list(pick("completed_issues", [])),
            failed_issues=dict(pick("failed_issues", {})),
            attempted_issues=set(pick("attempted_issues", [])),
            timing=dict(pick("timing", {})),
            corrections=dict(pick("corrections", {})),
        )

74 

75 

class StateManager:
    """Manages persistence of processing state.

    Handles loading, saving, and cleanup of state files for
    automated issue processing with resume capability.

    File-system failures are logged through ``logger`` rather than raised,
    so persistence problems never abort the processing run itself.
    """

    def __init__(self, state_file: Path, logger: Logger) -> None:
        """Initialize state manager.

        Args:
            state_file: Path to the state file
            logger: Logger instance for output
        """
        self.state_file = state_file
        self.logger = logger
        self._state: ProcessingState | None = None

    @property
    def state(self) -> ProcessingState:
        """Get current state, creating a new timestamped one if needed."""
        if self._state is None:
            self._state = ProcessingState(timestamp=datetime.now().isoformat())
        return self._state

    def load(self) -> ProcessingState | None:
        """Load state from file.

        Returns:
            Loaded state, or None if the file doesn't exist or could not be
            read or parsed (the failure is logged). The in-memory state is
            left untouched when loading fails.
        """
        try:
            # Read directly instead of checking exists() first: the file may
            # vanish between the check and the read.
            raw = self.state_file.read_text(encoding="utf-8")
            self._state = ProcessingState.from_dict(json.loads(raw))
            self.logger.info(f"State loaded from {self.state_file}")
            return self._state
        except FileNotFoundError:
            # No previous run to resume from; not an error.
            return None
        except json.JSONDecodeError as e:
            self.logger.error(f"Failed to parse state file: {e}")
        except Exception as e:
            self.logger.error(f"Failed to load state: {e}")
        return None

    def save(self) -> None:
        """Save current state to file.

        The state is written to a sibling temporary file and then renamed
        over the target, so an interruption mid-write cannot leave a
        truncated state file behind. Failures are logged, not raised.
        """
        tmp_path: Path | None = None
        try:
            self.state.timestamp = datetime.now().isoformat()
            payload = json.dumps(self.state.to_dict(), indent=2)
            tmp_path = self.state_file.with_name(f"{self.state_file.name}.tmp")
            tmp_path.write_text(payload, encoding="utf-8")
            tmp_path.replace(self.state_file)
            self.logger.info(f"State saved to {self.state_file}")
        except Exception as e:
            self.logger.error(f"Failed to save state: {e}")
            if tmp_path is not None:
                try:
                    tmp_path.unlink()
                except OSError:
                    # Best effort: the temp file may never have been created.
                    pass

    def cleanup(self) -> None:
        """Remove state file if it exists; log any failure to do so."""
        try:
            self.state_file.unlink()
        except FileNotFoundError:
            # Nothing to remove.
            return
        except Exception as e:
            self.logger.error(f"Failed to cleanup state file: {e}")
            return
        self.logger.info("State file cleaned up")

    def update_current(self, issue_path: str, phase: str) -> None:
        """Update current issue and phase, then persist.

        Args:
            issue_path: Path to current issue file
            phase: Current processing phase
        """
        self.state.current_issue = issue_path
        self.state.phase = phase
        self.save()

    def mark_attempted(self, issue_id: str, *, save: bool = True) -> None:
        """Mark an issue as attempted.

        Args:
            issue_id: Issue identifier
            save: Whether to persist state immediately (default True)
        """
        self.state.attempted_issues.add(issue_id)
        if save:
            self.save()

    def mark_completed(self, issue_id: str, timing: dict[str, float] | None = None) -> None:
        """Mark an issue as completed and reset the current issue/phase.

        Args:
            issue_id: Issue identifier
            timing: Optional timing breakdown; ignored when empty
        """
        # An issue re-run after a resume must not be listed twice.
        if issue_id not in self.state.completed_issues:
            self.state.completed_issues.append(issue_id)
        if timing:
            self.state.timing[issue_id] = timing
        self.state.current_issue = ""
        self.state.phase = "idle"
        self.save()

    def mark_failed(self, issue_id: str, reason: str) -> None:
        """Mark an issue as failed, then persist.

        Args:
            issue_id: Issue identifier
            reason: Failure reason
        """
        self.state.failed_issues[issue_id] = reason
        self.save()

    def is_attempted(self, issue_id: str) -> bool:
        """Check if an issue has been attempted.

        Args:
            issue_id: Issue identifier

        Returns:
            True if issue was already attempted
        """
        return issue_id in self.state.attempted_issues

    def record_corrections(self, issue_id: str, corrections: list[str]) -> None:
        """Record corrections made to an issue.

        Replaces any corrections previously stored for ``issue_id``. Does
        nothing (and does not save) when ``corrections`` is empty.

        Args:
            issue_id: Issue identifier
            corrections: List of correction descriptions
        """
        if corrections:
            self.state.corrections[issue_id] = corrections
            self.save()