Coverage for llm_dataset_engine/orchestration/sync_executor.py: 58%

36 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-10-15 18:04 +0200

1""" 

2Synchronous execution strategy. 

3 

4Default executor that maintains current behavior using ThreadPoolExecutor 

5for concurrent LLM calls. 

6""" 

7 

8from datetime import datetime 

9from decimal import Decimal 

10from typing import List 

11 

12import pandas as pd 

13 

14from llm_dataset_engine.adapters import create_llm_client 

15from llm_dataset_engine.core.models import ( 

16 CostEstimate, 

17 ExecutionResult, 

18 ProcessingStats, 

19) 

20from llm_dataset_engine.orchestration.execution_context import ExecutionContext 

21from llm_dataset_engine.orchestration.execution_strategy import ExecutionStrategy 

22from llm_dataset_engine.stages import ( 

23 DataLoaderStage, 

24 LLMInvocationStage, 

25 PromptFormatterStage, 

26 ResponseParserStage, 

27 ResultWriterStage, 

28) 

29from llm_dataset_engine.stages.pipeline_stage import PipelineStage 

30from llm_dataset_engine.utils import RateLimiter, RetryHandler, get_logger 

31 

# Module-level logger; SyncExecutor.__init__ stores it on each instance as self.logger.
logger = get_logger(__name__)

33 

34 

class SyncExecutor(ExecutionStrategy):
    """
    Synchronous execution strategy.

    Intended as the default strategy: stages run one after another, with
    concurrent LLM calls meant to be handled through ThreadPoolExecutor so
    that existing pipeline behavior is preserved.

    NOTE: ``_execute_stages`` is currently a placeholder that returns an
    empty DataFrame; the real stage-running logic has not been moved into
    this class yet.
    """

    def __init__(self):
        """Initialize synchronous executor."""
        self.logger = logger

    def execute(
        self,
        stages: List[PipelineStage],
        context: ExecutionContext,
    ) -> ExecutionResult:
        """
        Execute stages synchronously and package the outcome.

        Args:
            stages: Pipeline stages, passed through to ``_execute_stages``.
            context: Execution context; its row counters, accumulated cost,
                accumulated tokens and session id are read to build the
                returned metrics.

        Returns:
            ExecutionResult with data, processing metrics and cost figures,
            flagged ``success=True``.

        Raises:
            Exception: Any exception raised while running the stages or
                building the result is logged (with traceback) and re-raised
                unchanged.
        """
        start_time = datetime.now()

        try:
            # Execute stages sequentially
            result_data = self._execute_stages(stages, context)

            end_time = datetime.now()
            duration = (end_time - start_time).total_seconds()

            # Presumably last_processed_row is a zero-based index, hence +1
            # to turn it into a count -- confirm against ExecutionContext.
            processed_rows = context.last_processed_row + 1

            # NOTE(review): throughput is derived from total_rows rather than
            # processed_rows; confirm that is the intended definition.
            stats = ProcessingStats(
                total_rows=context.total_rows,
                processed_rows=processed_rows,
                failed_rows=context.total_rows - processed_rows,
                skipped_rows=0,
                # Guard against a zero-length run to avoid ZeroDivisionError.
                rows_per_second=context.total_rows / duration if duration > 0 else 0,
                total_duration_seconds=duration,
            )

            # Costs come from the totals accumulated on the context.
            cost_estimate = CostEstimate(
                total_cost=context.accumulated_cost,
                total_tokens=context.accumulated_tokens,
                input_tokens=0,  # Would need to track separately
                output_tokens=0,
                rows=context.total_rows,
                confidence="actual",
            )

            return ExecutionResult(
                data=result_data,
                metrics=stats,
                costs=cost_estimate,
                execution_id=context.session_id,
                start_time=start_time,
                end_time=end_time,
                success=True,
            )

        except Exception as e:
            # Keep the traceback in the log record; the message alone is
            # rarely enough to diagnose a failed pipeline run.
            self.logger.error(f"Pipeline execution failed: {e}", exc_info=True)
            raise

    def _execute_stages(
        self,
        stages: List[PipelineStage],
        context: ExecutionContext,
    ) -> pd.DataFrame:
        """
        Execute all stages sequentially.

        Placeholder: ignores ``stages`` and ``context`` and returns an empty
        DataFrame until the Pipeline._execute_stages logic is ported here.
        """
        # This would contain the current Pipeline._execute_stages logic
        # For now, return empty DataFrame as placeholder
        return pd.DataFrame()

    def supports_async(self) -> bool:
        """Sync executor doesn't support async."""
        return False

    def supports_streaming(self) -> bool:
        """Sync executor doesn't support streaming."""
        return False

    @property
    def name(self) -> str:
        """Strategy name."""
        return "SyncExecutor"

128