Coverage for src / keyword_research / google_planner.py: 100%

51 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-13 20:29 +0800

1"""Google Keyword Planner API 封装. 

2 

3提供 Google Ads API 的关键词研究功能, 

4支持异步调用、结果缓存和限流控制。 

5""" 

6 

7import asyncio 

8import json 

9from dataclasses import asdict, dataclass 

10from datetime import datetime, timedelta 

11from pathlib import Path 

12from typing import Any 

13 

14import aiohttp 

15 

16from src.utils.rate_limiter import TokenBucketRateLimiter 

17 

18 

@dataclass
class KeywordData:
    """Metrics record for a single keyword.

    Attributes:
        keyword: The keyword itself.
        avg_monthly_searches: Average monthly search volume.
        competition: Competition level (LOW, MEDIUM, HIGH).
        low_cpc: Lowest CPC.
        high_cpc: Highest CPC.
        trend: Search trend over 12 months, as a list.
        fetched_at: Time at which the data was fetched.
    """

    keyword: str
    avg_monthly_searches: int
    competition: str
    low_cpc: float
    high_cpc: float
    trend: list[int]
    fetched_at: datetime

    def to_dict(self) -> dict[str, Any]:
        """Return the fields as a dict, with ``fetched_at`` as an ISO-format string."""
        # Overriding an existing key keeps its position, so the key order
        # still follows the field declaration order.
        return {**asdict(self), "fetched_at": self.fetched_at.isoformat()}

46 

47 

48class GoogleKeywordPlanner: 

49 """Google Keyword Planner API 封装. 

50 

51 提供关键词数据获取、缓存和限流功能。 

52 支持异步上下文管理器。 

53 

54 Example: 

55 >>> async with GoogleKeywordPlanner("api_key") as planner: 

56 ... data = await planner.get_keyword_data("seo tools") 

57 """ 

58 

59 def __init__( 

60 self, 

61 api_key: str, 

62 cache_dir: str = ".cache", 

63 cache_ttl_hours: int = 24, 

64 ): 

65 """初始化. 

66 

67 Args: 

68 api_key: Google Ads API 密钥 

69 cache_dir: 缓存目录 

70 cache_ttl_hours: 缓存有效期(小时) 

71 """ 

72 self.api_key = api_key 

73 self.cache_dir = Path(cache_dir) 

74 self.cache_dir.mkdir(exist_ok=True) 

75 self.cache_ttl = timedelta(hours=cache_ttl_hours) 

76 self.rate_limiter = TokenBucketRateLimiter() 

77 self.session: aiohttp.ClientSession | None = None 

78 

79 async def __aenter__(self) -> "GoogleKeywordPlanner": 

80 """异步上下文入口.""" 

81 self.session = aiohttp.ClientSession() 

82 return self 

83 

84 async def __aexit__( 

85 self, 

86 exc_type: type | None, 

87 exc_val: BaseException | None, 

88 exc_tb: Any | None, 

89 ) -> None: 

90 """异步上下文出口.""" 

91 if self.session: 

92 await self.session.close() 

93 

94 def _get_cache_path(self, keyword: str) -> Path: 

95 """获取缓存文件路径.""" 

96 safe_keyword = "".join(c if c.isalnum() else "_" for c in keyword) 

97 return self.cache_dir / f"kw_{safe_keyword}.json" 

98 

99 def _is_cache_valid(self, cache_path: Path) -> bool: 

100 """检查缓存是否有效.""" 

101 if not cache_path.exists(): 

102 return False 

103 mtime = datetime.fromtimestamp(cache_path.stat().st_mtime) 

104 return datetime.now() - mtime < self.cache_ttl 

105 

106 async def _fetch_from_api(self, keyword: str) -> KeywordData: 

107 """从 API 获取数据.""" 

108 async with self.rate_limiter: 

109 # TODO: 实现真实 API 调用 

110 # 当前使用模拟数据 

111 await asyncio.sleep(0.1) # 模拟 API 延迟 

112 return KeywordData( 

113 keyword=keyword, 

114 avg_monthly_searches=1000, 

115 competition="MEDIUM", 

116 low_cpc=0.5, 

117 high_cpc=2.0, 

118 trend=[800, 900, 1000, 1100, 1000, 900, 1000, 1100, 1200, 1100, 1000, 1000], 

119 fetched_at=datetime.now(), 

120 ) 

121 

122 async def get_keyword_data(self, keyword: str) -> KeywordData: 

123 """获取关键词数据. 

124 

125 优先从缓存获取,缓存无效则从 API 获取。 

126 

127 Args: 

128 keyword: 关键词 

129 

130 Returns: 

131 KeywordData 对象 

132 """ 

133 cache_path = self._get_cache_path(keyword) 

134 

135 # 检查缓存 

136 if self._is_cache_valid(cache_path): 

137 with open(cache_path, encoding="utf-8") as f: 

138 data = json.load(f) 

139 data["fetched_at"] = datetime.fromisoformat(data["fetched_at"]) 

140 return KeywordData(**data) 

141 

142 # 从 API 获取 

143 result = await self._fetch_from_api(keyword) 

144 

145 # 保存缓存 

146 with open(cache_path, "w", encoding="utf-8") as f: 

147 json.dump(result.to_dict(), f, indent=2) 

148 

149 return result