Coverage for src/dataknobs_llm/conversations/flow/adapter.py: 22%
103 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-08 13:51 -0700
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-08 13:51 -0700
1"""Conversation flow adapter for FSM execution.
3This module provides the adapter that converts high-level ConversationFlow
4definitions into FSM configurations and manages execution.
5"""
7import logging
8from typing import Dict, Any, List
9from dataclasses import dataclass, field
11from dataknobs_fsm.api.simple import SimpleFSM
12from dataknobs_fsm.core.data_modes import DataHandlingMode
14from .flow import ConversationFlow, FlowState, TransitionCondition
16logger = logging.getLogger(__name__)
19@dataclass
20class FlowExecutionState:
21 """Tracks execution state during flow execution.
23 Attributes:
24 loop_counts: Count of visits to each state
25 total_transitions: Total number of transitions made
26 current_state: Current state name
27 context: Current context dictionary
28 history: List of (state_name, response) tuples
29 """
31 loop_counts: Dict[str, int] = field(default_factory=dict)
32 total_transitions: int = 0
33 current_state: str | None = None
34 context: Dict[str, Any] = field(default_factory=dict)
35 history: List[tuple] = field(default_factory=list)
37 def increment_loop_count(self, state_name: str) -> int:
38 """Increment and return loop count for a state."""
39 count = self.loop_counts.get(state_name, 0) + 1
40 self.loop_counts[state_name] = count
41 return count
43 def add_to_history(self, state_name: str, response: str):
44 """Add a state transition to history."""
45 self.history.append((state_name, response))
class ConversationFlowAdapter:
    """Adapts ConversationFlow to FSM execution.

    This class converts high-level conversation flow definitions into
    FSM configurations and manages the execution lifecycle. Per-run
    bookkeeping (loop counts, transition totals, current state, history)
    is kept in ``self.execution_state`` and is reset by ``execute()``.
    """

    def __init__(
        self,
        flow: ConversationFlow,
        prompt_builder: Any,  # AsyncPromptBuilder
        llm: Any | None = None,  # AsyncLLMProvider
    ):
        """Initialize the adapter.

        Args:
            flow: ConversationFlow definition
            prompt_builder: Prompt builder for rendering prompts
            llm: Optional LLM provider (can be in context)
        """
        self.flow = flow
        self.prompt_builder = prompt_builder
        self.llm = llm
        self.execution_state = FlowExecutionState()
        # Maps generated function names to the async callables handed to the FSM.
        self._function_registry: Dict[str, Any] = {}

    def to_fsm_config(self) -> Dict[str, Any]:
        """Convert ConversationFlow to FSM configuration.

        Every flow state becomes an FSM state with a registered transform
        function; every transition becomes an arc guarded by a registered
        condition function.

        Returns:
            FSM configuration dictionary with ``name``, ``version``,
            ``description``, ``states``, ``arcs`` and ``functions`` keys
        """
        states = []
        arcs = []

        # Create FSM states from flow states
        for state_name, flow_state in self.flow.states.items():
            states.append({
                "name": state_name,
                "is_start": state_name == self.flow.initial_state,
                # A state with no outgoing transitions terminates the flow.
                "is_end": not flow_state.transitions,
                "transform": self._create_state_transform_function(state_name, flow_state),
            })

        # Create FSM arcs from flow transitions
        for state_name, flow_state in self.flow.states.items():
            for condition_name, target_state in flow_state.transitions.items():
                condition = flow_state.transition_conditions[condition_name]
                arcs.append({
                    "from": state_name,
                    "to": target_state,
                    "name": f"{state_name}_to_{target_state}_{condition_name}",
                    "pre_test": self._register_condition_function(
                        condition_name,
                        condition,
                        state_name,
                    ),
                })

        # Build complete FSM config
        return {
            "name": self.flow.name,
            "version": self.flow.version,
            "description": self.flow.description or f"Conversation flow: {self.flow.name}",
            "states": states,
            "arcs": arcs,
            "functions": self._function_registry,
        }

    def _create_state_transform_function(
        self,
        state_name: str,
        flow_state: FlowState
    ) -> str:
        """Create and register transform function for a state.

        The registered coroutine enforces the per-state and flow-wide loop
        limits, runs the optional ``on_enter``/``on_exit`` hooks, builds the
        state's prompt and returns the input data extended with the response.

        Args:
            state_name: Name of the state
            flow_state: FlowState configuration

        Returns:
            Function name for FSM registration
        """
        function_name = f"transform_{state_name}"

        async def transform_func(data: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]:
            """Transform function for state execution."""
            # Record the state being executed so the execution summary
            # reflects where the flow actually is, not just where it started.
            self.execution_state.current_state = state_name

            # Check loop limits
            loop_count = self.execution_state.increment_loop_count(state_name)

            if flow_state.max_loops and loop_count > flow_state.max_loops:
                logger.warning(
                    "State '%s' exceeded max loops (%s)",
                    state_name,
                    flow_state.max_loops,
                )
                return {
                    **data,
                    "_error": f"Max loops exceeded for state {state_name}",
                    "_force_end": True
                }

            # Check total transition limit
            self.execution_state.total_transitions += 1
            if self.execution_state.total_transitions > self.flow.max_total_loops:
                logger.warning(
                    "Flow exceeded max total transitions (%s)",
                    self.flow.max_total_loops,
                )
                return {
                    **data,
                    "_error": "Max total transitions exceeded",
                    "_force_end": True
                }

            # Call on_enter hook if defined; a failing hook is logged (with
            # traceback) and does not abort the state.
            if flow_state.on_enter:
                try:
                    await flow_state.on_enter(state_name, data, context)
                except Exception as e:
                    logger.error(
                        "on_enter hook failed for state '%s': %s",
                        state_name,
                        e,
                        exc_info=True,
                    )

            # Merge prompt params with data; later entries win, so context
            # overrides the state's prompt_params, which override data.
            prompt_params = {
                **data,
                **flow_state.prompt_params,
                **context,
                "state": state_name,
                "loop_count": loop_count
            }

            # Render and build prompt
            try:
                result = await self.prompt_builder.build_prompt(
                    prompt_name=flow_state.prompt_name,
                    params=prompt_params
                )
            except Exception as e:
                logger.error(
                    "Failed to build prompt for state '%s': %s",
                    state_name,
                    e,
                    exc_info=True,
                )
                return {
                    **data,
                    "_error": f"Prompt building failed: {e!s}",
                    "response": f"[Error in state {state_name}]"
                }

            # Store response in data
            response = result.content if hasattr(result, 'content') else str(result)

            # Add to history
            self.execution_state.add_to_history(state_name, response)

            # Call on_exit hook if defined; failures are logged, not raised.
            if flow_state.on_exit:
                try:
                    await flow_state.on_exit(state_name, data, context)
                except Exception as e:
                    logger.error(
                        "on_exit hook failed for state '%s': %s",
                        state_name,
                        e,
                        exc_info=True,
                    )

            # Update data with response; history is copied so later states
            # do not mutate the snapshot stored in earlier results.
            return {
                **data,
                "response": response,
                "state": state_name,
                "loop_count": loop_count,
                "history": list(self.execution_state.history)
            }

        # Register function
        self._function_registry[function_name] = transform_func

        return function_name

    def _register_condition_function(
        self,
        condition_name: str,
        condition: TransitionCondition,
        state_name: str
    ) -> str:
        """Register a condition function for arc pre_test.

        The registered coroutine returns False when the data carries a
        truthy ``_force_end`` flag or when evaluating the condition raises.

        Args:
            condition_name: Name of the condition
            condition: TransitionCondition instance
            state_name: Name of the source state

        Returns:
            Function name for FSM registration
        """
        function_name = f"condition_{state_name}_{condition_name}"

        async def condition_func(data: Dict[str, Any], context: Dict[str, Any]) -> bool:
            """Condition function for arc evaluation."""
            # Check if forced to end
            if data.get("_force_end"):
                return False

            response = data.get("response", "")

            # Evaluate condition; data keys take precedence over context keys.
            try:
                result = await condition.evaluate(response, {**context, **data})
            except Exception as e:
                logger.error(
                    "Condition '%s' evaluation failed: %s",
                    condition_name,
                    e,
                    exc_info=True,
                )
                return False

            logger.debug(
                "Condition '%s' for state '%s': %s",
                condition_name,
                state_name,
                result,
            )
            return result

        # Register function
        self._function_registry[function_name] = condition_func

        return function_name

    async def execute(
        self,
        initial_data: Dict[str, Any] | None = None
    ) -> Dict[str, Any]:
        """Execute the conversation flow.

        Resets the execution state, builds the FSM configuration and runs
        it. Failures raised by the FSM are logged and reported in the
        returned dictionary rather than propagated.

        Args:
            initial_data: Initial data for the flow

        Returns:
            Final data after flow execution. On failure, the initial data
            plus ``_error`` and ``_execution_failed`` keys.
        """
        # Initialize execution state
        self.execution_state = FlowExecutionState(
            current_state=self.flow.initial_state,
            context={**self.flow.initial_context}
        )

        # Prepare initial data.
        # NOTE(review): the flow's initial_context overrides same-named keys
        # supplied by the caller in initial_data - confirm this is intended.
        data = {**(initial_data or {}), **self.flow.initial_context}

        # Convert to FSM config
        fsm_config = self.to_fsm_config()

        # Create and execute FSM
        fsm = SimpleFSM(fsm_config, data_mode=DataHandlingMode.COPY)

        try:
            result = await fsm.process_async(data)
            # Unwrap the "data" entry when present, otherwise return the result as-is.
            return result.get("data", result)
        except Exception as e:
            logger.error("Flow execution failed: %s", e, exc_info=True)
            return {
                **data,
                "_error": str(e),
                "_execution_failed": True
            }

    def get_execution_summary(self) -> Dict[str, Any]:
        """Get summary of flow execution.

        Returns:
            Dictionary with execution statistics: ``total_transitions``,
            ``loop_counts``, ``current_state``, ``history_length`` and
            ``states_visited``
        """
        state = self.execution_state
        return {
            "total_transitions": state.total_transitions,
            "loop_counts": dict(state.loop_counts),
            "current_state": state.current_state,
            "history_length": len(state.history),
            "states_visited": list(state.loop_counts)
        }