Coverage for src / keyword_research / batch_processor.py: 90%

40 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-13 20:29 +0800

1"""批量关键词处理器. 

2 

3支持异步批量处理、指数退避重试和进度报告。 

4""" 

5 

6import asyncio 

7from collections.abc import Callable 

8from dataclasses import dataclass 

9from typing import TYPE_CHECKING, Any 

10 

11if TYPE_CHECKING: 

12 from src.keyword_research.google_planner import GoogleKeywordPlanner, KeywordData 

13 

14 

@dataclass
class BatchResult:
    """Outcome of processing one keyword in a batch.

    Attributes:
        keyword: The keyword that was looked up.
        data: Keyword data returned by the planner; None by default
            (e.g. when the lookup failed).
        success: Whether the lookup succeeded.
        error: Error message for a failed lookup; empty string otherwise.
    """

    keyword: str  # the queried keyword (only required field)
    data: "KeywordData | None" = None  # planner payload, if any
    success: bool = True  # defaults to True; failures must pass False explicitly
    error: str = ""  # human-readable failure reason; "" when successful

30 

31 

class BatchProcessor:
    """Asynchronous batch keyword processor.

    Looks up keyword data concurrently (bounded by a semaphore), retries
    failed lookups with exponential backoff, and optionally reports progress
    through a callback.

    Example:
        >>> processor = BatchProcessor(planner)
        >>> results = await processor.process(["kw1", "kw2", "kw3"])
    """

    def __init__(
        self,
        planner: "GoogleKeywordPlanner",
        max_concurrent: int = 5,
        max_retries: int = 3,
        backoff_base: float = 1.0,
    ) -> None:
        """Initialize the processor.

        Args:
            planner: GoogleKeywordPlanner instance; only its awaitable
                ``get_keyword_data(keyword)`` method is used here.
            max_concurrent: Maximum number of keywords processed at once.
            max_retries: Total number of attempts per keyword, including the
                first one (despite the name). Must be at least 1.
            backoff_base: Seconds to wait before the first retry; each further
                retry waits twice as long as the previous one. Must be >= 0.

        Raises:
            ValueError: If ``max_concurrent`` or ``max_retries`` is below 1,
                or ``backoff_base`` is negative.
        """
        # A semaphore of 0 would make process() wait forever.
        if max_concurrent < 1:
            raise ValueError(f"max_concurrent must be >= 1, got {max_concurrent}")
        # With zero attempts there would be no result to report for a keyword.
        if max_retries < 1:
            raise ValueError(f"max_retries must be >= 1, got {max_retries}")
        if backoff_base < 0:
            raise ValueError(f"backoff_base must be >= 0, got {backoff_base}")
        self.planner = planner
        self.max_concurrent = max_concurrent
        self.max_retries = max_retries
        self.backoff_base = backoff_base

    async def _fetch_with_retry(self, keyword: str) -> BatchResult:
        """Look up one keyword, retrying failures with exponential backoff.

        Makes up to ``self.max_retries`` attempts. Before retry ``k`` (1-based)
        it sleeps ``backoff_base * 2 ** (k - 1)`` seconds. Any ``Exception``
        from the planner counts as a failed attempt; ``asyncio.CancelledError``
        is a ``BaseException`` and therefore still propagates.

        Returns:
            A successful BatchResult carrying the data, or a failed one whose
            ``error`` is ``str()`` of the last exception raised.
        """
        last_error: Exception | None = None
        for attempt in range(self.max_retries):
            if attempt:
                await asyncio.sleep(self.backoff_base * 2 ** (attempt - 1))
            try:
                data = await self.planner.get_keyword_data(keyword)
            except Exception as exc:  # every planner failure is treated as retryable
                last_error = exc
            else:
                return BatchResult(keyword=keyword, data=data, success=True)
        return BatchResult(keyword=keyword, success=False, error=str(last_error))

    async def process(
        self,
        keywords: list[str],
        progress_callback: Callable[[int, int], None] | None = None,
    ) -> list[BatchResult]:
        """Process a batch of keywords concurrently.

        Failures never raise: each keyword yields exactly one BatchResult,
        marked ``success=False`` after all attempts are exhausted.

        Args:
            keywords: Keywords to look up.
            progress_callback: Optional callable invoked as
                ``progress_callback(completed, total)`` after each keyword
                finishes, whether it succeeded or failed.

        Returns:
            One BatchResult per keyword, in the same order as ``keywords``.
        """
        semaphore = asyncio.Semaphore(self.max_concurrent)
        completed = 0
        total = len(keywords)

        async def process_one(keyword: str) -> BatchResult:
            nonlocal completed
            # The slot is held for the whole retry sequence, including the
            # backoff sleeps, so retries also count against max_concurrent.
            async with semaphore:
                result = await self._fetch_with_retry(keyword)
            completed += 1
            if progress_callback is not None:
                progress_callback(completed, total)
            return result

        # gather() returns results in argument order, not completion order.
        return await asyncio.gather(*(process_one(kw) for kw in keywords))

    async def process_with_progress(
        self,
        keywords: list[str],
        progress_callback: Callable[[int, int], None] | None = None,
    ) -> dict[str, Any]:
        """Process keywords and return the results with summary counts.

        Args:
            keywords: Keywords to look up.
            progress_callback: Optional per-keyword progress callback, forwarded
                to :meth:`process`.

        Returns:
            A dict with keys ``"results"`` (list of BatchResult),
            ``"successful_count"``, ``"failed_count"`` and ``"total"``.
        """
        results = await self.process(keywords, progress_callback)
        successful_count = sum(1 for r in results if r.success)
        return {
            "results": results,
            "successful_count": successful_count,
            "failed_count": len(results) - successful_count,
            "total": len(keywords),
        }