Coverage for src / beautyspot / lifecycle.py: 97%
79 statements
« prev ^ index » next coverage.py v7.13.2, created at 2026-03-11 19:10 +0900
« prev ^ index » next coverage.py v7.13.2, created at 2026-03-11 19:10 +0900
1# src/beautyspot/lifecycle.py
3import fnmatch
4import re
5import threading
6from dataclasses import dataclass
7from datetime import timedelta
8from typing import Optional, List, Union
9from beautyspot.exceptions import ValidationError
# Regex for parsing retention strings (e.g., "7d", "12h", "30m", "10s").
# Group 1 captures the integer amount, group 2 the unit letter
# (d = days, h = hours, m = minutes, s = seconds).
# NOTE: when applied with ``.match()``, ``$`` also matches just before a
# trailing newline, so "7d\n" is accepted; ``.fullmatch()`` gives a strict
# whole-string check.
_TIME_PATTERN = re.compile(r"^(\d+)([dhms])$")
15class _ForeverSentinel:
16 """ライフサイクルポリシーを明示的にバイパスし、無期限保持を指定するセンチネル。
18 ``Retention.FOREVER`` として公開され、``@spot.mark(retention=Retention.FOREVER)``
19 のようにデコレータに渡すことで、グローバルなライフサイクルポリシーが設定されていても
20 そのキャッシュエントリを無期限に保持できます。
21 """
23 _instance: "_ForeverSentinel | None" = None
24 _lock = threading.Lock()
26 def __new__(cls) -> "_ForeverSentinel":
27 if cls._instance is None:
28 with cls._lock:
29 # ダブルチェック: PEP 703 (free-threading) 環境で
30 # 複数スレッドが同時に __new__ を呼んだ場合の重複生成を防止。
31 if cls._instance is None: 31 ↛ 33line 31 didn't jump to line 33
32 cls._instance = super().__new__(cls)
33 return cls._instance
35 def __repr__(self) -> str:
36 return "Retention.FOREVER"
38 def __bool__(self) -> bool:
39 return True
# The single shared sentinel instance, published as ``Retention.FOREVER``.
_FOREVER = _ForeverSentinel()

# Types that parse_retention may be handed (the FOREVER sentinel included).
RetentionSpec = Optional[Union[str, timedelta, int, float, _ForeverSentinel]]
def parse_retention(
    value: RetentionSpec
) -> Optional[timedelta]:
    """Normalize a retention specification to a ``timedelta``.

    Accepted forms:

    * ``None`` -- returned as ``None``, meaning 'indefinite' (defers to the
      lifecycle policy).
    * ``timedelta`` -- returned unchanged after a positivity check.
    * ``int`` / ``float`` -- interpreted as a number of seconds.
    * ``str`` -- ``"<digits><unit>"`` where unit is ``d``, ``h``, ``m`` or
      ``s`` (e.g. ``"7d"``, ``"12h"``, ``"30m"``, ``"10s"``).

    Note:
        ``_ForeverSentinel`` (``Retention.FOREVER``) is not handled by this
        function; it ends up in the final type error. The caller
        (``Spot._calculate_expires_at``) is expected to check for it first.

    Raises:
        ValidationError: if the value is zero or negative, is not finite,
            exceeds what ``timedelta`` can represent, is a malformed string,
            or is of an unsupported type.
    """
    if value is None:
        return None

    if isinstance(value, timedelta):
        if value.total_seconds() <= 0:
            raise ValidationError(f"Retention timedelta must be positive, got {value}.")
        return value

    if isinstance(value, (int, float)):
        # NaN slips through this comparison (nan <= 0 is False); it is
        # rejected by the timedelta() call below.
        if value <= 0:
            raise ValidationError(
                f"Retention must be a positive number (seconds), got {value}."
            )
        try:
            return timedelta(seconds=value)
        except (OverflowError, ValueError) as err:
            # timedelta raises ValueError for NaN and OverflowError for
            # infinity or magnitudes beyond its range; surface all of them
            # as the validation error callers already handle.
            raise ValidationError(
                f"Retention must be a finite number of seconds within the "
                f"supported timedelta range, got {value}."
            ) from err

    if isinstance(value, str):
        # fullmatch(): with match(), the pattern's trailing ``$`` would also
        # accept a string that ends in a newline (e.g. "7d\n").
        match = _TIME_PATTERN.fullmatch(value)
        if not match:
            raise ValidationError(
                f"Invalid retention format: '{value}'. Use format like '7d', '12h', '30m', '10s'."
            )

        digits, unit = match.group(1), match.group(2)
        try:
            amount = int(digits)
        except ValueError as err:
            # int() refuses extremely long digit strings on recent Pythons.
            raise ValidationError(
                f"Retention duration is out of range, got '{value}'."
            ) from err
        if amount <= 0:
            raise ValidationError(
                f"Retention duration must be positive, got '{value}'."
            )

        # The regex restricts ``unit`` to exactly these four letters.
        unit_keyword = {"d": "days", "h": "hours", "m": "minutes", "s": "seconds"}[unit]
        try:
            return timedelta(**{unit_keyword: amount})
        except OverflowError as err:
            raise ValidationError(
                f"Retention duration is out of range, got '{value}'."
            ) from err

    raise ValidationError(
        f"Retention must be str, int, float, or timedelta, got {type(value)}"
    )
