Coverage for src/dataknobs_llm/conversations/flow/adapter.py: 22%

103 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-08 13:51 -0700

1"""Conversation flow adapter for FSM execution. 

2 

3This module provides the adapter that converts high-level ConversationFlow 

4definitions into FSM configurations and manages execution. 

5""" 

6 

7import logging 

8from typing import Dict, Any, List 

9from dataclasses import dataclass, field 

10 

11from dataknobs_fsm.api.simple import SimpleFSM 

12from dataknobs_fsm.core.data_modes import DataHandlingMode 

13 

14from .flow import ConversationFlow, FlowState, TransitionCondition 

15 

16logger = logging.getLogger(__name__) 

17 

18 

19@dataclass 

20class FlowExecutionState: 

21 """Tracks execution state during flow execution. 

22 

23 Attributes: 

24 loop_counts: Count of visits to each state 

25 total_transitions: Total number of transitions made 

26 current_state: Current state name 

27 context: Current context dictionary 

28 history: List of (state_name, response) tuples 

29 """ 

30 

31 loop_counts: Dict[str, int] = field(default_factory=dict) 

32 total_transitions: int = 0 

33 current_state: str | None = None 

34 context: Dict[str, Any] = field(default_factory=dict) 

35 history: List[tuple] = field(default_factory=list) 

36 

37 def increment_loop_count(self, state_name: str) -> int: 

38 """Increment and return loop count for a state.""" 

39 count = self.loop_counts.get(state_name, 0) + 1 

40 self.loop_counts[state_name] = count 

41 return count 

42 

43 def add_to_history(self, state_name: str, response: str): 

44 """Add a state transition to history.""" 

45 self.history.append((state_name, response)) 

46 

47 

48class ConversationFlowAdapter: 

49 """Adapts ConversationFlow to FSM execution. 

50 

51 This class converts high-level conversation flow definitions into 

52 FSM configurations and manages the execution lifecycle. 

53 """ 

54 

55 def __init__( 

56 self, 

57 flow: ConversationFlow, 

58 prompt_builder: Any, # AsyncPromptBuilder 

59 llm: Any | None = None # AsyncLLMProvider 

60 ): 

61 """Initialize the adapter. 

62 

63 Args: 

64 flow: ConversationFlow definition 

65 prompt_builder: Prompt builder for rendering prompts 

66 llm: Optional LLM provider (can be in context) 

67 """ 

68 self.flow = flow 

69 self.prompt_builder = prompt_builder 

70 self.llm = llm 

71 self.execution_state = FlowExecutionState() 

72 self._function_registry: Dict[str, Any] = {} 

73 

74 def to_fsm_config(self) -> Dict[str, Any]: 

75 """Convert ConversationFlow to FSM configuration. 

76 

77 Returns: 

78 FSM configuration dictionary 

79 """ 

80 states = [] 

81 arcs = [] 

82 

83 # Create FSM states from flow states 

84 for state_name, flow_state in self.flow.states.items(): 

85 # Determine state type 

86 is_start = (state_name == self.flow.initial_state) 

87 is_end = (len(flow_state.transitions) == 0) 

88 

89 fsm_state = { 

90 "name": state_name, 

91 "is_start": is_start, 

92 "is_end": is_end, 

93 "transform": self._create_state_transform_function(state_name, flow_state) 

94 } 

95 

96 states.append(fsm_state) 

97 

98 # Create FSM arcs from flow transitions 

99 for state_name, flow_state in self.flow.states.items(): 

100 for condition_name, target_state in flow_state.transitions.items(): 

101 condition = flow_state.transition_conditions[condition_name] 

102 

103 arc = { 

104 "from": state_name, 

105 "to": target_state, 

106 "name": f"{state_name}_to_{target_state}_{condition_name}", 

107 "pre_test": self._register_condition_function( 

108 condition_name, 

109 condition, 

110 state_name 

111 ) 

112 } 

113 

114 arcs.append(arc) 

115 

116 # Build complete FSM config 

117 config = { 

118 "name": self.flow.name, 

119 "version": self.flow.version, 

120 "description": self.flow.description or f"Conversation flow: {self.flow.name}", 

121 "states": states, 

122 "arcs": arcs, 

123 "functions": self._function_registry 

124 } 

125 

126 return config 

127 

128 def _create_state_transform_function( 

129 self, 

130 state_name: str, 

131 flow_state: FlowState 

132 ) -> str: 

133 """Create and register transform function for a state. 

134 

135 Args: 

136 state_name: Name of the state 

137 flow_state: FlowState configuration 

138 

139 Returns: 

140 Function name for FSM registration 

141 """ 

142 function_name = f"transform_{state_name}" 

143 

144 async def transform_func(data: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]: 

145 """Transform function for state execution.""" 

146 # Check loop limits 

147 loop_count = self.execution_state.increment_loop_count(state_name) 

148 

149 if flow_state.max_loops and loop_count > flow_state.max_loops: 

150 logger.warning( 

151 f"State '{state_name}' exceeded max loops ({flow_state.max_loops})" 

152 ) 

153 return { 

154 **data, 

155 "_error": f"Max loops exceeded for state {state_name}", 

156 "_force_end": True 

157 } 

