Coverage for src/keyword_research/batch_processor.py: 90%
40 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-13 20:29 +0800
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-13 20:29 +0800
1"""批量关键词处理器.
3支持异步批量处理、指数退避重试和进度报告。
4"""
6import asyncio
7from collections.abc import Callable
8from dataclasses import dataclass
9from typing import TYPE_CHECKING, Any
11if TYPE_CHECKING:
12 from src.keyword_research.google_planner import GoogleKeywordPlanner, KeywordData
@dataclass
class BatchResult:
    """Outcome of processing a single keyword within a batch.

    Attributes:
        keyword: The keyword this result belongs to.
        data: The keyword data returned for the keyword; ``None`` by default.
        success: Whether processing succeeded; ``True`` by default.
        error: Error message, if any; empty string by default.
    """

    # Only ``keyword`` is required; the remaining fields default to a
    # "successful, no data, no error" result.
    keyword: str
    data: "KeywordData | None" = None
    success: bool = True
    error: str = ""
class BatchProcessor:
    """Asynchronous batch keyword processor.

    Looks up keyword data for many keywords concurrently, retries failed
    lookups with exponential backoff, and can report progress through a
    callback.

    Example:
        >>> processor = BatchProcessor(planner)
        >>> results = await processor.process(["kw1", "kw2", "kw3"])
    """

    def __init__(
        self,
        planner: "GoogleKeywordPlanner",
        max_concurrent: int = 5,
        max_retries: int = 3,
    ) -> None:
        """Store the planner and the concurrency/retry settings.

        Args:
            planner: Object whose ``get_keyword_data(keyword)`` coroutine is
                awaited for each keyword.
            max_concurrent: Maximum number of keywords processed at once.
            max_retries: Maximum number of attempts per keyword. Values
                below 1 are treated as 1 (a single attempt, no retry).
        """
        self.planner = planner
        self.max_concurrent = max_concurrent
        self.max_retries = max_retries

    async def process(
        self,
        keywords: list[str],
        progress_callback: Callable[[int, int], None] | None = None,
    ) -> list[BatchResult]:
        """Process a batch of keywords concurrently.

        Each keyword is attempted up to ``max_retries`` times (at least
        once). Between attempts the task sleeps 1s, 2s, 4s, ... while still
        holding its concurrency slot. A keyword whose attempts all fail
        yields a ``BatchResult`` with ``success=False`` and the text of the
        last exception; the exception itself is not propagated.

        Args:
            keywords: Keywords to process.
            progress_callback: Optional callable invoked as
                ``progress_callback(completed, total)`` each time a keyword
                finishes (successfully or not).

        Returns:
            One ``BatchResult`` per keyword, in the same order as
            ``keywords``.

        Raises:
            ValueError: If ``max_concurrent`` is less than 1.
        """
        # Semaphore(0) would park every task forever; fail loudly instead.
        if self.max_concurrent < 1:
            raise ValueError("max_concurrent must be at least 1")

        semaphore = asyncio.Semaphore(self.max_concurrent)
        # Always make at least one attempt so a result is produced even
        # when max_retries is configured as 0 or a negative number.
        attempts = max(1, self.max_retries)
        completed = 0
        total = len(keywords)

        async def process_one(keyword: str) -> BatchResult:
            nonlocal completed
            last_error = ""
            async with semaphore:
                for attempt in range(attempts):
                    if attempt:
                        # Exponential backoff before each retry: 1s, 2s, 4s, ...
                        await asyncio.sleep(2 ** (attempt - 1))
                    try:
                        data = await self.planner.get_keyword_data(keyword)
                    except Exception as exc:
                        # Remember the failure; retry if attempts remain.
                        last_error = str(exc)
                    else:
                        result = BatchResult(keyword=keyword, data=data, success=True)
                        break
                else:
                    # Loop ended without a successful attempt.
                    result = BatchResult(keyword=keyword, success=False, error=last_error)
            completed += 1
            if progress_callback is not None:
                progress_callback(completed, total)
            return result

        tasks = [process_one(kw) for kw in keywords]
        return await asyncio.gather(*tasks)

    async def process_with_progress(self, keywords: list[str]) -> dict[str, Any]:
        """Process keywords and return the results with summary counts.

        Args:
            keywords: Keywords to process.

        Returns:
            A dict with the keys ``"results"`` (list of ``BatchResult``),
            ``"successful_count"``, ``"failed_count"`` and ``"total"``
            (the number of keywords passed in).
        """
        results = await self.process(keywords)
        successful_count = sum(1 for r in results if r.success)

        return {
            "results": results,
            "successful_count": successful_count,
            "failed_count": len(results) - successful_count,
            "total": len(keywords),
        }