class Retention:
    """Constants for retention policies.

    Attributes:
        INDEFINITE: Default value (None) that defers to the lifecycle policy.
            When a policy is configured its rules are followed; when none is
            configured the entry is kept indefinitely.
        FOREVER: Explicitly bypasses the lifecycle policy and declares that
            this cache entry is always kept indefinitely.
            Used as ``@spot.mark(retention=Retention.FOREVER)``.
    """

    INDEFINITE = None
    FOREVER: _ForeverSentinel = _FOREVER
@dataclass
class Rule:
    """A retention rule selected by function-name pattern.

    Attributes:
        pattern: Shell-style wildcard pattern (``fnmatch`` syntax) compared
            against function names by ``LifecyclePolicy``.
        retention: Retention specification handed to ``parse_retention`` when
            the rule matches: a string such as ``"7d"``, a ``timedelta``, a
            number of seconds (``int`` or ``float``), or ``None``.
    """

    pattern: str
    # ``float`` is listed so the annotation agrees with what parse_retention
    # and LifecyclePolicy's default_retention already accept.
    retention: Union[str, timedelta, int, float, None]
class LifecyclePolicy:
    """Manages data retention policies based on function names.

    Rules are tried in the order given and the first rule whose pattern
    matches decides the retention; when no rule matches, the default
    retention is used.
    """

    def __init__(
        self,
        rules: List[Rule],
        default_retention: Union[str, timedelta, int, float, None] = "30d",
    ):
        """Store the rules and normalize the default retention.

        Args:
            rules: Rules to consult, in priority order.
            default_retention: Retention used when no rule matches; parsed
                once here via ``parse_retention``.

        Raises:
            ValidationError: if ``default_retention`` is ``Retention.FOREVER``
                or is rejected by ``parse_retention``.
        """
        if isinstance(default_retention, _ForeverSentinel):
            raise ValidationError(
                "Retention.FOREVER cannot be used as default_retention. "
                "Use it on individual @spot.mark(retention=Retention.FOREVER) instead."
            )
        self.rules = rules
        self._default_retention = parse_retention(default_retention)

    def _first_match(self, name: str) -> Optional[Rule]:
        """Return the first rule whose pattern matches *name*, or ``None``."""
        for rule in self.rules:
            # fnmatchcase(), not fnmatch(): the latter passes both arguments
            # through os.path.normcase(), which makes matching
            # case-insensitive on Windows. Python identifiers are
            # case-sensitive, so rules must match the same on every platform.
            if fnmatch.fnmatchcase(name, rule.pattern):
                return rule
        return None

    def resolve(self, func_name: str) -> Optional[timedelta]:
        """
        Find the first matching rule for the given function name.
        Returns the retention timedelta, or the default retention if no match.
        """
        rule = self._first_match(func_name)
        if rule is None:
            return self._default_retention
        # A matching rule wins even when its retention parses to None.
        return parse_retention(rule.retention)

    def resolve_with_fallback(
        self, func_identifier: str, func_name: str
    ) -> Optional[timedelta]:
        """
        Resolve retention using the fully-qualified identifier first, then
        fall back to the short function name for backward compatibility.
        Returns the default retention when neither name matches any rule.
        """
        for candidate in (func_identifier, func_name):
            rule = self._first_match(candidate)
            if rule is not None:
                return parse_retention(rule.retention)
        return self._default_retention

    @classmethod
    def default(cls) -> "LifecyclePolicy":
        """Default policy: 30-day retention."""
        return cls(rules=[], default_retention="30d")