158 

159 # Check total transition limit 

160 self.execution_state.total_transitions += 1 

161 if self.execution_state.total_transitions > self.flow.max_total_loops: 

162 logger.warning( 

163 f"Flow exceeded max total transitions ({self.flow.max_total_loops})" 

164 ) 

165 return { 

166 **data, 

167 "_error": "Max total transitions exceeded", 

168 "_force_end": True 

169 } 

170 

171 # Call on_enter hook if defined 

172 if flow_state.on_enter: 

173 try: 

174 await flow_state.on_enter(state_name, data, context) 

175 except Exception as e: 

176 logger.error(f"on_enter hook failed for state '{state_name}': {e}") 

177 

178 # Merge prompt params with data 

179 prompt_params = { 

180 **data, 

181 **flow_state.prompt_params, 

182 **context, 

183 "state": state_name, 

184 "loop_count": loop_count 

185 } 

186 

187 # Render and build prompt 

188 try: 

189 result = await self.prompt_builder.build_prompt( 

190 prompt_name=flow_state.prompt_name, 

191 params=prompt_params 

192 ) 

193 except Exception as e: 

194 logger.error(f"Failed to build prompt for state '{state_name}': {e}") 

195 return { 

196 **data, 

197 "_error": f"Prompt building failed: {e!s}", 

198 "response": f"[Error in state {state_name}]" 

199 } 

200 

201 # Store response in data 

202 response = result.content if hasattr(result, 'content') else str(result) 

203 

204 # Add to history 

205 self.execution_state.add_to_history(state_name, response) 

206 

207 # Call on_exit hook if defined 

208 if flow_state.on_exit: 

209 try: 

210 await flow_state.on_exit(state_name, data, context) 

211 except Exception as e: 

212 logger.error(f"on_exit hook failed for state '{state_name}': {e}") 

213 

214 # Update data with response 

215 return { 

216 **data, 

217 "response": response, 

218 "state": state_name, 

219 "loop_count": loop_count, 

220 "history": list(self.execution_state.history) 

221 } 

222 

223 # Register function 

224 self._function_registry[function_name] = transform_func 

225 

226 return function_name 

227 

228 def _register_condition_function( 

229 self, 

230 condition_name: str, 

231 condition: TransitionCondition, 

232 state_name: str 

233 ) -> str: 

234 """Register a condition function for arc pre_test. 

235 

236 Args: 

237 condition_name: Name of the condition 

238 condition: TransitionCondition instance 

239 state_name: Name of the source state 

240 

241 Returns: 

242 Function name for FSM registration 

243 """ 

244 function_name = f"condition_{state_name}_{condition_name}" 

245 

246 async def condition_func(data: Dict[str, Any], context: Dict[str, Any]) -> bool: 

247 """Condition function for arc evaluation.""" 

248 # Check if forced to end 

249 if data.get("_force_end"): 

250 return False 

251 

252 response = data.get("response", "") 

253 

254 # Evaluate condition 

255 try: 

256 result = await condition.evaluate(response, {**context, **data}) 

257 logger.debug( 

258 f"Condition '{condition_name}' for state '{state_name}': {result}" 

259 ) 

260 return result 

261 except Exception as e: 

262 logger.error( 

263 f"Condition '{condition_name}' evaluation failed: {e}" 

264 ) 

265 return False 

266 

267 # Register function 

268 self._function_registry[function_name] = condition_func 

269 

270 return function_name 

271 

272 async def execute( 

273 self, 

274 initial_data: Dict[str, Any] | None = None 

275 ) -> Dict[str, Any]: 

276 """Execute the conversation flow. 

277 

278 Args: 

279 initial_data: Initial data for the flow 

280 

281 Returns: 

282 Final data after flow execution 

283 """ 

284 # Initialize execution state 

285 self.execution_state = FlowExecutionState( 

286 current_state=self.flow.initial_state, 

287 context={**self.flow.initial_context} 

288 ) 

289 

290 # Prepare initial data 

291 data = initial_data or {} 

292 data = {**data, **self.flow.initial_context} 

293 

294 # Convert to FSM config 

295 fsm_config = self.to_fsm_config() 

296 

297 # Create and execute FSM 

298 fsm = SimpleFSM(fsm_config, data_mode=DataHandlingMode.COPY) 

299 

300 try: 

301 result = await fsm.process_async(data) 

302 return result.get("data", result) 

303 except Exception as e: 

304 logger.error(f"Flow execution failed: {e}") 

305 return { 

306 **data, 

307 "_error": str(e), 

308 "_execution_failed": True 

309 } 

310 

311 def get_execution_summary(self) -> Dict[str, Any]: 

312 """Get summary of flow execution. 

313 

314 Returns: 

315 Dictionary with execution statistics 

316 """ 

317 return { 

318 "total_transitions": self.execution_state.total_transitions, 

319 "loop_counts": dict(self.execution_state.loop_counts), 

320 "current_state": self.execution_state.current_state, 

321 "history_length": len(self.execution_state.history), 

322 "states_visited": list(self.execution_state.loop_counts.keys()) 

323 }