Coverage for src / utils / validators.py: 98%
55 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-13 20:29 +0800
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-13 20:29 +0800
1"""输入验证模块.
3提供关键词、URL、内容长度等通用验证函数。
4所有验证函数返回 (bool, str) 元组,表示验证结果和错误信息。
5"""
7from __future__ import annotations
9import re
10from typing import Any
def validate_keyword(keyword: str, min_length: int = 2, max_length: int = 100) -> tuple[bool, str]:
    """Validate a keyword.

    The keyword is stripped of surrounding whitespace, length-checked and
    then screened against a deny-list of characters, SQL keywords and
    path-traversal sequences.

    Args:
        keyword: Keyword to validate.
        min_length: Minimum length after stripping, default 2.
        max_length: Maximum length after stripping, default 100.

    Returns:
        An ``(is_valid, error_message)`` tuple; the message is empty on success.

    Example:
        >>> validate_keyword("digital marketing")
        (True, '')
        >>> validate_keyword("a")
        (False, '关键词长度必须至少 2 个字符')
        >>> validate_keyword("dropshipping")
        (True, '')
        >>> validate_keyword("drop table users")
        (False, '关键词包含非法字符')
    """
    if not keyword or not isinstance(keyword, str):
        return False, "关键词不能为空"

    keyword = keyword.strip()

    if len(keyword) < min_length:
        return False, f"关键词长度必须至少 {min_length} 个字符"

    if len(keyword) > max_length:
        return False, f"关键词长度不能超过 {max_length} 个字符"

    # Characters commonly used in XSS / SQL injection payloads. The hyphen is
    # part of this class, so SQL comment markers ("--") are rejected here too.
    dangerous_chars = r'[<>{}"\';\-]'
    if re.search(dangerous_chars, keyword):
        return False, "关键词包含非法字符"

    keyword_lower = keyword.lower()

    # SQL keywords are matched only when they are not embedded in a longer
    # ASCII-letter word, so ordinary terms such as "dropshipping", "updates"
    # or "reunion" are accepted. Digits, underscores and punctuation do not
    # count as part of a word, so "select1" or "union(select" is still caught.
    sql_keyword_pattern = r"(?<![a-z])(?:drop|delete|insert|update|select|union)(?![a-z])"
    if re.search(sql_keyword_pattern, keyword_lower):
        return False, "关键词包含非法字符"

    # Path-traversal sequences, including URL-encoded variants.
    path_traversal_patterns = (
        "..",  # Directory traversal
        "%2e%2e",  # URL-encoded ..
        "%252e%252e",  # Double URL-encoded ..
        ".%00",  # Null byte injection attempt
    )
    if any(pattern in keyword_lower for pattern in path_traversal_patterns):
        return False, "关键词包含非法字符"

    return True, ""
def validate_url(url: str, allowed_schemes: list[str] | None = None) -> tuple[bool, str]:
    """Validate a URL.

    The URL must have the shape ``scheme://host[/path]`` and its scheme must
    be one of ``allowed_schemes``. Schemes are compared case-insensitively on
    both sides, so ``allowed_schemes=["HTTPS"]`` accepts ``https://...``.

    Args:
        url: URL to validate; surrounding whitespace is ignored.
        allowed_schemes: Permitted schemes, default ``["http", "https"]``.

    Returns:
        An ``(is_valid, error_message)`` tuple; the message is empty on success.

    Example:
        >>> validate_url("https://example.com")
        (True, '')
        >>> validate_url("ftp://example.com")
        (False, 'URL 协议必须是: http, https')
    """
    if not url or not isinstance(url, str):
        return False, "URL 不能为空"

    if allowed_schemes is None:
        allowed_schemes = ["http", "https"]

    # Basic structural match: scheme, "://", a non-empty host, optional path.
    pattern = r"^(?P<scheme>[^:]+)://(?P<host>[^/]+)(?P<path>/.*)?$"
    match = re.match(pattern, url.strip())

    if not match:
        return False, "URL 格式无效"

    scheme = match.group("scheme").lower()
    # Normalise the allow-list as well; the parsed scheme is lowercased, so a
    # verbatim comparison would reject every URL for an upper-case entry.
    normalized_schemes = {allowed.lower() for allowed in allowed_schemes}
    if scheme not in normalized_schemes:
        return False, f"URL 协议必须是: {', '.join(allowed_schemes)}"

    return True, ""
def validate_content_length(
    content: Any, min_length: int = 0, max_length: int = 50000
) -> tuple[bool, str]:
    """Check that ``content`` is a string whose length lies within bounds.

    Args:
        content: Value to validate; anything that is not a ``str`` is rejected.
        min_length: Minimum allowed length (inclusive), default 0.
        max_length: Maximum allowed length (inclusive), default 50000.

    Returns:
        An ``(is_valid, error_message)`` tuple; the message is empty on success.
    """
    if isinstance(content, str):
        size = len(content)
        # The lower bound is checked first, so it wins if both are violated.
        if size < min_length:
            return False, f"内容长度必须至少 {min_length} 个字符"
        if size > max_length:
            return False, f"内容长度不能超过 {max_length} 个字符"
        return True, ""

    # The annotation is ``Any``, so non-string input is rejected at runtime.
    return False, "内容必须是字符串"
def validate_platform(
    platform: str, supported_platforms: list[str] | None = None
) -> tuple[bool, str]:
    """Validate a platform name.

    The name is lowercased and stripped before comparison. Entries of
    ``supported_platforms`` are normalised the same way for the comparison,
    so a caller-supplied list such as ``["Quora"]`` matches ``"quora"``.

    Args:
        platform: Platform name to validate.
        supported_platforms: Accepted platform names, default
            ``["quora", "medium", "reddit", "linkedin", "twitter"]``.

    Returns:
        An ``(is_valid, error_message)`` tuple; the message is empty on success.
    """
    if supported_platforms is None:
        supported_platforms = ["quora", "medium", "reddit", "linkedin", "twitter"]

    if not platform or not isinstance(platform, str):
        return False, "平台名称不能为空"

    platform = platform.lower().strip()

    # Apply the input's normalisation to the allow-list too; the error
    # message below still shows the list exactly as the caller supplied it.
    normalized_supported = {name.lower().strip() for name in supported_platforms}
    if platform not in normalized_supported:
        return False, f"不支持的平台: {platform},支持的平台: {', '.join(supported_platforms)}"

    return True, ""