Coverage for llm_dataset_engine/orchestration/sync_executor.py: 58%
36 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-15 18:04 +0200
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-15 18:04 +0200
"""
Synchronous execution strategy.

Default executor meant to preserve the current Pipeline behavior, which uses
ThreadPoolExecutor for concurrent LLM calls. That stage-execution logic has
not been ported into this module yet.
"""
import time
from datetime import datetime
from decimal import Decimal
from typing import List

import pandas as pd

from llm_dataset_engine.adapters import create_llm_client
from llm_dataset_engine.core.models import (
    CostEstimate,
    ExecutionResult,
    ProcessingStats,
)
from llm_dataset_engine.orchestration.execution_context import ExecutionContext
from llm_dataset_engine.orchestration.execution_strategy import ExecutionStrategy
from llm_dataset_engine.stages import (
    DataLoaderStage,
    LLMInvocationStage,
    PromptFormatterStage,
    ResponseParserStage,
    ResultWriterStage,
)
from llm_dataset_engine.stages.pipeline_stage import PipelineStage
from llm_dataset_engine.utils import RateLimiter, RetryHandler, get_logger
logger = get_logger(__name__)


class SyncExecutor(ExecutionStrategy):
    """
    Synchronous execution strategy.

    Delegates stage execution to ``_execute_stages`` and wraps the returned
    data, row statistics read from the execution context, and the context's
    accumulated cost/token totals into an ``ExecutionResult``. Intended as
    the default strategy that preserves the existing Pipeline behavior.

    NOTE(review): the existing Pipeline is described as using
    ThreadPoolExecutor for concurrent LLM calls; that logic has not been
    ported here yet and ``_execute_stages`` is still a placeholder.
    """

    def __init__(self):
        """Initialize the executor, binding the module-level logger."""
        self.logger = logger

    def execute(
        self,
        stages: List[PipelineStage],
        context: ExecutionContext,
    ) -> ExecutionResult:
        """
        Execute stages synchronously.

        Args:
            stages: Pipeline stages, in execution order.
            context: Execution context. Its row counters
                (``total_rows``, ``last_processed_row``) and accumulated
                cost/token totals are read after the stages return.

        Returns:
            ExecutionResult with data, processing stats, cost estimate,
            start/end timestamps and ``success=True``.

        Raises:
            Exception: Any error raised while executing the stages or
                building the result is logged and re-raised unchanged.
        """
        # Wall-clock timestamps are reported in the result, but the duration
        # comes from a monotonic clock: differences of naive datetime.now()
        # values are skewed (or even negative) across clock/DST adjustments.
        start_time = datetime.now()
        start_counter = time.perf_counter()

        try:
            # Execute stages sequentially
            result_data = self._execute_stages(stages, context)

            duration = time.perf_counter() - start_counter
            end_time = datetime.now()

            processed_rows = context.last_processed_row + 1
            # Rows past the last processed index are reported as failed;
            # clamp so an over-reporting counter cannot yield a negative count.
            failed_rows = max(context.total_rows - processed_rows, 0)

            stats = ProcessingStats(
                total_rows=context.total_rows,
                processed_rows=processed_rows,
                failed_rows=failed_rows,
                skipped_rows=0,
                # Throughput counts only rows actually processed, so a
                # partial run does not overstate it.
                rows_per_second=(
                    processed_rows / duration if duration > 0 else 0
                ),
                total_duration_seconds=duration,
            )

            cost_estimate = CostEstimate(
                total_cost=context.accumulated_cost,
                total_tokens=context.accumulated_tokens,
                # TODO: the context only exposes a combined token total;
                # input/output tokens are not tracked separately yet.
                input_tokens=0,
                output_tokens=0,
                rows=context.total_rows,
                confidence="actual",
            )

            return ExecutionResult(
                data=result_data,
                metrics=stats,
                costs=cost_estimate,
                execution_id=context.session_id,
                start_time=start_time,
                end_time=end_time,
                success=True,
            )

        except Exception as e:
            self.logger.error(f"Pipeline execution failed: {e}")
            raise

    def _execute_stages(
        self,
        stages: List[PipelineStage],
        context: ExecutionContext,
    ) -> pd.DataFrame:
        """
        Execute all stages sequentially.

        Placeholder: the current ``Pipeline._execute_stages`` logic has not
        been moved here yet, so no stage is run and an empty DataFrame is
        returned. A warning is logged so callers do not mistake the empty
        result of a "successful" execution for real output.
        """
        self.logger.warning(
            f"SyncExecutor._execute_stages is a placeholder; "
            f"{len(stages)} stage(s) were not executed"
        )
        return pd.DataFrame()

    def supports_async(self) -> bool:
        """Return False: this executor does not support async execution."""
        return False

    def supports_streaming(self) -> bool:
        """Return False: this executor does not support streaming."""
        return False

    @property
    def name(self) -> str:
        """Strategy name."""
        return "SyncExecutor"