Metadata-Version: 2.4
Name: pynomadic
Version: 0.1.8
Summary: 一个python工具包,包括自动配置系统,常用工具包,控制台输出系统,日志系统,命名系统,缓存系统,返回值封装类
License: MIT
License-File: LICENSE
Author: nomadicooer
Author-email: nomadicooer@qq.com
Requires-Python: >=3.13
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.13
Classifier: Programming Language :: Python :: 3.14
Requires-Dist: cryptography (>=46.0.5,<47.0.0)
Project-URL: Homepage, https://gitee.com/nomadicooer/pynomad
Project-URL: Issues, https://gitee.com/nomadicooer/pynomad/issues
Project-URL: Repository, https://gitee.com/nomadicooer/pynomad
Description-Content-Type: text/markdown

# PyNomad

> **重要说明**：本框架大部分代码由 AI 生成，README.md 文档完全由 AI 生成，可能存在部分说明错误。
>
> 具体使用方式可以参见 pynomad 项目的测试用例，仓库地址：https://gitee.com/nomadicooer/pynomad.git
>
> 也可以参见 findata 项目实际项目使用示例：https://gitee.com/nomadicooer/findata.git
>
> ## 关于配置 Schema
>
> 自动配置生成了部分 schema，可以在 `settings.toml` 文件中引用：
> ```toml
> #:schema https://gitee.com/nomadicooer/schema/raw/master/settings.schema.json
> ```
>
> **注意**：schema 文件对 logging 配置支持不完善，使用 `#:schema url` 方式引用时可能遇到问题。建议安装 **Even Better TOML** 插件以获得更好的支持。
>
> 本项目开发主要使用 VSCode，由于个人时间原因，代码更新频率可能较低，可能存在一些 bug。推荐的使用方式是**下载源码，然后直接引用到项目中**，如遇到 bug 请根据实际需求自行修改。
>
> ## 项目管理方式
>
> 项目使用 **poetry** 管理项目。
>
> - **poetry 安装项目的方式**：
>   ```bash
>   poetry add pynomadic
>   ```
>
> - **poetry 引用本地项目的方式**：
>   ```toml
>   [tool.poetry]
>   packages = [{ include = "pynomad", from = "src" }]
>   ```
>
>   必要时在 `dependencies` 中也应该引用，避免首次引用安装不了：
>   ```toml
>   dependencies = [
>       "pynomadic @ file:///${PROJECT_ROOT}/../pynomad",
>       "cryptography (>=46.0.5,<47.0.0)",
>   ]
>   ```
>
>   其中 `${PROJECT_ROOT}` 需要替换为你的项目根目录路径。

---

Python 通用工具库，提供常用工具函数、装饰器和类型定义。内置**智能配置管理**和**多级缓存系统**，简化应用开发流程。

## 安装

```bash
pip install pynomadic
```

## 快速开始

```python
from pynomad import dt, environ, get_logger, Result, memcached

# 日期时间工具
dt.parse_date("2024-01-15")  # date(2024, 1, 15)

# 环境信息
environ.project_dir  # 项目根目录

# 日志
logger = get_logger(__name__)
logger.info("Hello, PyNomad!")

# 缓存装饰器
@memcached
def expensive_func(x: int) -> int:
    return x * x

# 结果处理
result = Result.success(data="OK")
logger.info(result.data)  # "OK"
```

---

## 目录

