Coverage for llm_dataset_engine/orchestration/async_executor.py: 37%
54 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-15 18:04 +0200
1"""
2Asynchronous execution strategy.
4Provides async/await support for non-blocking execution, ideal for
5integration with FastAPI, aiohttp, and other async frameworks.
6"""
8import asyncio
9from datetime import datetime
10from decimal import Decimal
11from typing import List
13import pandas as pd
15from llm_dataset_engine.core.models import (
16 CostEstimate,
17 ExecutionResult,
18 ProcessingStats,
19)
20from llm_dataset_engine.orchestration.execution_context import ExecutionContext
21from llm_dataset_engine.orchestration.execution_strategy import ExecutionStrategy
22from llm_dataset_engine.stages.pipeline_stage import PipelineStage
23from llm_dataset_engine.utils import get_logger
# Module-level logger; AsyncExecutor instances reference it as ``self.logger``.
logger = get_logger(__name__)
class AsyncExecutor(ExecutionStrategy):
    """
    Asynchronous execution strategy.

    Uses asyncio for non-blocking execution. Stages exposing a
    ``process_async`` coroutine are awaited directly; sync-only stages are
    offloaded to a worker thread via ``asyncio.to_thread`` so they do not
    block the event loop. LLM batches prefer the client's async
    ``acomplete`` method (e.g. LlamaIndex) and are bounded by a semaphore
    of size ``max_concurrency``.

    Benefits:
    - Non-blocking (works with FastAPI, aiohttp)
    - Better resource utilization
    - Higher concurrency without thread overhead
    - Ideal for I/O-bound operations
    """

    def __init__(self, max_concurrency: int = 10):
        """
        Initialize async executor.

        Args:
            max_concurrency: Maximum number of LLM calls allowed in flight
                at once on a given event loop.
        """
        self.max_concurrency = max_concurrency
        self.logger = logger
        # Kept for backward compatibility with code reading ``.semaphore``.
        # It is replaced by a semaphore created inside the running event
        # loop on first use (see ``_get_semaphore``).
        self.semaphore = asyncio.Semaphore(max_concurrency)
        # Event loop the current ``self.semaphore`` was created for; None
        # until the first batch runs.
        self._semaphore_loop = None

    def _get_semaphore(self) -> asyncio.Semaphore:
        """
        Return a concurrency-limiting semaphore usable on the running loop.

        asyncio primitives become bound to an event loop (on first contended
        wait since Python 3.10, at construction before that), and using them
        from a different loop raises RuntimeError. Re-creating the semaphore
        whenever the running loop changes lets one executor be reused across
        separate ``asyncio.run`` calls, while concurrent batches on the same
        loop still share a single limit.

        Must be called from within a running event loop.
        """
        loop = asyncio.get_running_loop()
        # getattr: tolerate instances created without running __init__.
        if getattr(self, "_semaphore_loop", None) is not loop:
            self.semaphore = asyncio.Semaphore(self.max_concurrency)
            self._semaphore_loop = loop
        return self.semaphore

    async def execute(
        self,
        stages: List[PipelineStage],
        context: ExecutionContext,
    ) -> ExecutionResult:
        """
        Execute stages asynchronously and assemble an ExecutionResult.

        Args:
            stages: Pipeline stages, run in order; each receives the
                previous stage's output.
            context: Execution context; its row counters, accumulated
                cost/tokens and session id feed the returned metrics.

        Returns:
            ExecutionResult with the final stage's data, processing stats and
            the accumulated cost.

        Raises:
            Exception: Any error raised while running stages or building the
                result is logged and re-raised unchanged.
        """
        start_time = datetime.now()

        try:
            result_data = await self._execute_stages_async(stages, context)

            end_time = datetime.now()
            duration = (end_time - start_time).total_seconds()

            # last_processed_row appears to be a 0-based index, so +1 gives
            # a count. NOTE(review): confirm against ExecutionContext.
            processed_rows = context.last_processed_row + 1
            stats = ProcessingStats(
                total_rows=context.total_rows,
                processed_rows=processed_rows,
                failed_rows=context.total_rows - processed_rows,
                skipped_rows=0,
                # Guard against a zero-length run (clock granularity).
                rows_per_second=context.total_rows / duration if duration > 0 else 0,
                total_duration_seconds=duration,
            )

            # Only aggregate totals are tracked by the context here, so the
            # input/output token split is reported as 0.
            cost_estimate = CostEstimate(
                total_cost=context.accumulated_cost,
                total_tokens=context.accumulated_tokens,
                input_tokens=0,
                output_tokens=0,
                rows=context.total_rows,
                confidence="actual",
            )

            return ExecutionResult(
                data=result_data,
                metrics=stats,
                costs=cost_estimate,
                execution_id=context.session_id,
                start_time=start_time,
                end_time=end_time,
                success=True,
            )

        except Exception as e:
            self.logger.error(f"Async pipeline execution failed: {e}")
            raise

    async def _execute_stages_async(
        self,
        stages: List[PipelineStage],
        context: ExecutionContext,
    ) -> pd.DataFrame:
        """
        Run each stage in order, threading the data from one to the next.

        Stages that define ``process_async`` are awaited on the event loop;
        sync-only stages run ``process`` in a worker thread via
        ``asyncio.to_thread`` so the loop is not blocked.

        Args:
            stages: Stages to run in order. The first stage receives None.
            context: Execution context passed to every stage.

        Returns:
            The last stage's output, or None if ``stages`` is empty.
        """
        current_data = None

        for stage in stages:
            self.logger.info(f"Starting async stage: {stage.name}")

            if hasattr(stage, "process_async"):
                # Native async stage.
                current_data = await stage.process_async(current_data, context)
            else:
                # Sync stage: offload so the event loop stays responsive.
                current_data = await asyncio.to_thread(
                    stage.process, current_data, context
                )

            self.logger.info(f"Completed async stage: {stage.name}")

        return current_data

    async def _invoke_llm_batch_async(self, prompts: List[str], llm_client):
        """
        Invoke the LLM for multiple prompts concurrently.

        At most ``max_concurrency`` calls are in flight at once. Calls are
        interleaved on the event loop (concurrency, not parallelism):
        ``acomplete`` is awaited when the client provides it, otherwise the
        synchronous ``invoke`` is offloaded to a worker thread.

        Args:
            prompts: Prompts to send; one LLM call per prompt.
            llm_client: Client exposing ``acomplete`` (async) or ``invoke``
                (sync).

        Returns:
            List of responses in the same order as ``prompts``. Because
            ``return_exceptions=True`` is used, a failed call appears in the
            list as its exception object rather than aborting the batch.
        """
        semaphore = self._get_semaphore()
        # The client's capabilities do not change per prompt; check once.
        use_async = hasattr(llm_client, "acomplete")

        async def _invoke_one(prompt: str):
            async with semaphore:
                if use_async:
                    return await llm_client.acomplete(prompt)
                # Fallback: run the sync client in a worker thread.
                return await asyncio.to_thread(llm_client.invoke, prompt)

        return await asyncio.gather(
            *(_invoke_one(prompt) for prompt in prompts),
            return_exceptions=True,
        )

    def supports_async(self) -> bool:
        """Async executor supports async."""
        return True

    def supports_streaming(self) -> bool:
        """Async executor doesn't support streaming."""
        return False

    @property
    def name(self) -> str:
        """Strategy name."""
        return "AsyncExecutor"