Coverage for llm_dataset_engine/orchestration/async_executor.py: 37%

54 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-10-15 18:04 +0200

1""" 

2Asynchronous execution strategy. 

3 

4Provides async/await support for non-blocking execution, ideal for 

5integration with FastAPI, aiohttp, and other async frameworks. 

6""" 

7 

8import asyncio 

9from datetime import datetime 

10from decimal import Decimal 

11from typing import List 

12 

13import pandas as pd 

14 

15from llm_dataset_engine.core.models import ( 

16 CostEstimate, 

17 ExecutionResult, 

18 ProcessingStats, 

19) 

20from llm_dataset_engine.orchestration.execution_context import ExecutionContext 

21from llm_dataset_engine.orchestration.execution_strategy import ExecutionStrategy 

22from llm_dataset_engine.stages.pipeline_stage import PipelineStage 

23from llm_dataset_engine.utils import get_logger 

24 

25logger = get_logger(__name__) 

26 

27 

class AsyncExecutor(ExecutionStrategy):
    """
    Asynchronous execution strategy.

    Uses asyncio for non-blocking execution. Stages exposing a
    ``process_async`` coroutine are awaited directly; all other stages are
    run through ``asyncio.to_thread`` so they do not block the event loop.
    LLM batches use the client's ``acomplete`` coroutine when it exists.

    Benefits:
    - Non-blocking (works with FastAPI, aiohttp)
    - Better resource utilization
    - Higher concurrency without thread overhead
    - Ideal for I/O-bound operations
    """

    def __init__(self, max_concurrency: int = 10):
        """
        Initialize async executor.

        Args:
            max_concurrency: Maximum concurrent async tasks (must be >= 1)

        Raises:
            ValueError: If ``max_concurrency`` is smaller than 1.
        """
        # A semaphore created with 0 permits never lets a task through, so
        # every batched LLM call would hang forever. Fail fast instead.
        # (Negative values were already rejected by asyncio.Semaphore with
        # the same exception type.)
        if max_concurrency < 1:
            raise ValueError(
                f"max_concurrency must be >= 1, got {max_concurrency}"
            )

        self.max_concurrency = max_concurrency
        self.logger = logger
        # NOTE(review): an asyncio.Semaphore is tied to a single event loop;
        # confirm an executor instance is never shared across loops.
        self.semaphore = asyncio.Semaphore(max_concurrency)

    async def execute(
        self,
        stages: List[PipelineStage],
        context: ExecutionContext,
    ) -> ExecutionResult:
        """
        Execute stages asynchronously.

        Args:
            stages: Pipeline stages, run sequentially in the given order
            context: Execution context (row counters, cost/token totals,
                session id)

        Returns:
            ExecutionResult with data and metrics

        Raises:
            Exception: Any error raised while running the stages or
                building the result is logged and re-raised unchanged.
        """
        start_time = datetime.now()

        try:
            # Execute stages with async/await
            result_data = await self._execute_stages_async(stages, context)

            end_time = datetime.now()
            duration = (end_time - start_time).total_seconds()

            # last_processed_row is used as a zero-based index here, hence +1
            processed_rows = context.last_processed_row + 1

            # Calculate stats
            stats = ProcessingStats(
                total_rows=context.total_rows,
                processed_rows=processed_rows,
                failed_rows=context.total_rows - processed_rows,
                skipped_rows=0,
                # Guard against a zero-length run to avoid ZeroDivisionError
                rows_per_second=(
                    context.total_rows / duration if duration > 0 else 0
                ),
                total_duration_seconds=duration,
            )

            # Get cost estimate; the input/output token split is not
            # tracked here, only the accumulated total.
            cost_estimate = CostEstimate(
                total_cost=context.accumulated_cost,
                total_tokens=context.accumulated_tokens,
                input_tokens=0,
                output_tokens=0,
                rows=context.total_rows,
                confidence="actual",
            )

            return ExecutionResult(
                data=result_data,
                metrics=stats,
                costs=cost_estimate,
                execution_id=context.session_id,
                start_time=start_time,
                end_time=end_time,
                success=True,
            )

        except Exception as e:
            self.logger.error(f"Async pipeline execution failed: {e}")
            raise

    async def _execute_stages_async(
        self,
        stages: List[PipelineStage],
        context: ExecutionContext,
    ) -> pd.DataFrame:
        """
        Execute stages asynchronously.

        For stages that support async (have process_async method), use it.
        For sync-only stages, run in thread pool to avoid blocking.

        Each stage receives the previous stage's output; the first stage
        receives ``None``.

        Args:
            stages: Pipeline stages to run in order
            context: Execution context passed to every stage

        Returns:
            Output of the last stage, or ``None`` when ``stages`` is empty.
        """
        current_data = None

        for stage in stages:
            self.logger.info(f"Starting async stage: {stage.name}")

            # Check if stage has async support
            if hasattr(stage, "process_async"):
                # Use native async
                current_data = await stage.process_async(current_data, context)
            else:
                # Run sync stage in thread to avoid blocking
                current_data = await asyncio.to_thread(
                    stage.process, current_data, context
                )

            self.logger.info(f"Completed async stage: {stage.name}")

        return current_data

    async def _invoke_llm_batch_async(self, prompts: List[str], llm_client):
        """
        Invoke LLM for multiple prompts concurrently with semaphore.

        Uses asyncio.gather so the prompts are awaited concurrently, with at
        most ``max_concurrency`` calls in flight at a time.

        Args:
            prompts: Prompts to send, one LLM call per prompt
            llm_client: Client exposing ``acomplete`` (awaited directly) or,
                failing that, a blocking ``invoke`` (run in a worker thread)

        Returns:
            List with one entry per prompt, in prompt order. Because
            ``return_exceptions=True`` is used, a failed call contributes
            its exception object instead of aborting the whole batch;
            callers must check each entry.
        """
        async def _invoke_one(prompt: str):
            async with self.semaphore:
                # Use LlamaIndex async method
                if hasattr(llm_client, "acomplete"):
                    return await llm_client.acomplete(prompt)
                # Fallback to sync in thread
                return await asyncio.to_thread(llm_client.invoke, prompt)

        # Execute all prompts concurrently
        tasks = [_invoke_one(prompt) for prompt in prompts]
        return await asyncio.gather(*tasks, return_exceptions=True)

    def supports_async(self) -> bool:
        """Async executor supports async."""
        return True

    def supports_streaming(self) -> bool:
        """Async executor doesn't support streaming."""
        return False

    @property
    def name(self) -> str:
        """Strategy name."""
        return "AsyncExecutor"

175