- [Common - 通用工具](#common---通用工具)
- [Naming - 命名规范](#naming---命名规范)
- [Config - 配置管理](#config---配置管理)
- [Result - 结果处理](#result---结果处理)
- [Logger - 日志系统](#logger---日志系统)
- [Cache - 缓存系统](#cache---缓存系统)

---

## Common - 通用工具

### DateTimeUtil - 日期时间工具

智能日期时间解析工具，支持多种格式自动识别。

```python
from pynomad import dt

# 智能识别日期格式
dt.parse_date("2024-01-15")      # date(2024, 1, 15)
dt.parse_date("2024/01/15")      # date(2024, 1, 15)
dt.parse_date("2024年01月15日")   # date(2024, 1, 15)
dt.parse_date("20240115")        # date(2024, 1, 15)

# 指定格式解析
dt.parse_date("01/15/2024", "%m/%d/%Y")

# 智能识别日期时间
dt.parse_datetime("2024-01-15 10:30:45")
dt.parse_datetime("2024/01/15 10:30:45")

# 格式化输出
dt.format_date(date(2024, 1, 15), "%Y-%m-%d")     # "2024-01-15"
dt.format_datetime(datetime.now(), "%Y-%m-%d %H:%M:%S")

# 年份边界
dt.year_start("2024-06-15")       # date(2024, 1, 1)
dt.year_end("2024-06-15")         # date(2024, 12, 31)

# 时间戳转换
dt.datetime_to_timestamp(datetime(2024, 1, 15, 10, 30, 45))  # 1705297845.0
dt.timestamp_to_datetime(1705297845)  # datetime(2024, 1, 15, 10, 30, 45)

# 获取当前时间
dt.today()
```

### Environment - 环境信息

获取项目环境信息的单例类。

```python
from pynomad import environ, get_logger

logger = get_logger(__name__)

# 获取项目根目录
project_dir = environ.project_dir
logger.info(project_dir)  # Path 对象，项目绝对路径

# 获取项目名称
project_name = environ.project_name
logger.info(project_name)  # 项目名称

# 获取平台信息
environ.is_windows   # 是否 Windows
environ.is_linux     # 是否 Linux
environ.is_macos     # 是否 macOS

# 获取机器码（用于设备唯一标识）
machine_code = environ.machine_code      # hex 格式，64字符
machine_code_b64 = environ.machine_code_base64  # base64 格式
logger.info(machine_code)  # "a1b2c3d4..."
logger.info(machine_code_b64)  # "abc123..."
```

### EnhancedDict - 增强字典

支持属性访问和敏感信息脱敏的字典类。

```python
from pynomad import EnhancedDict, get_logger

logger = get_logger(__name__)

# 创建增强字典
ed = EnhancedDict({"name": "Alice", "age": 30, "address": {"city": "Beijing"}})

# 属性访问
logger.info(ed.name)       # "Alice"
logger.info(ed.address.city)  # "Beijing"

# 嵌套访问
logger.info(ed.get_nested("address.city"))  # "Beijing"

# 敏感信息脱敏
ed["password"] = "123456"
ed["phone"] = "13800138000"
logger.info(ed.mask_sensitive())  # 脱敏后的字典

# 类型检查
ed.check_type("age", int)  # True
ed.check_type("name", str)  # True
```

### Exceptions - 异常类

统一的异常定义，与 Result 的错误分类对应。

```python
from pynomad import ex, get_logger

logger = get_logger(__name__)

# 客户端异常
try:
    raise ex.ClientException("用户输入的参数无效")
except ex.ClientException as e:
    logger.info(str(e))  # "用户输入的参数无效"

# 验证异常（继承自 ClientException）
try:
    raise ex.ValidationException("必填字段 name 不能为空")
except ex.ValidationException as e:
    logger.info(str(e))

# 未找到异常（继承自 ClientException）
try:
    raise ex.NotFoundException("用户 ID 12345 不存在")
except ex.ClientException as e:  # 可以用父类捕获
    logger.info(str(e))

# 冲突异常
raise ex.ConflictException("邮箱 alice@example.com 已被注册")

# 权限异常
raise ex.PermissionException("您没有权限访问此资源")

# 网络异常
raise ex.NetworkException("无法连接到服务器")

# 请求异常
raise ex.RequestException("HTTP 请求失败，状态码: 500")

# 登录异常（继承自 RequestException）
raise ex.LoginException("用户名或密码错误")

# 超时异常（继承自 NetworkException）
raise ex.TimeoutException("操作超时，等待时间超过 30 秒")

# 限流异常（继承自 NetworkException）
raise ex.RateLimitException("请求过于频繁，请稍后重试")

# 服务端异常
raise ex.ServerException("服务器内部错误")

# 第三方服务异常
raise ex.ThirdPartyException("微信 API 调用失败")

# 缓存相关异常
raise ex.CacheMissException("缓存键 'user:123' 不存在")
raise ex.CacheExpiredException("缓存项 'user:123' 已过期")
raise ex.CacheValueException("缓存值格式不正确，预期为 JSON 字符串")
raise ex.CacheTypeNotInitializedException("缓存类型无法推导，请先设置一个值")
raise ex.CacheException("序列化失败: 无法序列化对象")

# 缓存刷新需求异常（支持携带新参数重新调用）
raise ex.NeedsRefreshException(
    "部分数据缺失",
    new_args=("2024-01-01", "2024-12-31"),
    new_kwargs={"force_refresh": True}
)
```

**异常继承层次**:

- `ClientException` → `ValidationException`, `NotFoundException`, `ConflictException`, `PermissionException`, `CacheMissException`, `CacheExpiredException`, `CacheValueException`, `CacheTypeNotInitializedException`
- `NetworkException` → `TimeoutException`, `RateLimitException`
- `RequestException` → `LoginException`
- `ServerException` → `CacheException`

### Decorator - 装饰器

常用装饰器集合。

```python
from pynomad import singleton, retry, thread_safe, get_logger

logger = get_logger(__name__)

# 单例装饰器
@singleton
class DatabaseConnection:
    def __init__(self):
        logger.info("初始化数据库连接")

db1 = DatabaseConnection()
db2 = DatabaseConnection()
logger.info(db1 is db2)  # True

# 重试装饰器
@retry(max_attempts=3, delay=1.0)
def fetch_data():
    # 可能失败的操作
    pass

# 线程安全装饰器
@thread_safe
class Counter:
    def __init__(self):
        self.value = 0

    def increment(self):
        self.value += 1
```

### Network - 网络工具

网络连接检查工具。

```python
from pynomad import check_network, stop_check_network, get_logger

logger = get_logger(__name__)

# 检查网络连接
result = check_network(host="www.google.com", port=80)
logger.info(result)  # True 或 False

# 停止后台网络检查线程
stop_check_network()
```

### Types - 类型定义

通用类型常量。

```python
from pynomad.common.types import Level, OutputTarget

# 日志级别
Level.TRACE    # 追踪
Level.DEBUG    # 调试
Level.INFO     # 信息
Level.WARN     # 警告
Level.ERROR    # 错误
Level.FATAL    # 致命

# 输出目标
OutputTarget.CONSOLE
OutputTarget.FILE
OutputTarget.BOTH
```

---

## Naming - 命名规范

命名检查、风格检测和转换工具。

```python
from pynomad import naming, get_logger

logger = get_logger(__name__)

# === 标识符和关键字检查 ===

# Python 标识符检查
naming.is_py_identifier("my_var")      # True
naming.is_py_identifier("123invalid")  # False

# 通用标识符检查
naming.is_identifier("valid-name")    # True

# Python 关键字检查
naming.is_py_keyword("class")        # True
naming.is_py_keyword("myvar")         # False

# 软关键字检查
naming.is_pysoftkeyword("match")     # True

# 其他语言关键字
naming.is_java_keyword("public")     # True
naming.is_ts_keyword("interface")    # True

# 通用关键字检查（所有语言）
naming.is_keyword("class")           # True
naming.is_keyword("myvar")           # False

# === 命名风格检测 ===

# 基本风格检测
naming.is_pascal("MyClass")         # True
naming.is_camel("myVariable")       # True
naming.is_snake("my_variable")      # True
naming.is_kebab("my-variable")      # True

# 短横线风格变体
naming.is_standard_kebab("my-variable")   # True
naming.is_upper_kebab("MY-VARIABLE")       # True
naming.is_pascal_kebab("My-Variable")      # True

# 蛇形风格变体
naming.is_standard_snake("my_variable")    # True
naming.is_upper_snake("MY_VARIABLE")       # True
naming.is_pascal_snake("My_Variable")      # True
naming.is_magic_snake("__magic__")         # True
naming.is_protected_snake("_protected")    # True
naming.is_private_snake("__private")      # True
naming.is_end_snake("end_")                # True

# 检测风格
naming.detect_style("MyClass")      # NameStyle.PASCAL
naming.detect_style("my_variable") # NameStyle.STANDARD_SNAKE
naming.detect_style("INVALID@NAME") # NameStyle.INVALID

# 检测可能的风格（返回多个匹配）
naming.detect_possible_styles("save")  # [NameStyle.STANDARD_SNAKE, NameStyle.STANDARD_KEBAB, ...]

# 无效命名检查
naming.is_invalid("INVALID@NAME")  # True

# === 命名转换 ===

# 转为帕斯卡命名
naming.pascalize("my_variable")    # "MyVariable"
naming.pascalize("my-variable")    # "MyVariable"
naming.pascalize("myVariable")     # "MyVariable"

# 转为驼峰命名
naming.camelize("MyClass")         # "myClass"
naming.camelize("my_variable")     # "myVariable"

# 转为短横线命名
naming.kebabize("MyClass")                     # "my-class"
naming.kebabize("my_variable")                 # "my-variable"
naming.kebabize("my_variable", to_style=naming.NameStyle.UPPER_KEBAB)  # "MY-VARIABLE"

# 转为蛇形命名
naming.snakeize("MyClass")                     # "my_class"
naming.snakeize("my-variable")                 # "my_variable"
naming.snakeize("MyClass", to_style=naming.NameStyle.UPPER_SNAKE)      # "MY_CLASS"
naming.snakeize("method", to_style=naming.NameStyle.MAGIC_SNAKE)       # "__method__"
naming.snakeize("protected", to_style=naming.NameStyle.PROTECTED_SNAKE)  # "_protected"
naming.snakeize("private", to_style=naming.NameStyle.PRIVATE_SNAKE)      # "__private"

# 从指定风格转换
naming.snakeize("MyVariable", from_style=naming.NameStyle.PASCAL)  # "my_variable"

# 异常处理
naming.snakeize("INVALID@NAME", exception=False, return_empty=True)  # ""

# === 字典转换 ===

config = {
    "UserName": "Alice",
    "UserProfile": {
        "EmailAddress": "alice@example.com",
        "PhoneNumber": "1234567890",
    },
    "UserSettings": ["Setting1", "Setting2"],
}

# 转换字典键为 snake_case
naming.snakeize_dict(config)  # {"user_name": "Alice", "user_profile": {...}, "user_settings": [...]}
```

---

## Config - 配置管理

智能配置管理系统，支持环境切换和自动注入。

### 配置原理

#### 环境配置读取原理

配置系统通过以下优先级加载配置：

1. **环境后缀方式** (`[dev.database]`):
   - 读取 `active` 字段指定的环境
   - 只加载该环境下的配置项
   - 配置键为 `[<环境>.<前缀>.<字段>]`

2. **环境嵌套方式** (`[env.dev.database]`):
   - 读取 `active` 字段指定的环境
   - 从 `[env.<环境>.<前缀>]` 路径读取配置
   - 支持多个环境按顺序合并配置

3. **默认配置方式** (`[database]`):
   - 不使用环境前缀，直接从根节点读取
   - 适用于不需要环境切换的场景

**配置加载流程**:
```
settings.toml → LoaderManager → 环境解析 → 配置字典 → 注入到配置类
```

#### 自动配置原理

`@inject` 装饰器通过以下机制实现自动配置：

1. **单例模式**:
   - 每个配置类只创建一个实例
   - 多次调用构造函数返回同一对象

2. **配置注入**:
   - 根据 `prefix` 参数匹配配置键
   - 自动从配置字典读取并填充字段值
   - 支持类型转换（str → int/float/bool/Path）

3. **嵌套配置**:
   - 支持嵌套配置对象（属性也是 `@inject` 类）
   - 递归注入嵌套配置

4. **配置缓存**:
   - 使用 LRU 缓存配置字典
   - 避免重复读取文件

**注入流程**:
```
创建实例 → 检查单例 → 设置默认值 → 从配置字典注入值 → 返回实例
```

### 配置文件详解

#### 默认配置文件名

配置系统支持以下默认配置文件名（按优先级从高到低）：

- `pynomad` / `.pynomad`
- `config` / `.config`
- `setting` / `.setting`
- `settings` / `.settings`
- `<项目名>` / `.<项目名>`

#### 支持的文件格式

配置文件支持以下格式（按优先级从高到低）：

- `.toml` / `.tom` - TOML 格式（推荐）
- `.yaml` / `.yml` - YAML 格式
- `.json` - JSON 格式
- `.ini` - INI 格式

#### 默认解析路径

配置文件按以下路径顺序查找（优先级从高到低）：

1. **项目根目录**
   - 项目根目录
   - `config/` 子目录
   - `.config/` 子目录
   - `data/` 子目录
   - `.data/` 子目录
   - `.data/config/` 子目录

2. **外部配置目录**
   - `~/<项目名>/` - 用户目录
   - `<localdata>/<项目名>/` - 本地数据目录
   - `<config_home>/<项目名>/` - 配置目录
   - `~/.pynomad/` - pynomad 框架目录
   - `<project_data>/pynomad/` - 项目数据目录
   - `<config_home>/pynomad/` - 配置目录

3. **内置资源**
   - 项目包内的 `data/` 和 `config/` 目录
   - pynomad 框架包内的默认配置

#### 配置加载优先级

不同加载器的优先级（数值越小优先级越高）：

| 加载器类型 | 优先级 | 说明 |
|-----------|-------|------|
| 命令行加载器 | 10 | 命令行参数（最高优先级） |
| 环境变量加载器 | 19-20 | 环境变量 |
| 文件加载器 | 40-50 | 外部配置文件 |
| 资源加载器 | sys.maxsize | 项目内置资源（最低优先级） |

#### 配置合并规则

- **相同层级**：后加载的配置覆盖先加载的配置
- **不同层级**：深层配置不会被浅层配置覆盖
- **多个环境**：多个环境按顺序合并，后环境的配置覆盖先环境的配置

#### 路径展开变量

Path 类型字段支持内置路径变量展开：

**内置变量列表**：

| 变量名 | 说明 | 示例 |
|-------|------|------|
| `{workspace}` | 项目根目录 | `{workspace}/cache` |
| `{projectdir}` | 项目根目录（同上） | `{projectdir}/data` |
| `{projectname}` | 项目名称 | `{projectname}.log` |
| `{homedir}` | 用户主目录 | `{homedir}/.config` |
| `{home}` | 用户主目录（同上） | `{home}/.cache` |
| `{localdata}` | 本地数据目录 | `{localdata}/app` |
| `{roamingdata}` | 漫游数据目录（仅 Windows） | `{roamingdata}/app` |
| `{config}` | 配置目录 | `{config}/myapp` |
| `{cache}` | 缓存目录 | `{cache}/temp` |
| `{state}` | 状态目录 | `{state}/app` |
| `{tmp}` | 临时目录 | `{tmp}/upload` |
| `{app}` | 应用安装目录 | `{app}/data` |
| `{env:VAR_NAME}` | 环境变量 | `{env:USER_HOME}/data` |

**使用示例**：

```python
from pynomad import inject, field

@inject(prefix="app")
class AppProperties:
    cache_dir: Path = field(default="{cache}/myapp")
    data_dir: Path = field(default="{workspace}/data")
    config_dir: Path = field(default="{config}/myapp")
```

```toml
[app]
cache_dir = "{cache}/myapp"
data_dir = "{workspace}/data"
config_dir = "{config}/myapp"
```

### 配置文件方式

#### 默认配置方式

最简单的配置方式，直接在 `settings.toml` 中定义配置项，不需要环境配置：

```toml
[database]
host = "localhost"
port = 3306
name = "myapp"

[logging]
level.root = "INFO"
pretty = true

[app]
name = "MyApp"
version = "1.0.0"
```

```python
from pynomad import inject, field, get_logger

logger = get_logger(__name__)

@inject(prefix="database")
class DatabaseProperties:
    host: str = field(default="localhost")
    port: int = field(default=3306)
    name: str = field()

db_config = DatabaseProperties()
logger.info(db_config.host)  # "localhost"
logger.info(db_config.port)  # 3306
logger.info(db_config.name)  # "myapp"
```

#### 方式1：使用环境后缀

在 `settings.toml` 中定义多个环境：

```toml
environments = ["dev", "test", "prod"]
active = ["dev"]

[dev.database]
host = "localhost"
port = 3306
name = "myapp_dev"

[dev.logging]
level.root = "DEBUG"
pretty = true

[prod.database]
host = "prod.example.com"
port = 3306
name = "myapp_prod"

[prod.logging]
level.root = "INFO"
pretty = false
```

#### 方式2：使用 [env.<环境名>] 嵌套

```toml
active = ["development"]

[default]
app_name = "MyApp"
version = "1.0.0"

[env.development.database]
host = "localhost"
port = 3306

[env.production.database]
host = "prod.example.com"
port = 3306
```

### 属性类使用

定义配置类并使用 `@inject` 装饰器自动注入配置：

```python
from pynomad import inject, field, BuiltinProperties, get_logger

logger = get_logger(__name__)

@inject(prefix="database")
class DatabaseProperties:
    host: str = field(default="localhost")
    port: int = field(default=3306)
    name: str = field()

@inject(prefix="logging")
class LoggingProperties:
    level_root: str = field(default="INFO")
    pretty: bool = field(default=False)

# 使用配置
db_config = DatabaseProperties()
logger.info(db_config.host)  # "localhost" 或从配置文件读取
logger.info(db_config.port)  # 3306
```

**field 参数**:

```python
# default: 默认值
field(default="localhost")

# default_factory: 默认值工厂（用于可变对象）
field(default_factory=list)

# converter: 自定义类型转换器
field(converter=lambda x: int(x))

# alias: 配置键别名（单个别名）
field(alias="db_host")
```

**使用 field 别名示例**：

```python
from pynomad import inject, field, get_logger

logger = get_logger(__name__)

@inject(prefix="database")
class DatabaseProperties:
    host: str = field(default="localhost")
    port: int = field(default=3306)
    # 配置文件使用 db_name 而不是 name
    name: str = field(alias="db_name", default="mydb")

# 配置文件中：
# [database]
# host = "localhost"
# port = 3306
# db_name = "myapp_db"  # 别名映射到 name 字段
```

### @alias 装饰器

> ⚠️ **不建议使用**: 建议在配置文件中直接使用与 TypedDict 字段名一致的键名，避免额外的别名映射。

为 TypedDict 定义字段别名映射（仅用于 TypedDict）：

```python
from typing import TypedDict
from pynomad import inject, field, alias, get_logger

logger = get_logger(__name__)

@alias(pynomad_logging="pynomad.logging")
class LevelTypeDict(TypedDict):
    root: str
    pynomad: str
    pynomad_logging: str

@inject(prefix="logging")
class LoggingProperties:
    level: LevelTypeDict = field()

# 配置文件中：
# [logging.level]
# root = "INFO"
# pynomad = "DEBUG"
# pynomad.logging = "TRACE"
# 
# 注意：配置文件使用 "pynomad.logging"，但 TypedDict 字段名为 "pynomad_logging"
```

### 环境切换

> ⚠️ **不建议使用**: 运行时切换环境可能导致配置不一致，建议在应用启动时确定环境。

```python
from pynomad import set_active_environment, get_logger

logger = get_logger(__name__)

# 切换环境（不建议运行时切换）
set_active_environment("test")
set_active_environment("prod")

# 切换后，原有配置类引用自动更新
db_config = DatabaseProperties()
set_active_environment("prod")
logger.info(db_config.host)  # 自动更新为 production 环境配置
```

**推荐做法**: 通过配置文件的 `active` 字段指定环境，或者在应用启动时设置 `active` 环境变量。

### 自定义配置文件加载

> ⚠️ **不建议使用**: 建议直接在 `settings.toml` 中定义所有配置，避免使用 `add_file_loader` 等方法。

```python
from pynomad import add_file_loader, remove_loader, get_config
from pynomad import environ, get_logger

logger = get_logger(__name__)

# 添加自定义配置文件（不建议）
config_path = environ.project_dir / "config" / "custom.toml"
add_file_loader(config_path)

# 读取配置
config = get_config()
logger.info(config.get("custom.key"))

# 移除配置加载器
remove_loader(loader)
```

**推荐做法**: 直接在 `settings.toml` 中定义所有配置。

### 配置生成器

自动从配置文件生成配置类：

**配置文件示例 (settings.toml)**:

```toml
[database]
host = "localhost"
port = 3306
name = "myapp"
username = "root"
password = "password"

[logging]
level.root = "INFO"
pretty = true

[app]
name = "MyApp"
version = "1.0.0"
```

**生成配置类**:

```python
from pynomad import ConfigGenerator
from pathlib import Path

# 创建生成器
generator = ConfigGenerator(
    output_path=Path("config"),
    class_suffix="Properties",
    root_class_name="ProjectProperties",
)

# 生成配置类（从 settings.toml 自动读取）
generator.generate(class_type="dataclass")
```

> ⚠️ **注意**: 配置生成器会从 `settings.toml` 自动读取配置，无需使用 `add_file_loader`。

生成的配置类示例：

```python
@inject(prefix="database")
class DatabaseProperties:
    host: str = field(default="localhost")
    port: int = field(default=3306)

@inject(prefix="logging")
class LoggingProperties:
    level_root: str = field(default="INFO")
    pretty: bool = field(default=True)

@inject()
class ProjectProperties(BuiltinProperties):
    database: DatabaseProperties = field()
    logging: LoggingProperties = field()
```

### 单例模式

`@inject` 装饰的配置类是单例模式：

```python
from pynomad import inject, field, get_logger

logger = get_logger(__name__)

@inject(prefix="database")
class DatabaseProperties:
    host: str = field()

# 多次实例化返回同一对象
db1 = DatabaseProperties()
db2 = DatabaseProperties()
logger.info(db1 is db2)  # True
```

### 清除缓存

> ⚠️ **不建议使用**: 清除配置缓存可能导致配置不一致，仅用于测试环境。

```python
from pynomad import clear_config_cache

# 清除配置缓存（仅用于测试）
clear_config_cache()
```

---

## Result - 结果处理

统一的结果处理模式，优雅的错误处理。

### 基本使用

```python
from pynomad import Result, ResultAny, get_logger

logger = get_logger(__name__)

# 成功结果
result = Result.success(data={"id": 1, "name": "Alice"})
logger.info(result.data)  # {"id": 1, "name": "Alice"}
logger.info(result.msg)   # "操作成功"

# 失败结果
result = Result.fail(msg="操作失败", code=400)
logger.info(result.msg)   # "操作失败"
logger.info(result.code)  # 400

# 检查结果
if result.is_success():
    logger.info("成功！")
    logger.info(result.data)
else:
    logger.info(f"失败: {result.msg}")

# 链式处理
result = Result.success(10)
result = result.map(lambda x: x * 2)  # Result.success(20)
result = result.and_then(lambda x: Result.success(x + 5))  # Result.success(25)
```

### 错误处理

```python
from pynomad import Result, get_logger

logger = get_logger(__name__)

# 使用 or_else 提供默认值
result = Result.fail("失败")
default_value = result.or_else({"default": "value"})

# 使用 raise_on_error 抛出异常
result = Result.fail("操作失败", code=500)
try:
    data = result.raise_on_error()
except Exception as e:
    logger.info(f"捕获异常: {e}")
```

### Result 装饰器

```python
from pynomad.result.decorator import (
    result,
    ignore_result,
    client_result,
    network_result,
    server_result,
    third_party_result,
    json_result,
    dict_result,
)

# 基本结果装饰器
@result
def divide(a: int, b: int) -> float:
    if b == 0:
        raise ValueError("除数不能为零")
    return a / b

# Result[divide(10, 2)]  # Result.success(5.0)
# Result[divide(10, 0)]  # Result.fail("除数不能为零")

# 客户端错误装饰器
@client_result
def validate_user(user_id: int) -> dict:
    if user_id <= 0:
        raise ValueError("无效的用户ID")
    return {"id": user_id}

# 网络错误装饰器
@network_result
def fetch_data(url: str) -> str:
    response = requests.get(url)
    return response.text

# 服务端错误装饰器
@server_result
def process_request(data: dict) -> dict:
    # 处理服务端逻辑
    pass

# 第三方服务错误装饰器
@third_party_result
def call_external_api(api_key: str) -> dict:
    # 调用外部API
    pass

# JSON 解析装饰器
@json_result
def parse_json(json_str: str) -> dict:
    return json.loads(json_str)

# 字典转换装饰器
@dict_result
def to_dict(obj: Any) -> dict:
    return vars(obj)

# 忽略结果装饰器
@ignore_result
def log_operation(message: str) -> None:
    logger.info(message)
    # 无论是否抛出异常，都返回 Result.success(None)
```

### ResultHandler - 结果处理器

ResultHandler 允许自定义 Result 的初始化行为，实现：
- 统计错误信息（错误类型、消息、分类）
- 自定义错误码生成规则
- 添加错误码前缀
- 错误监控和告警

#### 基本概念

```python
from pynomad.result.result import ResultHandler, HandleResult, DefaultHandler
from pynomad.result.result import Result, StatusCategory

# ResultHandler 是抽象基类
class MyHandler(ResultHandler):
    def handle(
        self,
        data: object = None,
        category: StatusCategory = StatusCategory.SUCCESS,
        exception: Exception | None = None,
    ) -> HandleResult:
        # 自定义处理逻辑
        return HandleResult(
            code=category.exception_code(exception),
            msg=category.exception_message(exception),
            data=data,
        )

# 注册自定义 Handler
Result.set_handler(MyHandler())

# 恢复默认 Handler
Result.set_handler(DefaultHandler())
```

#### 统计错误信息

```python
from collections import Counter
from pynomad.result.result import ResultHandler, HandleResult, Result, StatusCategory
from pynomad import get_logger

logger = get_logger(__name__)

class StatisticsHandler(ResultHandler):
    """统计错误信息的处理器"""

    def __init__(self) -> None:
        self.exception_counter: Counter[str] = Counter()
        self.message_counter: Counter[str] = Counter()
        self.category_counter: Counter[str] = Counter()

    def handle(
        self,
        data: object = None,
        category: StatusCategory = StatusCategory.SUCCESS,
        exception: Exception | None = None,
    ) -> HandleResult:
        # 统计异常类型
        exception_type = type(exception).__name__ if exception else "None"
        self.exception_counter[exception_type] += 1

        # 统计错误消息
        msg = category.exception_message(exception)
        self.message_counter[msg] += 1

        # 统计分类
        self.category_counter[category.name] += 1

        return HandleResult(
            code=category.exception_code(exception),
            msg=msg,
            data=data,
        )

    def get_exception_stats(self) -> dict:
        """获取异常类型统计"""
        return dict(self.exception_counter)

    def get_message_stats(self) -> dict:
        """获取消息统计"""
        return dict(self.message_counter)

    def get_category_stats(self) -> dict:
        """获取分类统计"""
        return dict(self.category_counter)

# 使用示例
stats_handler = StatisticsHandler()
Result.set_handler(stats_handler)

# 模拟一些操作
Result.success({"id": 1})
Result.success({"id": 2})
Result.client_error(ValueError("参数错误"))
Result.client_error(ValueError("参数错误"))
Result.client_error(TypeError("类型错误"))
Result.server_error(RuntimeError("运行时错误"))
Result.network_error(ConnectionError("连接失败"))

# 查看统计结果
logger.info("异常类型统计:", stats_handler.get_exception_stats())
# {'None': 2, 'ValueError': 2, 'TypeError': 1, 'RuntimeError': 1, 'ConnectionError': 1}

logger.info("消息统计:", stats_handler.get_message_stats())
# {'success': 2, '参数错误': 2, '类型错误': 1, '运行时错误': 1, '连接失败': 1}

logger.info("分类统计:", stats_handler.get_category_stats())
# {'SUCCESS': 2, 'CLIENT_ERROR': 3, 'SERVER_ERROR': 1, 'NETWORK_ERROR': 1}
```

#### 自定义错误码前缀

```python
from pynomad.result.result import ResultHandler, HandleResult, Result, StatusCategory

class PrefixCodeHandler(ResultHandler):
    """为错误码添加自定义前缀，用于区分不同服务或模块"""

    def __init__(self, prefix: str = "APP") -> None:
        self.prefix = prefix

    def handle(
        self,
        data: object = None,
        category: StatusCategory = StatusCategory.SUCCESS,
        exception: Exception | None = None,
    ) -> HandleResult:
        base_code = category.exception_code(exception)
        # 生成带前缀的错误码
        code = f"{self.prefix}_{base_code}"

        return HandleResult(
            code=code,
            msg=category.exception_message(exception),
            data=data,
        )

# 使用示例
prefix_handler = PrefixCodeHandler(prefix="MYAPP")
Result.set_handler(prefix_handler)

success_result = Result.success({"status": "ok"})
error_result = Result.client_error(ValueError("错误"))

logger.info("success code:", success_result.code)  # MYAPP_10000000
logger.info("error code:", error_result.code)      # MYAPP_20000000...
```

#### 作用说明

ResultHandler 的主要作用：

1. **统计和监控**：自动统计各类错误的出现次数，用于错误监控、趋势分析和常见错误识别
2. **自定义错误码**：根据业务需求自定义错误码格式，例如添加服务前缀、模块标识等
3. **错误告警**：结合统计功能，当某类错误达到阈值时触发告警
4. **日志增强**：在处理 Result 时自动记录详细的错误信息
5. **全局统一处理**：所有 Result 创建都经过 Handler，便于统一处理

#### 适用场景

- **错误监控系统**：需要统计错误类型、频率和分布
- **多服务系统**：不同服务使用不同的错误码前缀区分
- **日志分析**：需要详细的错误统计信息进行分析
- **错误告警**：根据错误统计实现自动化告警
- **调试和测试**：收集错误信息用于问题定位

---

## Logger - 日志系统

灵活的日志管理系统。

### 基本使用

```python
from pynomad import get_logger

# 获取日志记录器
logger = get_logger(__name__)

# 不同级别的日志
logger.trace("追踪信息")
logger.debug("调试信息")
logger.info("普通信息")
logger.warn("警告信息")
logger.error("错误信息")
logger.fatal("致命错误")
```

### 日志自动配置

日志系统支持通过 `settings.toml` 自动配置，无需手动加载。

#### Root 级别配置

在 `settings.toml` 中配置 root 日志级别：

```toml
[logging]
# root 日志级别：TRACE | DEBUG | INFO | WARN | ERROR | FATAL
# 生产环境（active=prod/product/production）默认为 INFO，开发环境默认为 TRACE
pretty = true
```

```python
from pynomad import get_logger

logger = get_logger(__name__)

# root 级别会从配置文件自动读取
logger.info("普通信息")
logger.debug("调试信息")
```

#### 指定名称的级别配置

支持通配符和层级继承配置不同模块的日志级别：

```toml
[logging]
pretty = true

[logging.level]
# root 级别（会应用到所有 logger）
root = "INFO"

# 精确匹配：特定模块
"pynomad.config" = "DEBUG"
"pynomad.cache" = "TRACE"

# 父级匹配：会应用到子模块
"pynomad" = "DEBUG"  # 会应用到 pynomad.config, pynomad.cache 等

# 通配符匹配：使用 * 和 ?
"test.*" = "WARN"     # test 模块下的所有子模块
"*_test" = "ERROR"    # 以 _test 结尾的模块
```

```python
from pynomad import get_logger

# 不同模块会自动应用匹配的日志级别
logger_root = get_logger()           # 使用 root = "INFO"
logger_config = get_logger("pynomad.config")    # 使用 "pynomad.config" = "DEBUG"
logger_cache = get_logger("pynomad.cache")      # 使用 "pynomad.cache" = "TRACE"
logger_test = get_logger("test.utils")          # 使用 "test.*" = "WARN"
```

#### Pretty 日志配置

启用 pretty 模式，支持结构化日志输出：

```toml
[logging]
pretty = true

[logging.level]
root = "INFO"
```

```python
from pynomad import get_logger
import json

logger = get_logger(__name__)

# pretty 模式会自动格式化字典和列表
logger.info({"name": "Alice", "age": 30})
# 输出格式化的 JSON 对象

logger.info([1, 2, 3, 4, 5])
# 输出格式化的列表
```

#### 环境自动调整

日志系统会根据激活环境自动调整日志级别：

```toml
[env]
active = "prod"
```

- **生产环境**（active 包含 `prod`/`product`/`production`）：
  - root 日志级别：INFO
  - 文件日志级别：INFO

- **开发环境**（其他环境）：
  - root 日志级别：TRACE
  - 文件日志级别：INFO

### 彩色日志生成器

使用 `FormatBuilder` 构建自定义彩色日志格式。

#### 基本使用

```python
from pynomad.logsystem.builder import FormatBuilder
from pynomad.logsystem.formatters import ColorFormatter
from pynomad.common.ansi import Color, AnsiStyle

# 构建格式字符串
builder = FormatBuilder()

# 添加时间戳（洋红色）
builder.add_fore(Color.MAGENTA)
builder.add_asctime(width=9)
builder.reset_style()

# 添加日志名称（青色，左对齐，宽度 25，超长中间截断）
builder.add_fore(Color.CYAN.bright_fore)
builder.add_name(align="<", width=25, truncate=True)
builder.reset_style()

# 添加分隔符
builder.add_string(":")

# 添加行号（蓝色，右对齐，零填充，宽度 4）
builder.add_style(AnsiStyle.DIM)
builder.add_fore(Color.BLUE)
builder.add_lineno(width=4, align=">", fill="0")
builder.reset_style()

# 添加日志级别（使用动态级别颜色，居中对齐，宽度 8）
builder.add_string(" [")
builder.add_levelname(width=8, fill="*", align="^", use_level_color=True)
builder.add_string("] ")

# 添加消息（使用动态级别颜色）
builder.add_message(use_level_color=True)

# 构建格式字符串
_, _, colorfmt_dynamic = builder.build()

# 创建格式化器
formatter = ColorFormatter(
    fmt=colorfmt_dynamic,
    datefmt="%H:%M:%S"
)
```

#### 动态级别颜色

使用 `use_level_color=True` 启用动态级别颜色，不同级别显示不同颜色：

```python
builder = FormatBuilder()

# 级别字段使用动态颜色
builder.add_levelname(width=8, use_level_color=True)
builder.add_string(": ")
builder.add_message(use_level_color=True)

_, _, fmt = builder.build()
formatter = ColorFormatter(fmt)
```

默认级别颜色映射：
- **TRACE**: 青色 + 暗淡样式
- **DEBUG**: 白色
- **INFO**: 绿色
- **WARN**: 黄色
- **ERROR**: 红色
- **FATAL**: 亮红色 + 粗体 + 下划线

#### 自定义颜色和样式

```python
from pynomad.common.ansi import Color, AnsiStyle

builder = FormatBuilder()

# 前景色
builder.add_fore(Color.RED)              # 红色
builder.add_fore(Color.GREEN.bright_fore)  # 亮绿色

# 背景色
builder.add_back(Color.DEFAULT)         # 默认背景
builder.add_back(41)                    # 红色背景

# 样式
builder.add_style(AnsiStyle.BOLD)        # 粗体
builder.add_style(AnsiStyle.UNDERLINE)   # 下划线
builder.add_style(AnsiStyle.DIM)        # 暗淡

# 组合使用
builder.add_fore(Color.YELLOW)
builder.add_back(41)
builder.add_style(AnsiStyle.BOLD)
builder.add_string("警告消息")
builder.reset_style()  # 重置所有颜色和样式
```

#### 格式化选项

```python
builder = FormatBuilder()

# 对齐方式："<" 左对齐, ">" 右对齐, "^" 居中, "=" 填充
builder.add_name(align="<", width=20)    # 左对齐
builder.add_levelname(align=">", width=8)  # 右对齐
builder.add_funcname(align="^", width=15)   # 居中

# 填充字符
builder.add_lineno(width=5, fill="0")    # 用 0 填充
builder.add_string(width=10, fill="-")   # 用 - 填充

# 超长截断（仅在宽度限制时有效）
builder.add_module(width=20, truncate=True)  # 超长中间截断

# 使用动态级别颜色
builder.add_levelname(use_level_color=True)
builder.add_message(use_level_color=True)
```

#### 配置文件中使用格式构建器

```toml
[logging]
pretty = true

[logging.formatters]
# 自定义彩色格式
[logging.formatters.console_colored]
class = "pynomad.logsystem.formatters.ColorFormatter"
# 使用 fmtbuild_method 动态生成格式
fmtbuild_method = "myapp.logger:build_custom_format"
# show 模式：plain/static/dynamic
show = "dynamic"
```

```python
# myapp/logger.py
from pynomad.logsystem.builder import FormatBuilder
from pynomad.common.ansi import Color, AnsiStyle
from pynomad.logsystem.logtypes import FormatStrings

def build_custom_format() -> FormatStrings:
    """自定义日志格式构建函数"""
    builder = FormatBuilder()
    
    builder.add_fore(Color.MAGENTA)
    builder.add_asctime(width=9)
    builder.reset_style()
    builder.add_string(" ")
    builder.add_fore(Color.CYAN.bright_fore)
    builder.add_name(align="<", width=25, truncate=True)
    builder.reset_style()
    builder.add_string(":")
    builder.add_fore(Color.BLUE)
    builder.add_lineno(width=4, align=">", fill="0")
    builder.reset_style()
    builder.add_string(" [")
    builder.add_levelname(width=8, fill=" ", align="^", use_level_color=True)
    builder.add_string("] ")
    builder.add_message(use_level_color=True)
    
    return builder.build()
```

#### 完整示例

```python
from pynomad import get_logger
from pynomad.logsystem.builder import FormatBuilder
from pynomad.logsystem.formatters import ColorFormatter
from pynomad.common.ansi import Color, AnsiStyle
import logging

# 构建自定义格式
def build_custom_format():
    builder = FormatBuilder()
    
    # 时间戳 - 洋红色
    builder.add_fore(Color.MAGENTA)
    builder.add_asctime(width=9)
    builder.reset_style()
    
    # 日志器名 - 青色
    builder.add_fore(Color.CYAN.bright_fore)
    builder.add_name(align="<", width=25, truncate=True)
    builder.reset_style()
    
    # 分隔符
    builder.add_string(":")
    
    # 行号 - 蓝色
    builder.add_fore(Color.BLUE)
    builder.add_lineno(width=4, align=">", fill="0")
    builder.reset_style()
    
    # 级别 - 动态颜色
    builder.add_string(" [")
    builder.add_levelname(width=8, align="^", use_level_color=True)
    builder.add_string("] ")
    
    # 消息 - 动态颜色
    builder.add_message(use_level_color=True)
    
    return builder.build()

# 创建格式化器
_, _, fmt = build_custom_format()
formatter = ColorFormatter(fmt, datefmt="%H:%M:%S")

# 应用到日志器
logger = get_logger(__name__)
handler = logging.StreamHandler()
handler.setFormatter(formatter)
logger.addHandler(handler)

# 测试
logger.trace("追踪信息")  # 青色
logger.debug("调试信息")  # 白色
logger.info("普通信息")   # 绿色
logger.warn("警告信息")   # 黄色
logger.error("错误信息")  # 红色
logger.fatal("致命错误")  # 亮红色 + 粗体 + 下划线
```

---

## Cache - 缓存系统

多级缓存系统，支持多种缓存存储方式。

### 缓存配置

缓存系统支持通过 `settings.toml` 自动配置，无需手动初始化。

#### 配置文件示例

```toml
# ========== 缓存配置 ==========
# 环境配置
environments = ["dev", "test", "prod"]
active = ["dev"]

# 内存缓存配置
[cache.memory]
name = "memory_cache"
ttl = 3

# DataFrame 内存缓存配置
[cache.dfmemory]
name = "df_memory_cache"
ttl = 3

# Pickle 缓存配置
[cache.pickle]
name = "pickle_cache"
cache_dir = "{workspace}/data/cache/pickle"
enable_encryption = true
salt = "pynomad_pickle_cache_v1"
ttl = 3

# DataFrame Pickle 缓存配置
[cache.dfpickle]
cache_dir = "{workspace}/data/cache/df_pickle"
enable_encryption = true
salt = "pynomad_pickle_cache_v1"

# Redis 缓存配置
[cache.redis]
host = "localhost"
port = 6379
ttl = 3
db = 15

# DataFrame Redis 缓存配置
[cache.dfredis]
host = "localhost"
port = 6379
db = 15

# SQL 缓存配置（全局）
[cache.sql]
db_type = "sqlite"
db_url = "{workspace}/data/cache/database/cache.db"
ttl = 3600

# SQL 缓存配置（开发环境）
[dev.cache.sql]
db_type = "mysql"
db_name = "cache_db"
host = "localhost"
port = 3306
username = "root"
password = "your_password"

# SQL 缓存配置（测试环境）
[test.cache.sql]
db_type = "postgresql"
db_name = "cache_db"
host = "localhost"
port = 5432
username = "postgres"
password = "your_password"

# SQL 缓存配置（生产环境）
[prod.cache.sql]
db_type = "postgresql"
db_name = "cache_db"
host = "prod.example.com"
port = 5432
username = "postgres"
password = "prod_password"
```

### Memory Cache - 内存缓存 (@memcached)

内存缓存是最快的缓存方式，数据存储在进程内存中。

#### 基本使用

```python
from pynomad import memcached, get_logger

logger = get_logger(__name__)

@memcached
def expensive_function(x: int) -> int:
    logger.info(f"计算 {x} 的平方")
    return x * x

# 第一次调用 - 缓存未命中
result1 = expensive_function(5)  # 输出: 计算 5 的平方

# 第二次调用 - 缓存命中
result2 = expensive_function(5)  # 无输出，直接返回缓存结果
```

#### 装饰器参数

```python
from pynomad import memcached

# 自定义缓存名称和 TTL
@memcached(name="custom_cache", ttl=60, maxsize=100)
def compute_factorial(n: int) -> int:
    return factorial(n)
```

**参数说明**：
- `name`: 缓存名称，用于区分不同缓存实例
- `ttl`: 过期时间（秒），默认从配置文件读取
- `maxsize`: 最大缓存条目数，默认从配置文件读取
- `keygenerator`: 自定义键生成函数

#### 配置说明

```toml
[cache.memory]
name = "memory_cache"      # 缓存名称
ttl = 3                    # 默认过期时间（秒）
```

---

### Pickle Cache - Pickle 缓存 (@pickled)

Pickle 缓存将数据序列化到文件，支持数据持久化。

#### 基本使用

```python
from pynomad import pickled, get_logger

logger = get_logger(__name__)

@pickled
def complex_function(data: dict) -> list:
    logger.info(f"处理复杂数据: {data}")
    return [data.get("key", "default")]

# 缓存到文件
result1 = complex_function({"key": "value"})  # 执行计算
result2 = complex_function({"key": "value"})  # 从缓存读取
```

#### 装饰器参数

```python
from pynomad import pickled

@pickled(
    name="custom_pickle_cache",
    cache_dir="{workspace}/data/custom_cache",
    enable_encryption=False,
    ttl=3600
)
def process_data(data: dict) -> dict:
    return data
```

**参数说明**：
- `name`: 缓存名称
- `cache_dir`: 缓存目录，支持路径变量展开
- `enable_encryption`: 是否启用加密，默认为 true
- `ttl`: 过期时间（秒）

#### 配置说明

```toml
[cache.pickle]
name = "pickle_cache"
cache_dir = "{workspace}/data/cache/pickle"  # 缓存目录
enable_encryption = true                      # 是否启用加密
salt = "pynomad_pickle_cache_v1"           # 加密盐值
ttl = 3                                    # 默认过期时间（秒）
```

---

### Redis Cache - Redis 缓存 (@rediscached)

Redis 缓存支持分布式缓存，适用于多进程或多服务器场景。

#### 安装依赖

```bash
pip install redis
```

#### 基本使用

```python
from pynomad import rediscached, get_logger

logger = get_logger(__name__)

@rediscached
def fetch_from_api(endpoint: str) -> dict:
    logger.info(f"调用 API: {endpoint}")
    return {"data": "response", "endpoint": endpoint}

# 结果缓存到 Redis
result1 = fetch_from_api("/api/users")  # 执行 API 调用
result2 = fetch_from_api("/api/users")  # 从 Redis 缓存读取
```

#### 装饰器参数

```python
from pynomad import rediscached

@rediscached(
    name="api_cache",
    host="localhost",
    port=6379,
    db=1,
    ttl=300,
    maxsize=1000
)
def fetch_data(url: str) -> dict:
    return {"url": url}
```

**参数说明**：
- `name`: 缓存名称
- `host`: Redis 服务器地址
- `port`: Redis 端口
- `db`: Redis 数据库编号
- `ttl`: 过期时间（秒）
- `maxsize`: 最大缓存条目数

#### 配置说明

```toml
[cache.redis]
host = "localhost"  # Redis 服务器地址
port = 6379         # Redis 端口
ttl = 3             # 默认过期时间（秒）
db = 15             # Redis 数据库编号
```

---

### SQL Cache - SQL DataFrame 缓存 (@sqlcached)

SQL 缓存专门用于 pandas DataFrame 对象的数据库缓存，支持多种数据库类型（SQLite、MySQL、PostgreSQL），适合大数据量、长期缓存的场景。

#### 安装依赖

```bash
pip install sqlalchemy
```

#### 基本使用

```python
from pynomad import sqlcached, get_logger
from pandas import DataFrame

logger = get_logger(__name__)

@sqlcached(db_type="sqlite", db_url="sqlite:///data/cache.db", ttl=300)
def query_database(query: str) -> DataFrame:
    logger.info(f"执行查询: {query}")
    return DataFrame({"query": [query], "result": ["data"]})

# DataFrame 缓存到数据库
df1 = query_database("SELECT * FROM users")  # 执行查询
df2 = query_database("SELECT * FROM users")  # 从数据库缓存读取
```

#### 装饰器参数

```python
from pynomad import sqlcached
from pandas import DataFrame

@sqlcached(
    name="custom_sql_cache",
    db_type="mysql",
    db_url="mysql://user:pass@localhost/db",
    db_name="cache_db",
    host="localhost",
    port=3306,
    username="root",
    password="password",
    ttl=3600
)
def fetch_data(key: str) -> DataFrame:
    return DataFrame({"key": [key], "value": [100]})
```

**参数说明**：
- `name`: 缓存名称
- `db_type`: 数据库类型（sqlite/mysql/postgresql）
- `db_url`: 数据库连接 URL
- `db_name`: 数据库名称
- `host`: 数据库主机
- `port`: 数据库端口
- `username`: 用户名
- `password`: 密码
- `ttl`: 过期时间（秒）

#### 配置说明

```toml
# 全局配置
[cache.sql]
db_type = "sqlite"
db_url = "{workspace}/data/cache/database/cache.db"
ttl = 3600

# 开发环境配置
[dev.cache.sql]
db_type = "mysql"
db_name = "cache_db"
host = "localhost"
port = 3306
username = "root"
password = "your_password"

# 测试环境配置
[test.cache.sql]
db_type = "postgresql"
db_name = "cache_db"
host = "localhost"
port = 5432
username = "postgres"
password = "your_password"
```

**重要说明**：

- `sqlcached` 是 **DataFrame 专用** 缓存装饰器，继承自 `BaseDataFrameDecorator`
- 支持 `DataFrameValueLoader` 三阶段处理（GET、PUT、EXTRACT）
- 返回值类型必须是 `DataFrame`
- 适用于大数据量、长期缓存的 DataFrame 场景

---

### DataFrame 缓存

DataFrame 缓存专门用于 pandas DataFrame 对象的缓存。

#### DataFrameValueLoader - 三阶段数据处理

DataFrame 缓存采用三阶段处理模式，通过 `DataFrameValueLoader` 协议实现灵活的数据处理。

**三个阶段**：

1. **GET 阶段** (`CacheStage.GET`)：从缓存提取数据，决定是否使用缓存
2. **PUT 阶段** (`CacheStage.PUT`)：合并缓存数据和新数据
3. **EXTRACT 阶段** (`CacheStage.EXTRACT`)：从合并数据中提取最终返回数据

#### DataFrameValueLoader 协议详解

`DataFrameValueLoader` 是一个 Protocol（协议），需要实现三个方法：

```python
from pynomad.cache.dataframe.types import DataFrameValueLoader, CacheStage
from pandas import DataFrame
from pynomad.result.result import Result, ResultAny

class CustomDataFrameValueLoader(DataFrameValueLoader):
    """自定义 DataFrame 值加载器"""

    def get(
        self,
        cache_data: DataFrame,
        extra_params: dict[str, Any],
        args: tuple[Any, ...],
        kwargs: dict[str, Any],
    ) -> Result[DataFrame | dict[str, Any]]:
        """GET 阶段：从缓存提取数据

        参数说明：
            cache_data: 从缓存获取的 DataFrame（如果有缓存）
            extra_params: 装饰器传递的额外参数（通常包含上次的 args 和 kwargs）
            args: 当前调用的位置参数（被装饰函数的 args）
            kwargs: 当前调用的关键字参数（被装饰函数的 kwargs）

        返回格式：
            - 使用缓存：Result.success(data=cache_data 或 dict)
            - 刷新数据：Result.client_error(exception=NeedsRefreshException(...))

        使用场景：
            - 检查缓存是否过期（根据时间戳、版本号等）
            - 验证参数是否匹配（与 extra_params 中的旧参数比较）
            - 根据业务逻辑决定是否使用缓存
        """
        # 示例1：检查缓存是否过期
        if self._is_expired(cache_data):
            return Result.client_error(
                exception=NeedsRefreshException(message="缓存已过期")
            )
        return Result.success(data=cache_data)

        # 示例2：比较参数是否匹配
        last_args = tuple(extra_params.get("args", []))
        last_kwargs = extra_params.get("kwargs", {})
        if last_args != args or last_kwargs != kwargs:
            return Result.client_error(
                exception=NeedsRefreshException(message="参数已变化"),
                data={"args": (), "kwargs": kwargs}  # 新参数（用于重新调用）
            )
        return Result.success(data=cache_data)

    def put(
        self,
        cache_data: DataFrame,
        new_data: DataFrame,
        extra_params: dict[str, Any],
        args: tuple[Any, ...],
        kwargs: dict[str, Any],
    ) -> ResultAny[DataFrame]:
        """PUT 阶段：合并缓存数据和新数据

        参数说明：
            cache_data: 原缓存中的 DataFrame（可能为空 DataFrame）
            new_data: 新获取的 DataFrame（被装饰函数的返回值）
            extra_params: 装饰器传递的额外参数
            args: 当前调用的位置参数
            kwargs: 当前调用的关键字参数

        返回格式：
            - Result.success(data=DataFrame): 需要缓存到存储的 DataFrame

        使用场景：
            - 数据追加：旧数据 + 新数据
            - 数据去重：合并后删除重复行
            - 数据更新：根据某些字段更新旧数据
            - 完全覆盖：直接返回 new_data（不使用 cache_data）
        """
        # 示例1：数据追加
        merged = pd.concat([cache_data, new_data], ignore_index=True)
        return Result.success(data=merged)

        # 示例2：数据去重
        merged = pd.concat([cache_data, new_data], ignore_index=True)
        merged = merged.drop_duplicates()
        return Result.success(data=merged)

        # 示例3：根据列更新（如按 symbol 更新价格）
        cache_data.set_index('symbol', inplace=True)
        new_data.set_index('symbol', inplace=True)
        merged = cache_data.update(new_data).reset_index()
        return Result.success(data=merged)

        # 示例4：完全覆盖（不合并）
        return Result.success(data=new_data)

    def extract(
        self,
        merged_data: DataFrame,
        extra_params: dict[str, Any],
        args: tuple[Any, ...],
        kwargs: dict[str, Any],
    ) -> ResultAny[DataFrame]:
        """EXTRACT 阶段：提取最终返回数据

        参数说明：
            merged_data: 合并后的 DataFrame（将被缓存）
            extra_params: 装饰器传递的额外参数
            args: 当前调用的位置参数
            kwargs: 当前调用的关键字参数

        返回格式：
            - Result.success(data=DataFrame): 返回给调用方的最终 DataFrame

        使用场景：
            - 返回部分数据（如最新 N 行、特定列）
            - 数据过滤（根据参数筛选）
            - 数据排序（按时间、价格等排序）
            - 数据分组汇总
        """
        # 示例1：返回最新 N 行
        return Result.success(data=merged_data.tail(10))

        # 示例2：根据参数过滤
        symbol = kwargs.get('symbol')
        if symbol:
            filtered = merged_data[merged_data['symbol'] == symbol]
            return Result.success(data=filtered)
        return Result.success(data=merged_data)

        # 示例3：按时间排序返回
        sorted_data = merged_data.sort_values('timestamp', ascending=False)
        return Result.success(data=sorted_data)

        # 示例4：返回所有数据（不做处理）
        return Result.success(data=merged_data)
```

#### 参数格式说明

**传入参数**：
| 参数 | 类型 | 说明 |
|------|------|------|
| `cache_data` | `DataFrame` | 从缓存读取的 DataFrame（GET/PUT 阶段有此参数） |
| `new_data` | `DataFrame` | 新获取的 DataFrame（仅 PUT 阶段有此参数） |
| `merged_data` | `DataFrame` | 合并后的 DataFrame（仅 EXTRACT 阶段有此参数） |
| `extra_params` | `dict[str, Any]` | 额外参数，通常包含：`{"args": [...], "kwargs": {...}}` |
| `args` | `tuple[Any, ...]` | 被装饰函数的位置参数 |
| `kwargs` | `dict[str, Any]` | 被装饰函数的关键字参数 |

**返回值格式**：
- **GET 阶段**：`Result[DataFrame | dict[str, Any]]`
  - 成功：`Result.success(data=cache_data)` 或 `Result.success(data={"key": "value"})`
  - 失败：`Result.client_error(exception=NeedsRefreshException(...))`
- **PUT 阶段**：`ResultAny[DataFrame]`
  - 成功：`Result.success(data=merged_data)`
- **EXTRACT 阶段**：`ResultAny[DataFrame]`
  - 成功：`Result.success(data=extracted_data)`

#### 完整使用示例

**示例1：增量更新缓存（追加新数据）**

```python
from pynomad import df_memcached
from pynomad.cache.dataframe.types import DataFrameValueLoader
from pandas import DataFrame
from pynomad.result.result import Result, ResultAny

class IncrementalDataLoader(DataFrameValueLoader):
    """增量更新加载器：追加新数据，返回最新数据"""

    def get(self, cache_data: DataFrame, extra_params: dict,
            args: tuple, kwargs: dict) -> Result[DataFrame | dict]:
        # 总是重新获取数据（实现增量更新）
        return Result.client_error(
            exception=NeedsRefreshException("需要获取新数据"),
            data={"args": (), "kwargs": kwargs}
        )

    def put(self, cache_data: DataFrame, new_data: DataFrame,
            extra_params: dict, args: tuple, kwargs: dict) -> ResultAny[DataFrame]:
        # 合并旧数据和新数据
        merged = pd.concat([cache_data, new_data], ignore_index=True)
        # 去重
        merged = merged.drop_duplicates()
        return Result.success(data=merged)

    def extract(self, merged_data: DataFrame, extra_params: dict,
               args: tuple, kwargs: dict) -> ResultAny[DataFrame]:
        # 返回最新 100 条数据
        return Result.success(data=merged_data.tail(100))

@df_memcached(valueloader=IncrementalDataLoader())
def fetch_stock_data(symbol: str) -> DataFrame:
    """获取股票数据"""
    # 模拟获取最新数据
    return DataFrame({"symbol": [symbol], "price": [100.0], "timestamp": [datetime.now()]})
```

**示例2：智能缓存（过期检查 + 参数匹配）**

```python
from datetime import datetime, timedelta

class SmartDataLoader(DataFrameValueLoader):
    """智能加载器：检查过期时间和参数匹配"""

    CACHE_DURATION = timedelta(hours=1)  # 缓存1小时

    def get(self, cache_data: DataFrame, extra_params: dict,
            args: tuple, kwargs: dict) -> Result[DataFrame | dict]:
        # 检查缓存是否为空
        if cache_data.empty:
            return Result.client_error("缓存为空")

        # 检查时间戳列是否存在
        if 'timestamp' not in cache_data.columns:
            return Result.client_error("缓存缺少时间戳")

        # 检查缓存是否过期
        last_time = cache_data['timestamp'].max()
        if datetime.now() - last_time > self.CACHE_DURATION:
            return Result.client_error(
                exception=NeedsRefreshException("缓存已过期")
            )

        # 检查参数是否匹配
        last_args = tuple(extra_params.get("args", []))
        last_kwargs = extra_params.get("kwargs", {})
        if last_args != args or last_kwargs != kwargs:
            return Result.client_error(
                exception=NeedsRefreshException("参数已变化")
            )

        # 使用缓存
        return Result.success(data=cache_data)

    def put(self, cache_data: DataFrame, new_data: DataFrame,
            extra_params: dict, args: tuple, kwargs: dict) -> ResultAny[DataFrame]:
        # 直接覆盖缓存
        return Result.success(data=new_data)

    def extract(self, merged_data: DataFrame, extra_params: dict,
               args: tuple, kwargs: dict) -> ResultAny[DataFrame]:
        # 直接返回所有数据
        return Result.success(data=merged_data)
```

**示例3：数据过滤和分组**

```python
class FilteredDataLoader(DataFrameValueLoader):
    """过滤加载器：根据参数过滤返回数据"""

    def get(self, cache_data: DataFrame, extra_params: dict,
            args: tuple, kwargs: dict) -> Result[DataFrame | dict]:
        # 使用 DefaultDataFrameValueLoader 的逻辑
        last_args = tuple(extra_params.get("args", []))
        last_kwargs = extra_params.get("kwargs", {})
        if last_args == args and last_kwargs == kwargs:
            return Result.success(data=cache_data)
        return Result.client_error("参数已变化")

    def put(self, cache_data: DataFrame, new_data: DataFrame,
            extra_params: dict, args: tuple, kwargs: dict) -> ResultAny[DataFrame]:
        # 合并并去重
        merged = pd.concat([cache_data, new_data], ignore_index=True)
        return Result.success(data=merged.drop_duplicates())

    def extract(self, merged_data: DataFrame, extra_params: dict,
               args: tuple, kwargs: dict) -> ResultAny[DataFrame]:
        # 根据参数过滤
        symbol = kwargs.get('symbol')
        if symbol:
            return Result.success(data=merged_data[merged_data['symbol'] == symbol])

        # 根据 args 过滤
        if args and len(args) > 0:
            symbol = args[0]
            return Result.success(data=merged_data[merged_data['symbol'] == symbol])

        # 返回所有数据
        return Result.success(data=merged_data)
```

#### DefaultDataFrameValueLoader

默认的值加载器，提供简单的缓存逻辑：

- **GET**：比较 `extra_params` 中的旧参数与当前参数
  - 相同：返回 `Result.success(data=cache_data)`
  - 不同：返回 `Result.client_error(NeedsRefreshException(...))`
- **PUT**：直接返回 `new_data`（不合并，完全覆盖缓存）
- **EXTRACT**：直接返回 `merged_data`（不做任何处理）

适用于不需要复杂合并逻辑、直接覆盖缓存的简单场景。

```python
# 使用默认加载器
from pynomad import df_memcached

@df_memcached()  # 默认使用 DefaultDataFrameValueLoader
def get_data(symbol: str) -> DataFrame:
    return DataFrame({"symbol": [symbol], "price": [100.0]})
```

#### Key 生成规则 - 参数参与缓存键的最佳实践

在使用 DataFrame 缓存（特别是三阶段处理）时，合理设计缓存键是提高缓存命中率的关键。

**基本概念**：

缓存键由以下几部分组成：
- **模块名**：函数所在模块
- **类名**：如果是类方法
- **函数名**：被装饰的函数名称
- **参数**：位置参数和关键字参数

**参数参与原则**：

✅ **建议参与 Key 生成的参数**：
- **唯一标识符**：如股票代码、用户 ID、订单号等
- **数据类型/分类**：如复权类型、数据频率等
- **影响数据内容的关键参数**：直接决定返回数据的参数

❌ **不建议参与 Key 生成的参数**：
- **过滤条件**：如时间范围（`start_date`, `end_date`）、分页参数等
- **排序参数**：如 `sort_by`, `order` 等
- **格式参数**：如返回格式、字段选择等
- **临时参数**：如日志标记、调试参数等

**示例：股票数据缓存**

```python
from pynomad import df_memcached
from pynomad.cache.decorator.keygenerator import ConfigableKeygenerator
from pandas import DataFrame

# ❌ 错误示例：所有参数都参与 Key 生成
@df_memcached()
def get_stock_data(code: str, start_date: str, end_date: str, fq: str) -> DataFrame:
    """
    问题：start_date 和 end_date 参与了 Key 生成
    - get_stock_data("AAPL", "2024-01-01", "2024-12-31", "qfq") 生成键 A
    - get_stock_data("AAPL", "2024-01-01", "2024-06-30", "qfq") 生成键 B
    结果：即使 AAPL 的完整数据已缓存，不同日期范围也会重新获取
    """
    # 实际实现会获取完整历史数据
    return DataFrame({"code": [code], "date": ["2024-01-01"], "price": [100.0]})

# ✅ 正确示例：使用 exclude_params 排除过滤条件
@df_memcached(
    keygenerator=ConfigableKeygenerator(
        include_module=True,
        include_function=True,
        include_args=True,
        include_kwargs=True,
        exclude_params={"start_date", "end_date"}  # 排除时间范围参数
    )
)
def get_stock_data(code: str, start_date: str, end_date: str, fq: str) -> DataFrame:
    """
    优点：只有 code 和 fq 参与 Key 生成
    - get_stock_data("AAPL", "2024-01-01", "2024-12-31", "qfq") 生成键 A
    - get_stock_data("AAPL", "2024-01-01", "2024-06-30", "qfq") 生成键 A（相同！）
    结果：首次调用获取完整数据并缓存，后续调用从缓存中过滤返回
    """
    # 获取完整历史数据
    full_data = fetch_full_history(code, fq)
    # 在 EXTRACT 阶段过滤日期范围
    return full_data

# ✅ 正确示例：使用 include_params 只包含必要参数
@df_memcached(
    keygenerator=ConfigableKeygenerator(
        include_params={"code", "fq"}  # 只包含 code 和 fq
    )
)
def get_stock_data(code: str, start_date: str, end_date: str, fq: str) -> DataFrame:
    """与上面效果相同，更明确"""
    return fetch_full_history(code, fq)
```

**配合 ValueLoader 实现过滤**：

```python
from pynomad.cache.dataframe.types import DataFrameValueLoader
from pandas import DataFrame
from pynomad.result.result import Result, ResultAny
from datetime import datetime

class StockDataLoader(DataFrameValueLoader):
    """股票数据加载器：缓存完整数据，按需过滤返回"""

    def get(self, cache_data: DataFrame, extra_params: dict,
            args: tuple, kwargs: dict) -> Result[DataFrame | dict]:
        # 简单返回缓存，不过滤（过滤在 extract 阶段）
        return Result.success(data=cache_data)

    def put(self, cache_data: DataFrame, new_data: DataFrame,
            extra_params: dict, args: tuple, kwargs: dict) -> ResultAny[DataFrame]:
        # 返回新数据（假设总是获取完整数据）
        return Result.success(data=new_data)

    def extract(self, merged_data: DataFrame, extra_params: dict,
               args: tuple, kwargs: dict) -> ResultAny[DataFrame]:
        # 根据参数过滤返回数据
        start_date = kwargs.get('start_date')
        end_date = kwargs.get('end_date')

        filtered_data = merged_data

        # 应用日期范围过滤
        if start_date:
            filtered_data = filtered_data[filtered_data['date'] >= start_date]
        if end_date:
            filtered_data = filtered_data[filtered_data['date'] <= end_date]

        return Result.success(data=filtered_data)

# 使用自定义加载器
@df_memcached(
    keygenerator=ConfigableKeygenerator(
        exclude_params={"start_date", "end_date"}  # 排除过滤参数
    ),
    valueloader=StockDataLoader()
)
def get_stock_data(code: str, start_date: str, end_date: str, fq: str) -> DataFrame:
    """
    工作流程：
    1. 首次调用 get_stock_data("AAPL", "2024-01-01", "2024-03-31", "qfq")
       - 生成 Key：包含 code="AAPL", fq="qfq"
       - 获取完整历史数据并缓存

    2. 二次调用 get_stock_data("AAPL", "2024-04-01", "2024-06-30", "qfq")
       - 生成 Key：相同（因为 start_date, end_date 不参与）
       - 从缓存读取完整数据
       - 在 extract 阶段过滤返回 2024-04-01 到 2024-06-30 的数据

    3. 三次调用 get_stock_data("TSLA", "2024-01-01", "2024-12-31", "qfq")
       - 生成 Key：不同（code="TSLA"）
       - 获取 TSLA 的完整数据并缓存
    """
    return fetch_full_history(code, fq)
```

**常见场景的 Key 生成策略**：

| 场景 | 参与参数 | 不参与参数 | 原因 |
|------|---------|-----------|------|
| 股票历史数据 | `code`, `fq` | `start_date`, `end_date` | 日期范围是过滤条件，不影响数据获取 |
| 用户列表查询 | `role`, `status` | `page`, `page_size` | 分页参数只影响返回数量 |
| 订单查询 | `user_id` | `start_time`, `end_time` | 时间范围是过滤条件 |
| 统计数据 | `metric`, `period` | `fields` | 字段选择只影响返回列 |
| 配置获取 | `config_key` | `version` | 版本号可作为 metadata 存储 |

**ConfigableKeygenerator 参数说明**：

```python
ConfigableKeygenerator(
    include_module=True,      # 是否包含模块名
    include_class=True,       # 是否包含类名
    include_function=True,     # 是否包含函数名
    include_args=True,        # 是否包含位置参数
    include_kwargs=True,      # 是否包含关键字参数
    show_args_names=True,     # 是否显示位置参数名
    include_params=None,      # 只包含这些参数（白名单）
    exclude_params=None,      # 排除这些参数（黑名单）
    hash_params=False,        # 是否对参数进行哈希
    hash_key=False,          # 是否对整个键进行哈希
    hash_algorithm="md5",    # 哈希算法：md5/sha1/sha256/sha512
    hash_truncate=16,        # 哈希值截断长度
    separator=":",           # 分隔符
)
```

**推荐的 Key 生成器配置**：

```python
from pynomad.cache.decorator.keygenerator import (
    default_key_generator,           # 完整信息（模块+类+函数+参数）
    hashed_params_key_generator,     # 参数哈希（减少键长度）
    compact_key_generator,           # 紧凑模式（只函数名+哈希参数）
    module_function_key_generator,   # 不含参数（共享缓存）
)
```

选择建议：
- 大多数场景：`hashed_params_key_generator`（平衡可读性和性能）
- 需要调试：`default_key_generator`（完整信息）
- 极高并发：`compact_key_generator`（最短键）
- 全局共享：`module_function_key_generator`（无参数影响）


---

### Memory DataFrame Cache - DataFrame 内存缓存 (@df_memcached)

```python
from pynomad import df_memcached, get_logger
from pandas import DataFrame

logger = get_logger(__name__)

@df_memcached
def get_stock_data(symbol: str) -> DataFrame:
    logger.info(f"获取股票数据: {symbol}")
    return DataFrame({"symbol": [symbol], "price": [100.0]})

# 第一次调用 - 缓存未命中
df1 = get_stock_data("AAPL")  # 输出: 获取股票数据: AAPL

# 第二次调用 - 缓存命中
df2 = get_stock_data("AAPL")  # 无输出，从缓存读取
```

#### 装饰器参数

```python
@df_memcached(
    name="custom_df_cache",
    maxsize=100,
    ttl=60,
    valueloader=CustomDataFrameValueLoader()
)
def load_dataframe(symbol: str) -> DataFrame:
    return DataFrame({"symbol": [symbol]})
```

#### 配置说明

```toml
[cache.dfmemory]
name = "df_memory_cache"  # 缓存名称
ttl = 3                     # 默认过期时间（秒）
```

---

### Pickle DataFrame Cache - DataFrame Pickle 缓存 (@df_pickled)

```python
from pynomad import df_pickled, get_logger
from pandas import DataFrame

logger = get_logger(__name__)

@df_pickled
def load_dataframe(path: str) -> DataFrame:
    logger.info(f"加载数据: {path}")
    return DataFrame({"path": [path], "data": [1, 2, 3]})

# 缓存到文件
df1 = load_dataframe("data.csv")  # 执行加载
df2 = load_dataframe("data.csv")  # 从缓存读取
```

#### 装饰器参数

```python
@df_pickled(
    name="custom_df_pickle",
    cache_dir="{workspace}/data/custom_df_cache",
    enable_encryption=False,
    ttl=3600
)
def process_dataframe(id: str) -> DataFrame:
    return DataFrame({"id": [id]})
```

#### 配置说明

```toml
[cache.dfpickle]
cache_dir = "{workspace}/data/cache/df_pickle"  # 缓存目录
enable_encryption = true                          # 是否启用加密
salt = "pynomad_pickle_cache_v1"               # 加密盐值
```

---

### Redis DataFrame Cache - DataFrame Redis 缓存 (@df_rediscached)

```python
from pynomad import df_rediscached, get_logger
from pandas import DataFrame

logger = get_logger(__name__)

@df_rediscached
def fetch_market_data(symbol: str) -> DataFrame:
    logger.info(f"获取市场数据: {symbol}")
    return DataFrame({"symbol": [symbol], "price": [150.0]})

# DataFrame 缓存到 Redis
df1 = fetch_market_data("BTC")  # 执行获取
df2 = fetch_market_data("BTC")  # 从 Redis 缓存读取
```

#### 装饰器参数

```python
@df_rediscached(
    name="custom_df_redis",
    host="localhost",
    port=6379,
    db=14,
    ttl=120,
    maxsize=200
)
def fetch_dataframe(key: str) -> DataFrame:
    return DataFrame({"key": [key]})
```

#### 配置说明

```toml
[cache.dfredis]
host = "localhost"  # Redis 服务器地址
port = 6379         # Redis 端口
db = 15             # Redis 数据库编号
```

---

### Multi-Level Cache - 多级缓存 (@multi_cached)

多级缓存组合器支持组合多个缓存装饰器，实现多级缓存策略。

#### 读穿透和写穿透

**读穿透**：
- 按顺序从 L1 -> L2 -> L3 查找缓存
- 命中后回填到上层缓存

**写穿透**：
- 所有缓存都未命中时执行函数
- 将结果写入所有层级

#### 基本使用

```python
from pynomad import multi_cached, memcached, pickled, get_logger

logger = get_logger(__name__)

@multi_cached(
    memcached("l1", ttl=10),   # L1: 内存缓存，10秒
    pickled("l2", ttl=60)       # L2: 文件缓存，60秒
)
def expensive_computation(n: int) -> int:
    logger.info(f"计算斐波那契数列第 {n} 项")
    # 复杂计算...
    return n

# 查找顺序：L1 -> L2 -> 执行函数 -> 写入 L1 和 L2
expensive_computation(10)
```

#### DataFrame 多级缓存

```python
from pynomad import multi_cached, df_memcached, df_pickled, df_rediscached
from pandas import DataFrame

@multi_cached(
    df_memcached("df_l1", ttl=10),    # L1: DataFrame 内存缓存
    df_pickled("df_l2", ttl=60),       # L2: DataFrame 文件缓存
    df_rediscached("df_l3", ttl=300)   # L3: DataFrame Redis 缓存
)
def get_multi_level_df(symbol: str) -> DataFrame:
    # 获取 DataFrame 数据
    return DataFrame({"symbol": [symbol], "price": [100.0]})
```

#### 混合缓存组合

可以混合使用通用缓存和 DataFrame 缓存：

```python
@multi_cached(
    memcached("hybrid_l1", ttl=10),       # L1: 通用内存缓存
    df_rediscached("hybrid_l2", ttl=300)  # L2: DataFrame Redis 缓存
)
def get_data(id: str) -> DataFrame:
    return DataFrame({"id": [id]})
```

#### 多级缓存管理

```python
# 获取多级缓存实例
multi_cache = expensive_computation.__multi_cache__

# 清空所有层级缓存
multi_cache.clear()

# 清空指定层级缓存
multi_cache.clear_decorator(index=1)

# 获取统计信息
stats = multi_cache.get_stats()
print(f"L1 命中次数: {stats[0]['hits']}")
print(f"L2 命中次数: {stats[1]['hits']}")

# 删除指定键
multi_cache.evict(key="cache_key")

# 获取装饰器数量
print(f"缓存层级数量: {multi_cache.count}")
```

#### TTL 优先级规则

1. **装饰器传入参数** > 配置文件默认值
2. **已存在的缓存键不受 TTL 变更影响**：
   - 如果某个键已经缓存，之后修改装饰器参数或更新配置文件的 ttl，不会影响已存在的键的过期时间
   - 只有在缓存过期后被删除，下次调用时才会使用新的 ttl
3. **TTL 计算时机**：在 put() 时设置，过期后自然失效

---

### 缓存装饰器总结

| 装饰器 | 类型 | 用途 | 持久化 | 适用场景 |
|---------|------|------|---------|---------|
| `memcached` | 通用 | 内存 | 否 | 短期缓存、频繁访问 |
| `pickled` | 通用 | 文件 | 是 | 中期缓存、需要持久化 |
| `rediscached` | 通用 | Redis | 是（Redis持久化）| 分布式缓存、多进程 |
| `sqlcached` | DataFrame | 数据库 | 是 | DataFrame 长期缓存、大容量 |
| `df_memcached` | DataFrame | 内存 | 否 | DataFrame 临时缓存 |
| `df_pickled` | DataFrame | 文件 | 是 | DataFrame 持久化缓存 |
| `df_rediscached` | DataFrame | Redis | 是（Redis持久化）| DataFrame 分布式缓存 |
| `multi_cached` | 组合器 | 多级 | 取决于组合 | 多级缓存策略 |

---

### 最佳实践

1. **根据数据特点选择缓存**：
   - 频繁访问、数据量小：内存缓存 (`memcached`, `df_memcached`)
   - 需要持久化：Pickle 缓存 (`pickled`)、DataFrame SQL 缓存 (`sqlcached`)
   - 分布式场景：Redis 缓存 (`rediscached`, `df_rediscached`)
   - DataFrame 数据：使用 df_ 系列缓存或 `sqlcached`

2. **DataFrame 缓存选择策略**：

   **基础缓存器（推荐用于简单场景）**：
   - 如果返回类型是普通对象（`dict`、`list`、`str` 等）：使用基础缓存器
   - 如果返回类型是 `DataFrame` 但不需要复杂处理：使用基础缓存器
   - 如果返回类型是 `Result[DataFrame]` 但不需要复杂处理：使用基础缓存器

   **DataFrame 缓存器（仅在需要复杂处理时使用）**：
   - 需要增量更新缓存（追加、去重、更新数据）
   - 需要自定义缓存命中逻辑（过期检查、参数匹配等）
   - 需要从合并数据中提取部分返回数据（过滤、排序、分组等）
   - 需要三阶段处理（GET、PUT、EXTRACT）来实现复杂业务逻辑

   **选择示例**：
   ```python
   # ✅ 推荐：简单场景使用基础缓存器
   @memcached
   def get_simple_data() -> DataFrame:
       return DataFrame({"key": [1, 2, 3]})

   @pickled
   def fetch_from_api() -> Result[DataFrame]:
       return Result.success(DataFrame({"data": [1, 2, 3]}))

   # ✅ 仅在需要复杂处理时使用 DataFrame 缓存器
   @df_memcached(valueloader=IncrementalDataLoader())
   def get_incremental_data(symbol: str) -> DataFrame:
       # 需要增量更新、去重等复杂处理
       return DataFrame({"symbol": [symbol], "price": [100.0]})
   ```

3. **多级缓存组合策略**：

   **推荐组合模式**：
   - DataFrame 作为第一级（L1）：使用 df_ 系列缓存器
   - 其他级别（L2、L3）：使用基础缓存器

   **原因**：
   - df_ 系列缓存器提供 DataFrame 特定的三阶段处理
   - 基础缓存器提供通用的序列化/反序列化能力
   - 这种组合既利用了 DataFrame 专用处理，又保持了良好的性能

   **组合示例**：
   ```python
   # ✅ 推荐：DataFrame 在第一级，其他级别使用基础缓存器
   @multi_cached(
       df_memcached("df_l1", ttl=10),   # L1: DataFrame 内存缓存（处理三阶段逻辑）
       pickled("l2", ttl=60),            # L2: 基础文件缓存
       rediscached("l3", ttl=300)        # L3: 基础 Redis 缓存
   )
   def get_data(symbol: str) -> DataFrame:
       return DataFrame({"symbol": [symbol]})

   # ❌ 不推荐：所有级别都使用 DataFrame 缓存器
   @multi_cached(
       df_memcached("df_l1", ttl=10),
       df_pickled("df_l2", ttl=60),     # 冗余：L2 不需要 DataFrame 专用处理
       df_rediscached("df_l3", ttl=300)  # 冗余：L3 不需要 DataFrame 专用处理
   )
   def get_data(symbol: str) -> DataFrame:
       return DataFrame({"symbol": [symbol]})
   ```

4. **合理设置 TTL**：
   - 变化频繁的数据：短 TTL（如 10-60 秒）
   - 相对稳定的数据：长 TTL（如 300-3600 秒）

5. **使用多级缓存**：
   - L1 使用内存缓存，提供最快访问
   - L2 使用 Redis/Pickle，提供持久化
   - L3 使用 SQL，提供长期存储

6. **自定义 ValueLoader**：
   - 需要增量更新：自定义 PUT 阶段
   - 需要过滤返回：自定义 EXTRACT 阶段
   - 需要复杂命中逻辑：自定义 GET 阶段

### 缓存配置管理

```python
from pynomad import cp

# 查看缓存属性配置
memory_props = cp.MemoryCacheProperties()
print(memory_props)

redis_props = cp.RedisCacheProperties()
print(redis_props)

df_pickle_props = cp.DataFramePickleCacheProperties()
print(df_pickle_props)
```

---

## 变更日志

### 0.1.8 (2026-03-16)

**Features**
- DataFrame 缓存装饰器参数属性支持默认值填充（args, only_args, kwargs, full_kwargs）
- 添加 ReflashParams.get_full_kwargs 方法，支持参数到参数名的映射
- 添加 create_context 方法封装 DataFrameLoaderContext 创建逻辑
- 添加 _determ_should_wrap_by_explicit_return_type 方法，根据显式返回类型判断是否包装
- DataFrame 缓存装饰器添加 trace 级别日志，记录缓存命中、GET/PUT/EXTRACT 阶段

**Miscellaneous**
- 优化参数比较逻辑，确保未传递参数使用默认值填充

### 0.1.7 (2026-03-15)

**Bugfixes**
- 修复 SQL cache decorator 的 extra_params 未正确传递到 decorator_extra_params 的问题

### 0.1.6 (2026-03-14)

**Features**
- 添加 logs_dir 配置支持，允许自定义日志目录
- dataframe 缓存支持双前缀
- autoconfig 添加 autowarid 标志

**Miscellaneous**
- 添加 towncrier 配置和项目元数据到 PyPI

---

## License

MIT License

---

## 贡献指南

欢迎使用本框架。不欢迎 Issue。

作者现在在黑厂打黑工，996 都算福报，实际是 777 工作制（早 7 点干到晚 7 点，一周 7 天），时间和精力都非常有限，根本没精力处理问题。如果你在使用中发现 bug 或有问题，建议自行下载源码修改。

