Coverage for src / keyword_research / google_planner.py: 100%
51 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-13 20:29 +0800
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-13 20:29 +0800
"""Google Keyword Planner API wrapper.

Provides keyword research on top of the Google Ads API, with support for
asynchronous calls, result caching, and rate limiting.
"""
7import asyncio
8import json
9from dataclasses import asdict, dataclass
10from datetime import datetime, timedelta
11from pathlib import Path
12from typing import Any
14import aiohttp
16from src.utils.rate_limiter import TokenBucketRateLimiter
@dataclass
class KeywordData:
    """Keyword metrics for a single search term.

    Attributes:
        keyword: The keyword itself.
        avg_monthly_searches: Average monthly search volume.
        competition: Competition level (LOW, MEDIUM, HIGH).
        low_cpc: Lowest CPC.
        high_cpc: Highest CPC.
        trend: Search trend as a list covering 12 months.
        fetched_at: Time at which the data was fetched.
    """

    keyword: str
    avg_monthly_searches: int
    competition: str
    low_cpc: float
    high_cpc: float
    trend: list[int]
    fetched_at: datetime

    def to_dict(self) -> dict[str, Any]:
        """Return the fields as a dict, with ``fetched_at`` as an ISO string."""
        # Re-assigning an existing key keeps its position, so the key order
        # still follows the field declaration order.
        return {**asdict(self), "fetched_at": self.fetched_at.isoformat()}
48class GoogleKeywordPlanner:
49 """Google Keyword Planner API 封装.
51 提供关键词数据获取、缓存和限流功能。
52 支持异步上下文管理器。
54 Example:
55 >>> async with GoogleKeywordPlanner("api_key") as planner:
56 ... data = await planner.get_keyword_data("seo tools")
57 """
59 def __init__(
60 self,
61 api_key: str,
62 cache_dir: str = ".cache",
63 cache_ttl_hours: int = 24,
64 ):
65 """初始化.
67 Args:
68 api_key: Google Ads API 密钥
69 cache_dir: 缓存目录
70 cache_ttl_hours: 缓存有效期(小时)
71 """
72 self.api_key = api_key
73 self.cache_dir = Path(cache_dir)
74 self.cache_dir.mkdir(exist_ok=True)
75 self.cache_ttl = timedelta(hours=cache_ttl_hours)
76 self.rate_limiter = TokenBucketRateLimiter()
77 self.session: aiohttp.ClientSession | None = None
79 async def __aenter__(self) -> "GoogleKeywordPlanner":
80 """异步上下文入口."""
81 self.session = aiohttp.ClientSession()
82 return self
84 async def __aexit__(
85 self,
86 exc_type: type | None,
87 exc_val: BaseException | None,
88 exc_tb: Any | None,
89 ) -> None:
90 """异步上下文出口."""
91 if self.session:
92 await self.session.close()
94 def _get_cache_path(self, keyword: str) -> Path:
95 """获取缓存文件路径."""
96 safe_keyword = "".join(c if c.isalnum() else "_" for c in keyword)
97 return self.cache_dir / f"kw_{safe_keyword}.json"
99 def _is_cache_valid(self, cache_path: Path) -> bool:
100 """检查缓存是否有效."""
101 if not cache_path.exists():
102 return False
103 mtime = datetime.fromtimestamp(cache_path.stat().st_mtime)
104 return datetime.now() - mtime < self.cache_ttl
106 async def _fetch_from_api(self, keyword: str) -> KeywordData:
107 """从 API 获取数据."""
108 async with self.rate_limiter:
109 # TODO: 实现真实 API 调用
110 # 当前使用模拟数据
111 await asyncio.sleep(0.1) # 模拟 API 延迟
112 return KeywordData(
113 keyword=keyword,
114 avg_monthly_searches=1000,
115 competition="MEDIUM",
116 low_cpc=0.5,
117 high_cpc=2.0,
118 trend=[800, 900, 1000, 1100, 1000, 900, 1000, 1100, 1200, 1100, 1000, 1000],
119 fetched_at=datetime.now(),
120 )
122 async def get_keyword_data(self, keyword: str) -> KeywordData:
123 """获取关键词数据.
125 优先从缓存获取,缓存无效则从 API 获取。
127 Args:
128 keyword: 关键词
130 Returns:
131 KeywordData 对象
132 """
133 cache_path = self._get_cache_path(keyword)
135 # 检查缓存
136 if self._is_cache_valid(cache_path):
137 with open(cache_path, encoding="utf-8") as f:
138 data = json.load(f)
139 data["fetched_at"] = datetime.fromisoformat(data["fetched_at"])
140 return KeywordData(**data)
142 # 从 API 获取
143 result = await self._fetch_from_api(keyword)
145 # 保存缓存
146 with open(cache_path, "w", encoding="utf-8") as f:
147 json.dump(result.to_dict(), f, indent=2)
149 return result