Metadata-Version: 2.4
Name: infomankit
Version: 0.3.29
Summary: Infoman base library - A comprehensive toolkit for modern Python applications with dual ORM support
Project-URL: Homepage, https://github.com/infoman-lib/infoman-pykit
Project-URL: Documentation, https://github.com/infoman-lib/infoman-pykit
Project-URL: Repository, https://github.com/infoman-lib/infoman-pykit.git
Project-URL: Issues, https://github.com/infoman-lib/infoman-pykit/issues
Project-URL: Changelog, https://github.com/infoman-lib/infoman-pykit/blob/main/CHANGELOG.md
Author-email: Louis <louishwh@gmail.com>
Maintainer-email: Louis <louishwh@gmail.com>
License: MIT
Keywords: async,database,fastapi,llm,toolkit
Classifier: Development Status :: 4 - Beta
Classifier: Framework :: AsyncIO
Classifier: Framework :: FastAPI
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Topic :: Internet :: WWW/HTTP :: HTTP Servers
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Requires-Python: >=3.11
Requires-Dist: aiohttp~=3.13.2
Requires-Dist: click~=8.1.0
Requires-Dist: cryptography~=46.0.3
Requires-Dist: dotenv==0.9.9
Requires-Dist: httpx~=0.28.1
Requires-Dist: loguru~=0.7.0
Requires-Dist: psutil~=7.1.3
Requires-Dist: pydantic-settings>=2.12.0
Requires-Dist: pydantic~=2.12.0
Requires-Dist: pyjwt~=2.10.0
Requires-Dist: pyyaml~=6.0.2
Provides-Extra: cache
Requires-Dist: fastapi-cache2~=0.2.2; extra == 'cache'
Requires-Dist: redis[hiredis]~=7.1.0; extra == 'cache'
Provides-Extra: database
Requires-Dist: aiosqlite~=0.22.0; extra == 'database'
Requires-Dist: asyncmy~=0.2.10; extra == 'database'
Requires-Dist: asyncpg~=0.31.0; extra == 'database'
Requires-Dist: tortoise-orm~=0.25.2; extra == 'database'
Provides-Extra: database-pro
Requires-Dist: aiosqlite~=0.22.0; extra == 'database-pro'
Requires-Dist: alembic~=1.14.0; extra == 'database-pro'
Requires-Dist: asyncmy~=0.2.10; extra == 'database-pro'
Requires-Dist: asyncpg~=0.31.0; extra == 'database-pro'
Requires-Dist: sqlalchemy[asyncio]~=2.0.36; extra == 'database-pro'
Provides-Extra: dev
Requires-Dist: mypy>=1.8.0; extra == 'dev'
Requires-Dist: pre-commit>=3.6.0; extra == 'dev'
Requires-Dist: pytest-asyncio>=0.23.0; extra == 'dev'
Requires-Dist: pytest-cov>=4.0.0; extra == 'dev'
Requires-Dist: pytest>=8.0.0; extra == 'dev'
Requires-Dist: ruff>=0.14.10; extra == 'dev'
Provides-Extra: docs
Requires-Dist: mkdocs-material~=9.0.0; extra == 'docs'
Requires-Dist: mkdocstrings[python]>=0.24.0; extra == 'docs'
Requires-Dist: mkdocs~=1.6.1; extra == 'docs'
Provides-Extra: full
Requires-Dist: aiofiles~=25.1.0; extra == 'full'
Requires-Dist: aiosqlite~=0.22.0; extra == 'full'
Requires-Dist: apscheduler~=3.10.0; extra == 'full'
Requires-Dist: asyncmy~=0.2.10; extra == 'full'
Requires-Dist: asyncpg~=0.31.0; extra == 'full'
Requires-Dist: fastapi-cache2~=0.2.2; extra == 'full'
Requires-Dist: fastapi~=0.127.0; extra == 'full'
Requires-Dist: granian~=2.6.0; extra == 'full'
Requires-Dist: jinja2~=3.1.6; extra == 'full'
Requires-Dist: litellm~=1.75.0; extra == 'full'
Requires-Dist: nats-py~=2.10.0; extra == 'full'
Requires-Dist: orjson~=3.11.5; extra == 'full'
Requires-Dist: prometheus-client~=0.23.1; extra == 'full'
Requires-Dist: prometheus-fastapi-instrumentator~=7.1.0; extra == 'full'
Requires-Dist: python-multipart>=0.0.20; extra == 'full'
Requires-Dist: qdrant-client~=1.16.2; extra == 'full'
Requires-Dist: redis[hiredis]~=7.1.0; extra == 'full'
Requires-Dist: tortoise-orm~=0.25.2; extra == 'full'
Provides-Extra: full-pro
Requires-Dist: aiofiles~=25.1.0; extra == 'full-pro'
Requires-Dist: aiosqlite~=0.22.0; extra == 'full-pro'
Requires-Dist: alembic~=1.14.0; extra == 'full-pro'
Requires-Dist: apscheduler~=3.10.0; extra == 'full-pro'
Requires-Dist: asyncmy~=0.2.10; extra == 'full-pro'
Requires-Dist: asyncpg~=0.31.0; extra == 'full-pro'
Requires-Dist: fastapi-cache2~=0.2.2; extra == 'full-pro'
Requires-Dist: fastapi~=0.127.0; extra == 'full-pro'
Requires-Dist: granian~=2.6.0; extra == 'full-pro'
Requires-Dist: jinja2~=3.1.6; extra == 'full-pro'
Requires-Dist: litellm~=1.75.0; extra == 'full-pro'
Requires-Dist: nats-py~=2.10.0; extra == 'full-pro'
Requires-Dist: orjson~=3.11.5; extra == 'full-pro'
Requires-Dist: prometheus-client~=0.23.1; extra == 'full-pro'
Requires-Dist: prometheus-fastapi-instrumentator~=7.1.0; extra == 'full-pro'
Requires-Dist: python-multipart>=0.0.20; extra == 'full-pro'
Requires-Dist: qdrant-client~=1.16.2; extra == 'full-pro'
Requires-Dist: redis[hiredis]~=7.1.0; extra == 'full-pro'
Requires-Dist: sqlalchemy[asyncio]~=2.0.36; extra == 'full-pro'
Provides-Extra: llm
Requires-Dist: litellm~=1.75.0; extra == 'llm'
Provides-Extra: messaging
Requires-Dist: nats-py~=2.10.0; extra == 'messaging'
Provides-Extra: scheduler
Requires-Dist: apscheduler~=3.10.0; extra == 'scheduler'
Provides-Extra: test
Requires-Dist: factory-boy>=3.3.0; extra == 'test'
Requires-Dist: httpx>=0.25.0; extra == 'test'
Requires-Dist: mypy>=1.8.0; extra == 'test'
Requires-Dist: pre-commit>=3.6.0; extra == 'test'
Requires-Dist: pytest-asyncio>=0.23.0; extra == 'test'
Requires-Dist: pytest-benchmark>=4.0.0; extra == 'test'
Requires-Dist: pytest-cov>=4.0.0; extra == 'test'
Requires-Dist: pytest>=8.0.0; extra == 'test'
Requires-Dist: ruff>=0.14.10; extra == 'test'
Provides-Extra: vector
Requires-Dist: qdrant-client~=1.16.2; extra == 'vector'
Provides-Extra: web
Requires-Dist: aiofiles~=25.1.0; extra == 'web'
Requires-Dist: fastapi~=0.127.0; extra == 'web'
Requires-Dist: granian~=2.6.0; extra == 'web'
Requires-Dist: jinja2~=3.1.6; extra == 'web'
Requires-Dist: orjson~=3.11.5; extra == 'web'
Requires-Dist: prometheus-client~=0.23.1; extra == 'web'
Requires-Dist: prometheus-fastapi-instrumentator~=7.1.0; extra == 'web'
Requires-Dist: python-multipart>=0.0.20; extra == 'web'
Provides-Extra: web-basic
Requires-Dist: aiofiles~=25.1.0; extra == 'web-basic'
Requires-Dist: aiosqlite~=0.22.0; extra == 'web-basic'
Requires-Dist: asyncmy~=0.2.10; extra == 'web-basic'
Requires-Dist: asyncpg~=0.31.0; extra == 'web-basic'
Requires-Dist: fastapi~=0.127.0; extra == 'web-basic'
Requires-Dist: granian~=2.6.0; extra == 'web-basic'
Requires-Dist: jinja2~=3.1.6; extra == 'web-basic'
Requires-Dist: orjson~=3.11.5; extra == 'web-basic'
Requires-Dist: prometheus-client~=0.23.1; extra == 'web-basic'
Requires-Dist: prometheus-fastapi-instrumentator~=7.1.0; extra == 'web-basic'
Requires-Dist: python-multipart>=0.0.20; extra == 'web-basic'
Requires-Dist: tortoise-orm~=0.25.2; extra == 'web-basic'
Provides-Extra: web-cache
Requires-Dist: aiofiles~=25.1.0; extra == 'web-cache'
Requires-Dist: aiosqlite~=0.22.0; extra == 'web-cache'
Requires-Dist: asyncmy~=0.2.10; extra == 'web-cache'
Requires-Dist: asyncpg~=0.31.0; extra == 'web-cache'
Requires-Dist: fastapi-cache2~=0.2.2; extra == 'web-cache'
Requires-Dist: fastapi~=0.127.0; extra == 'web-cache'
Requires-Dist: granian~=2.6.0; extra == 'web-cache'
Requires-Dist: jinja2~=3.1.6; extra == 'web-cache'
Requires-Dist: orjson~=3.11.5; extra == 'web-cache'
Requires-Dist: prometheus-client~=0.23.1; extra == 'web-cache'
Requires-Dist: prometheus-fastapi-instrumentator~=7.1.0; extra == 'web-cache'
Requires-Dist: python-multipart>=0.0.20; extra == 'web-cache'
Requires-Dist: redis[hiredis]~=7.1.0; extra == 'web-cache'
Requires-Dist: tortoise-orm~=0.25.2; extra == 'web-cache'
Provides-Extra: web-full
Requires-Dist: aiofiles~=25.1.0; extra == 'web-full'
Requires-Dist: aiosqlite~=0.22.0; extra == 'web-full'
Requires-Dist: apscheduler~=3.10.0; extra == 'web-full'
Requires-Dist: asyncmy~=0.2.10; extra == 'web-full'
Requires-Dist: asyncpg~=0.31.0; extra == 'web-full'
Requires-Dist: fastapi-cache2~=0.2.2; extra == 'web-full'
Requires-Dist: fastapi~=0.127.0; extra == 'web-full'
Requires-Dist: granian~=2.6.0; extra == 'web-full'
Requires-Dist: jinja2~=3.1.6; extra == 'web-full'
Requires-Dist: nats-py~=2.10.0; extra == 'web-full'
Requires-Dist: orjson~=3.11.5; extra == 'web-full'
Requires-Dist: prometheus-client~=0.23.1; extra == 'web-full'
Requires-Dist: prometheus-fastapi-instrumentator~=7.1.0; extra == 'web-full'
Requires-Dist: python-multipart>=0.0.20; extra == 'web-full'
Requires-Dist: qdrant-client~=1.16.2; extra == 'web-full'
Requires-Dist: redis[hiredis]~=7.1.0; extra == 'web-full'
Requires-Dist: tortoise-orm~=0.25.2; extra == 'web-full'
Provides-Extra: web-service
Requires-Dist: aiofiles~=25.1.0; extra == 'web-service'
Requires-Dist: aiosqlite~=0.22.0; extra == 'web-service'
Requires-Dist: asyncmy~=0.2.10; extra == 'web-service'
Requires-Dist: asyncpg~=0.31.0; extra == 'web-service'
Requires-Dist: fastapi-cache2~=0.2.2; extra == 'web-service'
Requires-Dist: fastapi~=0.127.0; extra == 'web-service'
Requires-Dist: granian~=2.6.0; extra == 'web-service'
Requires-Dist: jinja2~=3.1.6; extra == 'web-service'
Requires-Dist: orjson~=3.11.5; extra == 'web-service'
Requires-Dist: prometheus-client~=0.23.1; extra == 'web-service'
Requires-Dist: prometheus-fastapi-instrumentator~=7.1.0; extra == 'web-service'
Requires-Dist: python-multipart>=0.0.20; extra == 'web-service'
Requires-Dist: redis[hiredis]~=7.1.0; extra == 'web-service'
Requires-Dist: tortoise-orm~=0.25.2; extra == 'web-service'
Description-Content-Type: text/markdown

# infomankit

> 现代化 Python/AI 服务脚手架与工具箱。封装了配置加载、日志、FastAPI 服务、LLM 调用、缓存、消息队列、加解密等常用能力，帮助你快速把 idea 变成可部署的生产级服务。

## 特性亮点
- **统一配置体系**：`.env` + `config.py` 支持多环境加载，覆盖应用、数据库、缓存、LLM、MQ、向量库等关键参数。
- **FastAPI 微服务基线**：开箱即可运行的 `infoman.service.app`，内置 CORS、GZip、链路日志、中英文错误码、请求 ID、健康/监控接口。
- **灵活的 ORM 选择**：支持 `Tortoise ORM`（简单易用）和 `SQLAlchemy 2.0`（强大性能），可单独或同时使用。
- **异步基础设施**：MySQL/PostgreSQL、Redis 缓存、Litellm、NATS、Qdrant/Milvus 的集成入口，易于按需扩展。
- **AI/LLM 辅助**：`infoman.llm.LLM` 提供问答、对话、流式输出、翻译、总结、代码审查等常用封装。
- **性能测试工具**：内置标准化性能测试模块，支持定制化接口测试、精美 HTML 报告生成、多种接口类型评估标准。
- **实用工具集**：日志系统、缓存/重试/计时装饰器、AES/RSA、异步 HTTP、文本结构化提取、Feishu Bot 等常用基建。
- **细粒度模块化**：可单独安装 `web`、`database`、`database-alchemy`、`llm`、`vector`、`messaging` 等 extra，仅引入所需依赖。

## 目录速览
```
infoman/
├── config/            # 环境变量加载与全局配置
├── llm/               # Litellm 包装，提供 Chat/Stream/API
├── performance/       # 性能测试模块（新增）
│   ├── config.py      # 测试配置管理
│   ├── runner.py      # 测试运行器
│   ├── reporter.py    # HTML 报告生成
│   ├── standards.py   # 性能标准定义
│   └── cli.py         # 命令行工具
├── service/
│   ├── app.py         # FastAPI Application 入口
│   ├── routers/       # 健康检查与监控 API
│   ├── core/          # 事件、响应、认证
│   ├── infrastructure/  # 数据库，消息队列
│   ├── exception/     # 错误码、异常处理
│   ├── middleware/    # Logging、RequestID、RateLimit、中间件基类
│   ├── models/        # Tortoise 模型基类 & Embedding 配置
│   └── utils/         # redis 缓存装饰器、解析/转换
└── utils/
    ├── log/           # Loguru 配置与上下文
    ├── decorators/    # cache、retry、timing 等装饰器
    ├── encryption/    # AES/RSA/ECC
    ├── http/          # aiohttp 客户端、请求信息提取
    ├── notification/  # 飞书机器人通知
    └── text/          # JSON 结构提取等
```

## 快速开始

### 一键创建项目

```bash
# 安装 infomankit
pip install -U infomankit

# 创建新项目（自动生成标准目录结构）
infomancli init my-awesome-project

# 进入项目
cd my-awesome-project

# 安装依赖并运行
pip install -e .
cp .env.example .env
infoman-serve run main:app --reload
```

访问 http://localhost:8000 查看运行效果！

### 手动安装

```bash
# Python >= 3.11
pip install -U infomankit

# 基础 Web 服务
pip install -U "infomankit[web]"

# 完整功能（使用 Tortoise ORM，100% 向前兼容）
pip install -U "infomankit[full]"

# 完整功能增强版（同时支持 Tortoise + SQLAlchemy）
pip install -U "infomankit[full-enhanced]"
```

常用 extra 组合：

| Extra          | 说明                             |
|----------------|--------------------------------|
| `web`          | FastAPI/Granian/orjson         |
| `database`     | Tortoise ORM（默认）             |
| `database-pro` | SQLAlchemy 2.0（高性能）          |
| `cache`        | Redis + fastapi-cache2        |
| `llm`          | Litellm                        |
| `vector`       | Qdrant                         |
| `messaging`    | NATS                           |
| `full`         | 完整功能（使用 Tortoise）            |
| `full-pro`     | 完整功能增强版（Tortoise + SQLAlchemy） |

本地开发推荐：

```bash
git clone https://github.com/yourusername/infoman-pykit.git
cd infomankit
pip install -e ".[dev,full]"   # 安装所有依赖和 lint/test 工具
```

## 快速上手

### 1. 配置环境变量
创建 `.env.dev`，并设置 `ENV=dev` (默认 dev)。可根据 `infoman/config/config.py` 填写常用变量：

```bash
APP_NAME=Infoman Service
APP_HOST=0.0.0.0
APP_PORT=8808
MYSQL_HOST=127.0.0.1
MYSQL_DB=infoman
MYSQL_USER=root
MYSQL_PASSWORD=secret
REDIS_HOST=127.0.0.1
QDRANT_HOST=127.0.0.1
LLM_PROXY=litellm_proxy
JWT_SECRET_KEY=change-me
```

运行时会依次加载 `.env` 与 `.env.{ENV}`，缺省值可在 `config.py` 中找到。

### 2. 启动 FastAPI 服务
```bash
export ENV=dev
uvicorn infoman.service.app:application --host ${APP_HOST:-0.0.0.0} --port ${APP_PORT:-8808} --reload
# or
python -m infoman.service.launch --mode gunicorn
```

应用启动后默认提供：
- `/api/health`：健康检查，返回 `{code:200}`。
- `/api/monitor`：进程 & 系统指标、环境信息。
- 启动事件中会自动注册 MySQL、Redis 缓存、NATS、Qdrant 等（根据配置是否填写）。

### 3. 调用 LLM
```python
import asyncio
from infoman.llm import LLM

async def main():
    resp = await LLM.ask(
        model="gpt-4o-mini",
        prompt="请用一句话介绍 infoman-pykit。",
        system_prompt="You are a concise assistant."
    )
    if resp.success:
        print(resp.content, resp.total_tokens)

asyncio.run(main())
```

- `LLM.ask/chat/stream` 会自动补全 `LLM_PROXY` 前缀并返回 token 统计。
- `LLM.quick_*` 返回字符串，`LLM.translate/summarize/code_review` 内置常用 system prompt。

### 4. 使用 Redis 缓存装饰器
```python
from pydantic import BaseModel
from infoman.service.utils.cache import redis_cache

class ConfigSchema(BaseModel):
    key: str
    value: str

class ConfigService:
    @redis_cache(prefix="config", ttl=600)
    async def get_config(self, request, key: str) -> ConfigSchema:
        # request.app.state.redis_client 将被装饰器自动读取
        ...
```

返回值可以是 `BaseModel`、`list[BaseModel]` 或普通 `dict`，装饰器会自动序列化/反序列化。

### 5. 消息队列与事件路由

```python
from infoman.service.infrastructure.mq.nats import event_router


@event_router.on("topic.user.created", queue="worker")
async def handle_user_created(msg, nats_cli):
    payload = msg.data.decode()
    ...

# 启动时在 startup 事件中执行：
# await event_router.register(app.state.nats_client)
```

`NATSClient` 支持 `publish/request/subscribe/close`，并在 `events.startup` 中自动连接（配置 `NATS_SERVER` 后生效）。

## 日志与中间件
- `infoman.utils.log.logger` 基于 Loguru，自动创建多种文件（all/info/error/debug）并支持 JSON 日志、请求上下文（RequestID）。
- `LoggingMiddleware`：记录请求耗时、客户端信息；`RequestIDMiddleware` 为每次请求注入 `X-Request-ID`。
- `RateLimitMiddleware`：IP/用户/路径多策略限流，内存或 Redis 持久化。
- `BaseMiddleware` 为自定义中间件提供 session / 处理耗时写入示例。

## 统一错误与响应
- `infoman.service.exception.error` 定义系统、请求、数据库、业务、安全、外部服务等错误码枚举，可中英文提示。
- `AppException` + `handler.py` 将数据库、Pydantic、HTTP 异常统一转换为 `{code, message, details}`。
- `infoman.service.core.response.success/failed` 提供标准响应结构。

## 更多工具箱
- **装饰器**：`retry`(支持 async/sync 指数退避)、`cache`(内存缓存)、`timing`(执行耗时)。
- **加密**：AES(自动填充/随机 IV)、RSA(4096/自定义序列化)。
- **HTTP Client**：`HttpAsyncClient` 支持表单/JSON/文件上传，返回 `HttpResult`。
- **文本处理**：`utils.text.extractor.extract_json_from_string` 可从非结构化文本中提取 JSON。
- **通知**：`notification.feishu.RobotManager` 发送飞书机器人消息。
- **Embedding 配置**：`service.models.type.embed` 统一管理不同向量模型的维度/长度、集合命名。

## 配置清单速查

| 分类         | 重点变量 |
|--------------|----------|
| 应用         | `APP_NAME`, `APP_HOST`, `APP_PORT`, `APP_BASE_URI`, `APP_DEBUG` |
| 安全         | `JWT_SECRET_KEY`, `JWT_ALGORITHM`, `JWT_ACCESS_TOKEN_EXPIRE_MINUTES`, `OAUTH2_REDIRECT_URL` |
| 数据库       | `MYSQL_HOST`, `MYSQL_PORT`, `MYSQL_DB`, `MYSQL_USER`, `MYSQL_PASSWORD`, `MYSQL_TABLE_MODELS` |
| 缓存 / Redis | `REDIS_HOST`, `REDIS_PORT`, `REDIS_DB`, `REDIS_PASSWORD` |
| 向量数据库   | `QDRANT_HOST`/`API_KEY`/`HTTP_PORT`/`GRPC_PORT`、`MILVUS_HOST` 等（Milvus 需实现 `AsyncMilvusClient`） |
| MQ           | `NATS_SERVER`（逗号分隔多实例）, `NATS_NAME` |
| LLM          | `LLM_PROXY`（litellm 代理地址） |
| 日志         | `LOG_LEVEL`, `LOG_FORMAT`, `LOG_DIR`, `LOG_RETENTION`, `LOG_ENABLE_*` |

## 开发 & 测试
```bash
# Lint / 格式化
ruff check infoman
black infoman
isort infoman

# 类型检查
mypy infoman

# 测试
pytest
```

## 🔀 ORM 选择指南

从 v0.3.0 开始，infomankit 支持两种 ORM：

### Tortoise ORM（默认）
**适合**：简单 CRUD、快速开发、学习成本低
```python
from infoman.service.models.base import TimestampMixin
from tortoise import fields

class User(TimestampMixin):
    name = fields.CharField(max_length=100)

# 直接使用
user = await User.create(name="Alice")
```

### SQLAlchemy 2.0（高性能）
**适合**：复杂查询、高性能需求、工业级项目
```python
from infoman.service.models.base import AlchemyBase, AlchemyTimestampMixin
from sqlalchemy import String
from sqlalchemy.orm import Mapped, mapped_column

class User(AlchemyBase, AlchemyTimestampMixin):
    __tablename__ = "users"
    name: Mapped[str] = mapped_column(String(100))

# 使用仓储模式
from infoman.service.models.base import create_repository
user_repo = create_repository(User)
user = await user_repo.create(name="Alice")
```

**详细迁移指南**: 👉 [doc/MIGRATION_TO_SQLALCHEMY.md](./doc/MIGRATION_TO_SQLALCHEMY.md)

---

## 📊 性能测试模块

infomankit 内置了标准化的性能测试工具，支持定制化接口测试和精美的 HTML 报告生成。

### 核心特性

- **标准化评估**：内置 4 种接口类型（fast/normal/complex/heavy）的性能标准
- **定制化配置**：支持 YAML 配置文件，灵活定义测试用例
- **高并发测试**：基于 asyncio 的异步并发执行
- **详细统计**：P50/P95/P99 响应时间、吞吐量、成功率等指标
- **精美报告**：自动生成响应式 HTML 报告，色彩分级展示
- **认证支持**：Bearer Token、Basic Auth 等多种认证方式

### 快速开始

#### 1. 创建配置文件

```yaml
# performance-test.yaml
project_name: "My API"
base_url: "http://localhost:8000"

# 并发配置
concurrent_users: 50
duration: 60  # 秒

# 测试用例
test_cases:
  - name: "健康检查"
    url: "/api/health"
    method: "GET"
    interface_type: "fast"

  - name: "用户列表"
    url: "/api/v1/users"
    method: "GET"
    interface_type: "normal"
    params:
      page: 1
      page_size: 20
```

#### 2. 运行测试

```bash
# 使用 Python 代码
python -c "
import asyncio
from infoman.performance import TestConfig, PerformanceTestRunner, HTMLReporter

async def test():
    config = TestConfig.from_yaml('performance-test.yaml')
    runner = PerformanceTestRunner(config)
    results = await runner.run()

    reporter = HTMLReporter(config)
    reporter.generate(results)

asyncio.run(test())
"

# 或使用 Makefile
make perf-test
make perf-test-api
make perf-test-stress
```

#### 3. 查看报告

测试完成后会生成精美的 HTML 报告，包含：
- 汇总指标（总请求数、成功率、平均响应时间、吞吐量）
- 每个接口的详细统计
- 响应时间百分位（P50/P95/P99）
- 性能评级和优化建议
- 错误信息汇总

### 性能标准

模块内置 4 种接口类型的标准：

| 接口类型 | 优秀 | 良好 | 可接受 | 较差 |
|---------|------|------|--------|------|
| **快速接口** (fast) | <10ms | <30ms | <50ms | <100ms |
| **一般接口** (normal) | <50ms | <100ms | <200ms | <500ms |
| **复杂接口** (complex) | <100ms | <200ms | <500ms | <1s |
| **重型接口** (heavy) | <200ms | <500ms | <1s | <3s |

### 更多文档

- 完整文档：[infoman/performance/README.md](./infoman/performance/README.md)
- 配置示例：[examples/performance/](./examples/performance/)
- 高级用法：[examples/performance/advanced_example.py](./examples/performance/advanced_example.py)

---

## 🛠️ CLI 脚手架工具

infomankit 提供了 `infomancli` 命令行工具,帮助你快速生成标准化的项目结构。

### 基本用法

```bash
# 交互式创建项目
infomancli init

# 直接指定项目名
infomancli init my-project

# 在指定目录创建
infomancli init my-project --dir /path/to/workspace
```

### 生成的项目结构

生成的项目遵循 `infoman/service` 的标准架构:

```
my-project/
├── .env.example          # 环境变量模板
├── .gitignore
├── README.md
├── main.py               # FastAPI 应用入口
├── pyproject.toml
│
├── core/                 # 核心业务逻辑
│   ├── auth.py          # 认证授权
│   └── response.py      # 标准响应模型
├── routers/              # API 路由
├── models/               # 数据模型
│   ├── entity/          # 数据库实体 (ORM)
│   ├── dto/             # 数据传输对象
│   └── schemas/         # Pydantic 验证模式
├── repository/           # 数据访问层
├── services/             # 业务逻辑服务
├── exception/            # 自定义异常
├── middleware/           # 自定义中间件
├── infrastructure/       # 基础设施
│   ├── database/        # 数据库连接
│   └── cache/           # 缓存管理
└── utils/                # 工具函数
    ├── cache/
    └── parse/
```

### 快速体验

```bash
# 1. 创建项目
infomancli init demo-api

# 2. 进入并安装
cd demo-api
pip install -e .

# 3. 启动服务
cp .env.example .env
infoman-serve run main:app --reload

# 4. 访问 API 文档
open http://localhost:8000/docs
```

生成的项目包含:
- ✅ 完整的项目结构
- ✅ FastAPI 应用框架
- ✅ 环境变量配置
- ✅ 健康检查端点
- ✅ Git 配置
- ✅ 开发文档

## License
MIT License © Infoman Contributors


---

# 📖 详细指南

本项目提供了详细的使用指南，帮助你快速上手各个功能模块。

## 指南目录

1. [NATS 事件系统使用指南](#guide-events)
2. [ORM 数据库操作指南](#guide-orm)
3. [定时任务调度器使用指南](#guide-tasks)


---


<a id="guide-events"></a>
## NATS 事件系统使用指南

## 📖 简介

infomankit 提供了基于 NATS 的事件驱动系统，支持自动事件处理器发现、装饰器注册和分布式部署。适用于微服务间通信、异步任务处理和事件驱动架构。

## 🚀 快速开始

### 1. 安装依赖

```bash
pip install "infomankit[messaging]"
```

### 2. 配置 NATS

在 `.env` 或 `config/.env.{env}` 中配置：

```bash
# NATS 服务器地址（支持多个）
NATS_SERVERS=["nats://localhost:4222"]

# 应用名称（可选）
APP_NAME=my_service

# 事件处理器目录（可选，默认 app.events）
MQ_EVENT_PACKAGE=app.events
MQ_EVENT_PATH=./app/events
```

### 3. 创建事件处理器目录

```bash
mkdir -p app/events
touch app/events/__init__.py
```

### 4. 定义事件处理器

创建 `app/events/user_events.py`:

```python
from loguru import logger
from infoman.service.infrastructure.mq.nats.nats_event_router import event_router


@event_router.on("user.created")
async def on_user_created(msg, nats_cli):
    """处理用户创建事件"""
    import json

    data = json.loads(msg.data.decode())
    user_id = data.get("user_id")
    username = data.get("username")

    logger.info(f"新用户创建: {username} (ID: {user_id})")

    # 执行业务逻辑
    # - 发送欢迎邮件
    # - 初始化用户配置
    # - 发布后续事件

    # 发布后续事件
    welcome_data = json.dumps({
        "user_id": user_id,
        "type": "welcome"
    })
    await nats_cli.publish("email.send", welcome_data.encode())


@event_router.on("user.updated", queue="user-processors")
async def on_user_updated(msg, nats_cli):
    """处理用户更新事件（使用队列组实现负载均衡）"""
    import json

    data = json.loads(msg.data.decode())
    user_id = data.get("user_id")

    logger.info(f"用户更新: ID={user_id}")

    # 处理更新逻辑
```

### 5. 启动应用

```python
from fastapi import FastAPI
from infoman.service.core.lifespan import lifespan

app = FastAPI(lifespan=lifespan)

# 事件处理器会自动加载和注册
```

启动后，系统会：
1. 自动连接 NATS 服务器
2. 扫描 `app/events/` 目录
3. 注册所有事件处理器
4. 开始监听事件

### 6. 发布事件

```python
from fastapi import APIRouter, Request
import json

router = APIRouter()


@router.post("/users")
async def create_user(request: Request):
    """创建用户并发布事件"""

    # 创建用户逻辑
    user_data = {
        "user_id": 123,
        "username": "alice"
    }

    # 发布事件
    nats_client = request.app.state.nats_manager.client
    await nats_client.publish(
        "user.created",
        json.dumps(user_data).encode()
    )

    return {"status": "ok", "user": user_data}
```

## 📚 核心概念

### 事件路由器 (EventRouter)

事件路由器负责管理事件订阅和分发。

```python
from infoman.service.infrastructure.mq.nats.nats_event_router import event_router


@event_router.on(
    subject="event.topic",  # 事件主题
    queue="worker-group"    # 队列组（可选）
)
async def handler(msg, nats_cli):
    """事件处理函数

    Args:
        msg: NATS 消息对象
            - msg.data: 消息内容 (bytes)
            - msg.subject: 消息主题
            - msg.reply: 回复主题（用于 request-reply）
        nats_cli: NATS 客户端实例
    """
    pass
```

### 队列组 (Queue Group)

队列组实现负载均衡，同一队列组内的多个订阅者会平均分配消息。

```python
# 实例 A
@event_router.on("order.process", queue="order-workers")
async def process_order_a(msg, nats_cli):
    pass

# 实例 B（相同队列组）
@event_router.on("order.process", queue="order-workers")
async def process_order_b(msg, nats_cli):
    pass

# 每条消息只会被 A 或 B 其中一个处理
```

### 主题模式 (Subject Pattern)

NATS 支持通配符订阅：

```python
# 精确匹配
@event_router.on("user.created")

# 单层通配符（*）
@event_router.on("user.*")  # 匹配 user.created, user.updated 等

# 多层通配符（>）
@event_router.on("user.>")  # 匹配 user.created, user.profile.updated 等
```

## 🎯 使用场景

### 1. 微服务间通信

**用户服务发布事件：**

```python
# user_service/app/routes/users.py
@router.post("/users")
async def create_user(request: Request):
    user = await create_user_in_db(...)

    # 发布用户创建事件
    await request.app.state.nats_manager.client.publish(
        "user.created",
        json.dumps({"user_id": user.id}).encode()
    )
```

**订单服务监听事件：**

```python
# order_service/app/events/user_events.py
@event_router.on("user.created")
async def init_user_wallet(msg, nats_cli):
    """为新用户初始化钱包"""
    data = json.loads(msg.data.decode())
    await create_wallet_for_user(data["user_id"])
```

### 2. 异步任务处理

```python
# app/events/task_events.py
@event_router.on("task.heavy", queue="task-workers")
async def process_heavy_task(msg, nats_cli):
    """处理耗时任务"""
    data = json.loads(msg.data.decode())

    # 执行耗时操作
    result = await process_video(data["video_url"])

    # 发布完成事件
    await nats_cli.publish(
        "task.completed",
        json.dumps({"task_id": data["task_id"], "result": result}).encode()
    )
```

### 3. 事件链式处理

```python
# app/events/order_events.py

@event_router.on("order.created")
async def handle_order_created(msg, nats_cli):
    """订单创建 -> 扣减库存"""
    data = json.loads(msg.data.decode())

    # 扣减库存
    await reduce_inventory(data["items"])

    # 发布库存扣减成功事件
    await nats_cli.publish("inventory.reduced", msg.data)


@event_router.on("inventory.reduced")
async def handle_inventory_reduced(msg, nats_cli):
    """库存扣减 -> 创建支付订单"""
    data = json.loads(msg.data.decode())

    # 创建支付订单
    payment = await create_payment(data["order_id"])

    # 发布支付创建事件
    await nats_cli.publish(
        "payment.created",
        json.dumps({"order_id": data["order_id"], "payment_id": payment.id}).encode()
    )


@event_router.on("payment.created")
async def handle_payment_created(msg, nats_cli):
    """支付创建 -> 发送通知"""
    data = json.loads(msg.data.decode())

    # 发送支付通知
    await send_payment_notification(data["order_id"])
```

### 4. Request-Reply 模式

```python
# 请求端
@router.get("/user/{user_id}/profile")
async def get_user_profile(request: Request, user_id: int):
    """通过 NATS 请求用户信息"""
    nats_client = request.app.state.nats_manager.client

    # 发送请求并等待响应（超时 5 秒）
    response = await nats_client.request(
        "user.get",
        json.dumps({"user_id": user_id}).encode(),
        timeout=5
    )

    user_data = json.loads(response.data.decode())
    return user_data


# 响应端
@event_router.on("user.get")
async def handle_user_get(msg, nats_cli):
    """处理用户查询请求"""
    data = json.loads(msg.data.decode())
    user = await get_user_from_db(data["user_id"])

    # 回复请求
    if msg.reply:
        await nats_cli.publish(
            msg.reply,
            json.dumps(user).encode()
        )
```

## 🔧 高级用法

### 自定义事件目录

```python
# 方式1: 通过配置
# .env
MQ_EVENT_PACKAGE=myapp.custom_events
MQ_EVENT_PATH=./myapp/custom_events

# 方式2: 手动注册
from infoman.service.utils.module_loader import register_event_handlers
from infoman.service.infrastructure.mq.nats.nats_event_router import event_router


async def custom_startup(app):
    # 加载自定义事件处理器
    register_event_handlers(
        package="myapp.custom_events",
        folder="./myapp/custom_events"
    )

    # 注册到 NATS
    nats_client = app.state.nats_manager.client
    await event_router.register(nats_client=nats_client)
```

### 错误处理

```python
@event_router.on("order.process")
async def handle_order(msg, nats_cli):
    """带错误处理的事件处理器"""
    try:
        data = json.loads(msg.data.decode())
        await process_order(data)

    except json.JSONDecodeError as e:
        logger.error(f"JSON 解析失败: {e}")
        # 发布错误事件
        await nats_cli.publish(
            "order.error",
            json.dumps({"error": "invalid_json", "raw": msg.data.decode()}).encode()
        )

    except Exception as e:
        logger.error(f"订单处理失败: {e}", exc_info=True)
        # 发布失败事件用于重试
        await nats_cli.publish("order.retry", msg.data)
```

### 消息验证

```python
from pydantic import BaseModel, ValidationError


class UserCreatedEvent(BaseModel):
    user_id: int
    username: str
    email: str


@event_router.on("user.created")
async def handle_user_created(msg, nats_cli):
    """使用 Pydantic 验证消息"""
    try:
        data = json.loads(msg.data.decode())
        event = UserCreatedEvent(**data)

        # 使用验证后的数据
        logger.info(f"新用户: {event.username} ({event.email})")

    except ValidationError as e:
        logger.error(f"事件数据验证失败: {e}")
    except json.JSONDecodeError as e:
        logger.error(f"JSON 解析失败: {e}")
```

## 📊 监控和调试

### 健康检查

```python
from fastapi import APIRouter

router = APIRouter()


@router.get("/health/nats")
async def nats_health(request: Request):
    """NATS 健康检查"""
    manager = request.app.state.nats_manager

    if not manager.is_available:
        return {"status": "unhealthy", "error": "NATS 未连接"}

    return {
        "status": "healthy",
        "connected": manager.client.connected,
        "servers": manager.client.servers
    }
```

### 事件日志

```python
@event_router.on("important.event")
async def handle_important_event(msg, nats_cli):
    """记录详细的事件处理日志"""
    logger.info(
        f"收到事件",
        extra={
            "event": "important.event",
            "subject": msg.subject,
            "size": len(msg.data),
            "has_reply": msg.reply is not None
        }
    )

    # 处理逻辑...

    logger.info("事件处理完成", extra={"event": "important.event"})
```

## 🎨 最佳实践

### 1. 事件命名规范

```python
# ✅ 推荐：使用层级结构
"user.created"
"user.updated"
"user.deleted"
"order.created"
"order.paid"
"order.cancelled"

# ❌ 不推荐：扁平命名
"user_created"
"create_user"
```

### 2. 目录组织

```
app/
├── events/
│   ├── __init__.py
│   ├── user_events.py     # 用户相关事件
│   ├── order_events.py    # 订单相关事件
│   ├── payment_events.py  # 支付相关事件
│   └── _base.py           # 基类和工具（不会被加载）
```

### 3. 消息格式

```python
# ✅ 推荐：使用 JSON 并包含元数据
{
    "event_id": "evt_123",
    "timestamp": "2025-01-28T10:00:00Z",
    "source": "user-service",
    "data": {
        "user_id": 123,
        "username": "alice"
    }
}

# 发布事件
event = {
    "event_id": str(uuid.uuid4()),
    "timestamp": datetime.utcnow().isoformat(),
    "source": settings.APP_NAME,
    "data": user_data
}
await nats_cli.publish("user.created", json.dumps(event).encode())
```

### 4. 幂等性

```python
@event_router.on("payment.process")
async def process_payment(msg, nats_cli):
    """确保幂等性"""
    data = json.loads(msg.data.decode())
    payment_id = data["payment_id"]

    # 检查是否已处理
    if await is_payment_processed(payment_id):
        logger.info(f"支付 {payment_id} 已处理，跳过")
        return

    # 处理支付
    await do_process_payment(payment_id)

    # 标记已处理
    await mark_payment_processed(payment_id)
```

## 🐛 故障排查

### 问题：事件处理器未被加载

**检查清单：**
1. 确认目录结构正确：`app/events/__init__.py`
2. 文件名不要以下划线开头
3. 确认 NATS 配置正确：`NATS_SERVERS`
4. 查看启动日志中的加载信息

### 问题：事件未被触发

**检查清单：**
1. 确认 NATS 服务器运行正常
2. 检查 subject 名称是否正确
3. 确认消息已成功发布
4. 查看事件处理器是否有异常

### 问题：多实例重复处理

**解决方案：**
使用队列组实现负载均衡：

```python
@event_router.on("task.process", queue="task-workers")
async def process_task(msg, nats_cli):
    # 同一队列组内只有一个实例处理
    pass
```

## 📖 参考资料

- [NATS 官方文档](https://docs.nats.io/)
- [完整示例代码](../examples/nats_events_usage.py)
- [实战案例](../examples/complete_project/app_events_example.py)

---

## 快速参考

```python
# 基础事件处理器
@event_router.on("event.topic")
async def handler(msg, nats_cli):
    data = json.loads(msg.data.decode())
    # 处理逻辑

# 队列组（负载均衡）
@event_router.on("event.topic", queue="workers")

# 发布事件
await nats_cli.publish("event.topic", json.dumps(data).encode())

# Request-Reply
response = await nats_cli.request("service.query", data, timeout=5)

# 通配符订阅
@event_router.on("user.*")     # 单层通配符
@event_router.on("user.>")     # 多层通配符
```


---


<a id="guide-orm"></a>
## ORM 数据库操作指南

infomankit 提供了灵活的 ORM 抽象层，支持 Tortoise ORM 和 SQLAlchemy 2.0 两种后端，可无缝切换，统一的 API 设计让代码迁移零成本。

## 📋 目录

- [快速开始](#快速开始)
- [安装](#安装)
- [核心概念](#核心概念)
- [模型定义](#模型定义)
- [仓储模式](#仓储模式)
- [数据库管理](#数据库管理)
- [高级用法](#高级用法)
- [最佳实践](#最佳实践)

---

## 快速开始

### 1. 安装依赖

```bash
# Tortoise ORM (默认)
pip install infomankit[database]

# SQLAlchemy 2.0
pip install infomankit[database-alchemy]
```

### 2. 配置数据库

创建 `.env` 文件:

```bash
# 选择 ORM 后端
USE_PRO_ORM=false  # false=Tortoise, true=SQLAlchemy

# MySQL 配置
MYSQL_ENABLED=true
MYSQL_HOST=localhost
MYSQL_PORT=3306
MYSQL_DB=myapp
MYSQL_USER=root
MYSQL_PASSWORD=secret
MYSQL_MODELS=user,product  # 模型名称（逗号分隔）
```

### 3. 定义模型

创建 `infoman/models/user.py`:

```python
from infoman.service.models.base import TimestampMixin
from tortoise import fields

class User(TimestampMixin):
    """用户模型"""

    class Meta:
        table = "users"

    name = fields.CharField(max_length=100, description="用户名")
    email = fields.CharField(max_length=255, unique=True, description="邮箱")
    is_active = fields.BooleanField(default=True, description="是否激活")
```

### 4. 使用模型

```python
# 创建用户
user = await User.create(name="Alice", email="alice@example.com")

# 查询用户
user = await User.get(id=1)
users = await User.filter(is_active=True).all()

# 更新用户
user.name = "Alice Wang"
await user.save()

# 删除用户
await user.delete()
```

---

## 安装

### 基础安装

```bash
# Tortoise ORM (轻量级，推荐新项目)
pip install infomankit[database]

# SQLAlchemy 2.0 (企业级，适合复杂查询)
pip install infomankit[database-alchemy]

# 同时安装两者（支持运行时切换）
pip install infomankit[database,database-alchemy]
```

### 数据库驱动

根据使用的数据库安装对应驱动:

```bash
# MySQL
pip install asyncmy aiomysql

# PostgreSQL
pip install asyncpg

# SQLite (Python 内置，无需额外安装)
```

---

## 核心概念

### 双 ORM 支持

infomankit 支持两种 ORM 后端，可通过配置自由切换:

| 特性 | Tortoise ORM | SQLAlchemy 2.0 |
|------|-------------|----------------|
| 性能 | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ |
| 易用性 | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ |
| 功能完整性 | ⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ |
| 生态系统 | ⭐⭐⭐ | ⭐⭐⭐⭐⭐ |
| 适用场景 | 新项目、轻量级应用 | 企业级、复杂查询 |

### 抽象层设计

```
用户代码
    ↓
BaseRepository (抽象接口)
    ↓
    ├── TortoiseRepository (Tortoise 实现)
    └── SQLAlchemyRepository (SQLAlchemy 实现)
```

**优势**:
- 统一 API，切换 ORM 无需修改业务代码
- 支持多数据库连接（MySQL + PostgreSQL + SQLite）
- 自动健康检查和连接池管理

---

## 模型定义

### Tortoise ORM 模型

```python
from infoman.service.models.base import TimestampMixin
from tortoise import fields

class User(TimestampMixin):
    """
    用户模型 (Tortoise ORM)

    继承 TimestampMixin 自动获得:
    - id: 主键
    - created_at: 创建时间
    - updated_at: 更新时间
    """

    class Meta:
        table = "users"
        app = "mysql_models"  # 对应数据库连接

    # 基础字段
    name = fields.CharField(max_length=100, description="用户名")
    email = fields.CharField(max_length=255, unique=True, description="邮箱")
    age = fields.IntField(null=True, description="年龄")

    # 布尔字段
    is_active = fields.BooleanField(default=True, description="是否激活")
    is_superuser = fields.BooleanField(default=False, description="是否管理员")

    # JSON 字段
    metadata = fields.JSONField(default={}, description="元数据")

    # 文本字段
    bio = fields.TextField(null=True, description="个人简介")


class Product(TimestampMixin):
    """产品模型"""

    class Meta:
        table = "products"

    name = fields.CharField(max_length=200, description="产品名")
    price = fields.DecimalField(max_digits=10, decimal_places=2, description="价格")
    stock = fields.IntField(default=0, description="库存")

    # 外键关系
    category_id = fields.IntField(description="分类ID")
```

### SQLAlchemy 2.0 模型

```python
from infoman.service.models.base import AlchemyBase, AlchemyTimestampMixin
from sqlalchemy import String, Integer, Boolean, Text, JSON
from sqlalchemy.orm import Mapped, mapped_column

class User(AlchemyBase, AlchemyTimestampMixin):
    """
    用户模型 (SQLAlchemy 2.0)

    使用 Mapped 类型注解 (Python 3.9+)
    """

    __tablename__ = "users"

    # 必填字段
    name: Mapped[str] = mapped_column(String(100), comment="用户名")
    email: Mapped[str] = mapped_column(String(255), unique=True, comment="邮箱")

    # 可选字段
    age: Mapped[int | None] = mapped_column(Integer, comment="年龄")
    bio: Mapped[str | None] = mapped_column(Text, comment="个人简介")

    # 默认值
    is_active: Mapped[bool] = mapped_column(Boolean, default=True, comment="是否激活")

    # JSON 字段
    metadata: Mapped[dict] = mapped_column(JSON, default={}, comment="元数据")


class Product(AlchemyBase, AlchemyTimestampMixin):
    """产品模型"""

    __tablename__ = "products"

    name: Mapped[str] = mapped_column(String(200), comment="产品名")
    price: Mapped[float] = mapped_column(comment="价格")
    stock: Mapped[int] = mapped_column(default=0, comment="库存")
    category_id: Mapped[int] = mapped_column(comment="分类ID")
```

### 字段类型对照表

| 数据类型 | Tortoise ORM | SQLAlchemy 2.0 |
|---------|-------------|----------------|
| 整数 | `IntField()` | `Mapped[int]` |
| 字符串 | `CharField(max_length=N)` | `Mapped[str] = mapped_column(String(N))` |
| 文本 | `TextField()` | `Mapped[str] = mapped_column(Text)` |
| 布尔 | `BooleanField()` | `Mapped[bool]` |
| 浮点数 | `FloatField()` | `Mapped[float]` |
| 小数 | `DecimalField()` | `Mapped[Decimal]` |
| JSON | `JSONField()` | `Mapped[dict] = mapped_column(JSON)` |
| 日期时间 | `DatetimeField()` | `Mapped[datetime]` |

---

## 仓储模式

### 基础 CRUD 操作

使用仓储模式统一管理数据库操作:

```python
from infoman.service.models.base import create_repository

# 创建仓储实例（自动检测 ORM 后端）
user_repo = create_repository(User)

# 1. 创建 (Create)
user = await user_repo.create(
    name="Alice",
    email="alice@example.com",
    age=25
)
print(f"创建成功: User(id={user.id})")

# 2. 读取 (Read)
# 根据 ID 获取
user = await user_repo.get(1)

# 根据条件筛选
active_users = await user_repo.filter(is_active=True)

# 获取所有记录
all_users = await user_repo.all()

# 3. 更新 (Update)
updated_user = await user_repo.update(
    id=1,
    name="Alice Wang",
    age=26
)

# 4. 删除 (Delete)
success = await user_repo.delete(1)

# 5. 统计 (Count)
count = await user_repo.count(is_active=True)
print(f"活跃用户数: {count}")
```

### 直接使用模型（Tortoise ORM）

```python
from infoman.models.user import User

# 创建
user = await User.create(name="Bob", email="bob@example.com")

# 查询单条
user = await User.get(id=1)
user = await User.get_or_none(email="bob@example.com")

# 查询多条
users = await User.filter(is_active=True).all()
users = await User.filter(age__gte=18).order_by('-created_at').limit(10)

# 更新
user.name = "Bob Smith"
await user.save()

# 批量更新
await User.filter(is_active=False).update(is_active=True)

# 删除
await user.delete()

# 批量删除
await User.filter(age__lt=18).delete()

# 统计
count = await User.filter(is_active=True).count()

# 聚合
from tortoise.functions import Count, Sum
result = await User.annotate(count=Count('id')).values('is_active', 'count')
```

### 直接使用 Session（SQLAlchemy 2.0）

```python
from infoman.service.infrastructure.db_relation.manager_pro import get_db_session
from sqlalchemy import select, update, delete
from fastapi import Depends

@app.get("/users")
async def get_users(session = Depends(get_db_session)):
    """FastAPI 路由中使用数据库会话"""

    # 查询
    stmt = select(User).where(User.is_active == True)
    result = await session.execute(stmt)
    users = result.scalars().all()

    # 创建
    new_user = User(name="Charlie", email="charlie@example.com")
    session.add(new_user)
    await session.commit()
    await session.refresh(new_user)

    # 更新
    stmt = update(User).where(User.id == 1).values(name="Charlie Brown")
    await session.execute(stmt)
    await session.commit()

    # 删除
    stmt = delete(User).where(User.id == 1)
    await session.execute(stmt)
    await session.commit()

    return {"users": [u.name for u in users]}
```

---

## 数据库管理

### 配置多数据库

```bash
# .env 文件

# MySQL 主数据库
MYSQL_ENABLED=true
MYSQL_HOST=localhost
MYSQL_PORT=3306
MYSQL_DB=app
MYSQL_USER=root
MYSQL_PASSWORD=secret
MYSQL_MODELS=user,product

# PostgreSQL 分析数据库
POSTGRES_ENABLED=true
POSTGRES_HOST=localhost
POSTGRES_PORT=5432
POSTGRES_DB=analytics
POSTGRES_USER=postgres
POSTGRES_PASSWORD=secret
POSTGRES_MODELS=log,metric

# SQLite 缓存数据库
SQLITE_ENABLED=true
SQLITE_DB=./cache.db
SQLITE_MODELS=cache
```

### 启动数据库管理器

```python
from fastapi import FastAPI
from infoman.service.infrastructure.db_relation.manager import db_manager
# 或使用 SQLAlchemy 版本
# from infoman.service.infrastructure.db_relation.manager_pro import db_manager

# 方式 1: 使用 lifespan (推荐)
from contextlib import asynccontextmanager

@asynccontextmanager
async def lifespan(app: FastAPI):
    # 启动时初始化数据库
    await db_manager.startup(app)
    yield
    # 关闭时清理连接
    await db_manager.shutdown()

app = FastAPI(lifespan=lifespan)

# 方式 2: 使用事件钩子
@app.on_event("startup")
async def startup_event():
    await db_manager.startup(app)

@app.on_event("shutdown")
async def shutdown_event():
    await db_manager.shutdown()
```

### 健康检查

```python
from fastapi import FastAPI, Request

@app.get("/health/database")
async def check_database(request: Request):
    """数据库健康检查"""

    # 检查所有数据库
    health = await db_manager.health_check()

    # 检查指定数据库
    mysql_health = await db_manager.health_check("mysql")

    return health

# 响应示例:
# {
#   "status": "healthy",
#   "name": "database",
#   "details": {
#     "connections": {
#       "mysql": {
#         "status": "healthy",
#         "type": "mysql",
#         "database": "app",
#         "pool": {"size": 5, "free": 3}
#       }
#     },
#     "count": 1
#   }
# }
```

### 获取数据库统计

```python
@app.get("/stats/database")
async def get_database_stats():
    """获取数据库连接统计"""
    stats = await db_manager.get_stats()
    return stats
```

---

## 高级用法

### 事务处理

**Tortoise ORM:**

```python
from tortoise.transactions import in_transaction

async def transfer_money(from_user_id: int, to_user_id: int, amount: float):
    """转账操作（事务）"""
    async with in_transaction() as conn:
        # 扣款
        from_user = await User.get(id=from_user_id).using_db(conn)
        from_user.balance -= amount
        await from_user.save(using_db=conn)

        # 加款
        to_user = await User.get(id=to_user_id).using_db(conn)
        to_user.balance += amount
        await to_user.save(using_db=conn)

        # 记录日志
        await TransferLog.create(
            from_user_id=from_user_id,
            to_user_id=to_user_id,
            amount=amount,
            using_db=conn
        )
```

**SQLAlchemy 2.0:**

```python
async def transfer_money(session, from_user_id: int, to_user_id: int, amount: float):
    """转账操作（事务）"""
    try:
        # SQLAlchemy session 自动管理事务
        from_user = await session.get(User, from_user_id)
        from_user.balance -= amount

        to_user = await session.get(User, to_user_id)
        to_user.balance += amount

        log = TransferLog(
            from_user_id=from_user_id,
            to_user_id=to_user_id,
            amount=amount
        )
        session.add(log)

        await session.commit()

    except Exception:
        await session.rollback()
        raise
```

### 复杂查询

**Tortoise ORM:**

```python
from tortoise.expressions import Q, F
from tortoise.functions import Count, Avg

# Q 对象（OR 查询）
users = await User.filter(
    Q(age__gte=18) | Q(is_superuser=True)
).all()

# F 对象（字段比较）
users = await User.filter(
    age__gte=F('min_age')
).all()

# 关联查询
users = await User.filter(
    orders__total__gte=1000
).prefetch_related('orders')

# 聚合查询
stats = await User.annotate(
    order_count=Count('orders')
).filter(
    order_count__gte=10
).values('name', 'order_count')

# 分组统计
avg_age = await User.all().annotate(
    avg_age=Avg('age')
).group_by('is_active').values('is_active', 'avg_age')
```

**SQLAlchemy 2.0:**

```python
from sqlalchemy import select, func, and_, or_

# OR 查询
stmt = select(User).where(
    or_(User.age >= 18, User.is_superuser == True)
)
result = await session.execute(stmt)
users = result.scalars().all()

# 聚合查询
stmt = select(
    User.is_active,
    func.count(User.id).label('count'),
    func.avg(User.age).label('avg_age')
).group_by(User.is_active)

result = await session.execute(stmt)
stats = result.all()

# JOIN 查询
stmt = select(User, Order).join(Order).where(Order.total >= 1000)
result = await session.execute(stmt)
```

### 原生 SQL

**Tortoise ORM:**

```python
from tortoise import Tortoise

# 执行原生 SQL
conn = Tortoise.get_connection("default")
results = await conn.execute_query_dict(
    "SELECT * FROM users WHERE age > ?",
    [18]
)
```

**SQLAlchemy 2.0:**

```python
from sqlalchemy import text

# 执行原生 SQL
stmt = text("SELECT * FROM users WHERE age > :age")
result = await session.execute(stmt, {"age": 18})
rows = result.fetchall()
```

### 批量操作

```python
# 批量创建（Tortoise）
users = await User.bulk_create([
    User(name="User1", email="user1@example.com"),
    User(name="User2", email="user2@example.com"),
])

# 批量创建（SQLAlchemy）
session.add_all([
    User(name="User1", email="user1@example.com"),
    User(name="User2", email="user2@example.com"),
])
await session.commit()

# 批量更新（Tortoise）
await User.filter(is_active=False).update(is_active=True)

# 批量更新（SQLAlchemy）
stmt = update(User).where(User.is_active == False).values(is_active=True)
await session.execute(stmt)
await session.commit()
```

---

## 最佳实践

### 1. 选择合适的 ORM

```python
# ✅ 新项目、简单 CRUD：使用 Tortoise ORM
USE_PRO_ORM=false

# ✅ 企业级、复杂查询：使用 SQLAlchemy
USE_PRO_ORM=true
```

### 2. 模型设计

```python
# ✅ 推荐：清晰的字段注释
class User(TimestampMixin):
    name = fields.CharField(max_length=100, description="用户名")
    email = fields.CharField(max_length=255, unique=True, description="邮箱")

    class Meta:
        table = "users"
        indexes = [("email",)]  # 添加索引

# ❌ 不推荐：没有注释和索引
class User(TimestampMixin):
    name = fields.CharField(max_length=100)
    email = fields.CharField(max_length=255)
```

### 3. 使用仓储模式

```python
# ✅ 推荐：使用仓储模式（便于测试和切换 ORM）
user_repo = create_repository(User)
user = await user_repo.get(1)

# ⚠️  可用：直接使用模型（紧耦合到 ORM）
user = await User.get(id=1)
```

### 4. 错误处理

```python
from tortoise.exceptions import DoesNotExist, IntegrityError

async def get_user_safe(user_id: int):
    """安全的用户查询"""
    try:
        user = await User.get(id=user_id)
        return user
    except DoesNotExist:
        logger.warning(f"用户不存在: {user_id}")
        return None
    except Exception as e:
        logger.error(f"查询失败: {e}")
        raise
```

### 5. 性能优化

```python
# ✅ 推荐：使用 prefetch_related 避免 N+1 查询
users = await User.all().prefetch_related('orders')

# ❌ 不推荐：N+1 查询
users = await User.all()
for user in users:
    orders = await user.orders.all()  # 每次都查询

# ✅ 推荐：使用分页
users = await User.all().limit(20).offset(0)

# ✅ 推荐：只查询需要的字段
users = await User.all().values('id', 'name', 'email')
```

### 6. 数据库迁移

使用 Aerich（Tortoise ORM）或 Alembic（SQLAlchemy）管理数据库迁移:

```bash
# Tortoise ORM - Aerich
pip install aerich
aerich init -t infoman.config.TORTOISE_ORM
aerich init-db
aerich migrate
aerich upgrade

# SQLAlchemy - Alembic
pip install alembic
alembic init migrations
alembic revision --autogenerate -m "create users table"
alembic upgrade head
```

---

## 常见问题

### Q: 如何在 Tortoise 和 SQLAlchemy 之间切换？

A: 修改配置文件中的 `USE_PRO_ORM` 参数:

```bash
# 使用 Tortoise ORM
USE_PRO_ORM=false

# 使用 SQLAlchemy 2.0
USE_PRO_ORM=true
```

如果使用仓储模式，业务代码无需修改。

### Q: 如何连接多个数据库？

A: 在 `.env` 中启用多个数据库配置，每个模型指定对应的连接:

```python
# Tortoise ORM
class User(TimestampMixin):
    class Meta:
        app = "mysql_models"  # 使用 MySQL

class Log(TimestampMixin):
    class Meta:
        app = "postgres_models"  # 使用 PostgreSQL
```

### Q: 性能如何优化？

A: 主要优化点:
1. 使用连接池（自动配置）
2. 添加数据库索引
3. 避免 N+1 查询（使用 prefetch_related）
4. 使用批量操作
5. 只查询需要的字段

### Q: 如何进行数据库迁移？

A:
- **Tortoise ORM**: 使用 Aerich
- **SQLAlchemy**: 使用 Alembic

详见上面的"数据库迁移"章节。

---

## 参考资料

- [Tortoise ORM 官方文档](https://tortoise.github.io/)
- [SQLAlchemy 2.0 官方文档](https://docs.sqlalchemy.org/en/20/)
- [完整示例代码](../examples/sqlalchemy_example.py)
- [数据库配置详解](../infoman/config/db_relation.py)
- [模型基类源码](../infoman/service/models/base.py)


---


<a id="guide-tasks"></a>
## 定时任务调度器使用指南

infomankit 提供了完整的定时任务调度功能，基于 APScheduler 实现，支持自动任务发现、分布式锁和多种触发器。

## 📋 目录

- [快速开始](#快速开始)
- [安装](#安装)
- [基础用法](#基础用法)
- [装饰器详解](#装饰器详解)
- [分布式锁](#分布式锁)
- [任务管理](#任务管理)
- [配置选项](#配置选项)
- [最佳实践](#最佳实践)

---

## 快速开始

### 1. 安装依赖

```bash
pip install infomankit[scheduler]
```

### 2. 创建任务目录

```bash
mkdir -p app/tasks
touch app/tasks/__init__.py
```

### 3. 定义任务

创建 `app/tasks/daily_tasks.py`:

```python
from loguru import logger
from infoman.service.infrastructure.scheduler import cron, distributed_task

@cron(hour=0, minute=0)  # 每天 00:00 执行
@distributed_task(timeout=600)  # 使用分布式锁
async def daily_cleanup():
    """每日清理任务"""
    logger.info("开始每日清理...")
    # 你的业务逻辑
    logger.success("清理完成")
```

### 4. 启动应用

```python
from fastapi import FastAPI
from infoman.service.core.lifespan import lifespan

app = FastAPI(lifespan=lifespan)  # 自动加载并启动调度器
```

---

## 安装

### 基础安装

```bash
# 只安装调度器
pip install infomankit[scheduler]

# 完整安装（包含所有功能）
pip install infomankit[full]
```

### 可选依赖

如果需要分布式锁功能，还需要安装 Redis:

```bash
pip install infomankit[scheduler,cache]
```

---

## 基础用法

### Cron 表达式任务

```python
from infoman.service.infrastructure.scheduler import cron

# 每天特定时间
@cron(hour=8, minute=30)
async def morning_task():
    """每天 08:30 执行"""
    pass

# 每小时执行
@cron(minute=0)
async def hourly_task():
    """每小时整点执行"""
    pass

# 每 N 分钟
@cron(minute='*/15')
async def frequent_task():
    """每 15 分钟执行"""
    pass

# 工作日任务
@cron(day_of_week='mon-fri', hour=9, minute=0)
async def workday_task():
    """工作日 09:00 执行"""
    pass

# 每月第一天
@cron(day=1, hour=0, minute=0)
async def monthly_task():
    """每月 1 号 00:00 执行"""
    pass
```

### 间隔任务

```python
from infoman.service.infrastructure.scheduler import interval

# 每 N 秒/分钟/小时
@interval(seconds=30)
async def frequent_check():
    """每 30 秒执行"""
    pass

@interval(minutes=5)
async def regular_check():
    """每 5 分钟执行"""
    pass

@interval(hours=1)
async def hourly_sync():
    """每小时执行"""
    pass
```

### 一次性任务

```python
from infoman.service.infrastructure.scheduler import scheduled_task

@scheduled_task(trigger='date', run_date='2025-12-31 23:59:59')
async def new_year_task():
    """指定时间执行一次"""
    pass
```

---

## 装饰器详解

### @cron - Cron 表达式任务

```python
@cron(
    job_id='my_task',        # 任务ID（唯一标识）
    name='我的任务',          # 任务名称（用于展示）
    year=None,               # 年份
    month=None,              # 月份 (1-12)
    day=None,                # 日期 (1-31)
    week=None,               # ISO 周数 (1-53)
    day_of_week=None,        # 星期 (0-6 或 mon,tue,wed,thu,fri,sat,sun)
    hour=None,               # 小时 (0-23)
    minute=None,             # 分钟 (0-59)
    second=None,             # 秒 (0-59)
)
```

**示例:**

```python
# 每天 8:30
@cron(hour=8, minute=30)

# 每小时的第 0 和 30 分
@cron(minute='0,30')

# 每 10 分钟
@cron(minute='*/10')

# 工作日 9-17 点，每小时
@cron(day_of_week='mon-fri', hour='9-17', minute=0)

# 每月1号和15号的 00:00
@cron(day='1,15', hour=0, minute=0)
```

### @interval - 间隔任务

```python
@interval(
    job_id='my_interval_task',
    name='间隔任务',
    weeks=0,         # 周
    days=0,          # 天
    hours=0,         # 小时
    minutes=0,       # 分钟
    seconds=0,       # 秒
)
```

**示例:**

```python
# 每 30 秒
@interval(seconds=30)

# 每 5 分钟
@interval(minutes=5)

# 每 2 小时 30 分钟
@interval(hours=2, minutes=30)

# 每天
@interval(days=1)
```

### @scheduled_task - 通用任务

```python
@scheduled_task(
    trigger='cron',          # 触发器类型: cron, interval, date
    job_id='task_id',        # 任务ID
    name='任务名称',          # 任务名称
    **trigger_args           # 触发器参数
)
```

**示例:**

```python
# Cron 任务
@scheduled_task(trigger='cron', hour=0, minute=0)

# Interval 任务
@scheduled_task(trigger='interval', minutes=5)

# Date 任务（一次性）
@scheduled_task(trigger='date', run_date='2025-12-31 23:59:59')
```

---

## 分布式锁

在多实例部署时，使用分布式锁确保任务只在一个实例上执行。

### @distributed_task 装饰器

```python
from infoman.service.infrastructure.scheduler import cron, distributed_task

@cron(hour=0, minute=0)
@distributed_task(
    lock_key='my_unique_task',  # 锁的唯一键（默认使用函数名）
    timeout=600,                # 锁超时时间（秒），默认 300
    skip_locked=True            # 锁被占用时跳过执行，默认 True
)
async def my_task():
    """多实例环境下只有一个实例会执行"""
    pass
```

### 手动使用锁

```python
from infoman.service.infrastructure.scheduler import RedisDistributedLock
from infoman.service.app import application

@cron(hour=2, minute=0)
async def manual_lock_task():
    redis = application.state.redis
    lock = RedisDistributedLock(redis, "my_task", timeout=300)

    if await lock.acquire():
        try:
            # 执行任务
            logger.info("执行任务...")

            # 如果任务执行时间长，可以延期锁
            await lock.extend(additional_time=300)

        finally:
            await lock.release()
    else:
        logger.info("锁被其他实例占用，跳过执行")
```

### 上下文管理器方式

```python
@cron(hour=3, minute=0)
async def context_lock_task():
    from infoman.service.app import application

    redis = application.state.redis
    async with RedisDistributedLock(redis, "context_task", timeout=300):
        # 自动获取和释放锁
        logger.info("执行任务...")
```

---

## 任务管理

### 动态添加任务

```python
from fastapi import Request

@app.post("/scheduler/jobs")
async def add_job(request: Request):
    scheduler = request.app.state.scheduler_manager.scheduler

    async def dynamic_task():
        logger.info("动态任务执行中...")

    # 添加 Cron 任务
    job = scheduler.add_job(
        func=dynamic_task,
        trigger='cron',
        job_id='dynamic_task_1',
        hour=10,
        minute=30
    )

    return {"status": "ok", "job_id": job.id}
```

### 移除任务

```python
@app.delete("/scheduler/jobs/{job_id}")
async def remove_job(request: Request, job_id: str):
    scheduler = request.app.state.scheduler_manager.scheduler
    success = scheduler.remove_job(job_id)
    return {"status": "ok" if success else "error"}
```

### 暂停/恢复任务

```python
# 暂停
@app.post("/scheduler/jobs/{job_id}/pause")
async def pause_job(request: Request, job_id: str):
    scheduler = request.app.state.scheduler_manager.scheduler
    scheduler.pause_job(job_id)
    return {"status": "ok"}

# 恢复
@app.post("/scheduler/jobs/{job_id}/resume")
async def resume_job(request: Request, job_id: str):
    scheduler = request.app.state.scheduler_manager.scheduler
    scheduler.resume_job(job_id)
    return {"status": "ok"}
```

### 查询任务

```python
@app.get("/scheduler/jobs")
async def list_jobs(request: Request):
    scheduler = request.app.state.scheduler_manager.scheduler
    jobs = scheduler.get_jobs()

    return {
        "jobs": [
            {
                "id": job.id,
                "name": job.name,
                "next_run": job.next_run_time.isoformat() if job.next_run_time else None,
            }
            for job in jobs
        ]
    }
```

---

## 配置选项

### 环境变量配置

在 `.env` 或 `config/.env.{env}` 文件中配置:

```bash
# 是否启用调度器
SCHEDULER_ENABLED=true

# 时区
SCHEDULER_TIMEZONE=Asia/Shanghai

# 自定义任务目录
SCHEDULER_PACKAGE=app.tasks
SCHEDULER_PATH=./app/tasks
```

### 代码配置

```python
from infoman.config import settings

settings.SCHEDULER_ENABLED = True
settings.SCHEDULER_TIMEZONE = 'Asia/Shanghai'
settings.SCHEDULER_PACKAGE = 'app.tasks'
settings.SCHEDULER_PATH = './app/tasks'
```

---

## 最佳实践

### 1. 目录结构

推荐的项目结构:

```
app/
├── schedulers/              # 定时任务目录
│   ├── __init__.py
│   ├── daily_tasks.py      # 每日任务
│   ├── hourly_tasks.py     # 每小时任务
│   ├── monitoring.py       # 监控任务
│   └── _helpers.py         # 辅助函数（不会被自动加载）
├── main.py
└── .env
```

### 2. 任务命名

```python
# ✅ 推荐：清晰的任务ID和名称
@cron(
    job_id='daily_data_cleanup',
    name='每日数据清理',
    hour=0, minute=0
)
async def daily_data_cleanup():
    pass

# ❌ 不推荐：没有指定ID和名称
@cron(hour=0, minute=0)
async def task1():
    pass
```

### 3. 分布式锁使用

```python
# ✅ 推荐：关键任务使用分布式锁
@cron(hour=0, minute=0)
@distributed_task(lock_key='daily_backup', timeout=1800)
async def daily_backup():
    """数据备份（确保只执行一次）"""
    pass

# ✅ 推荐：监控任务不使用锁（每个实例都执行）
@interval(minutes=5)
async def health_check():
    """健康检查（每个实例独立检查）"""
    pass
```

### 4. 错误处理

```python
@cron(hour=0, minute=0)
@distributed_task(timeout=600)
async def robust_task():
    """健壮的任务实现"""
    logger.info("任务开始...")

    try:
        # 业务逻辑
        await do_something()
        logger.success("任务完成")

    except Exception as e:
        logger.error(f"任务失败: {e}", exc_info=True)
        # 发送告警
        # await send_alert(f"任务失败: {e}")
        raise  # 重新抛出异常，让调度器记录
```

### 5. 长时间运行的任务

```python
@cron(hour=1, minute=0)
@distributed_task(timeout=3600)
async def long_running_task():
    """长时间运行的任务"""
    from infoman.service.app import application

    redis = application.state.redis
    lock = RedisDistributedLock(redis, "long_task", timeout=1800)

    if await lock.acquire():
        try:
            # 每 10 分钟延期一次锁
            for i in range(6):
                await process_batch(i)
                await lock.extend(additional_time=600)

        finally:
            await lock.release()
```

### 6. 日志记录

```python
@cron(hour=0, minute=0)
@distributed_task(timeout=600)
async def well_logged_task():
    """良好的日志记录"""
    logger.info("📊 开始生成报表...")

    try:
        logger.info("  步骤1: 收集数据...")
        data = await collect_data()

        logger.info("  步骤2: 处理数据...")
        result = await process_data(data)

        logger.info("  步骤3: 生成报表...")
        await generate_report(result)

        logger.success("✅ 报表生成完成")

    except Exception as e:
        logger.error(f"❌ 报表生成失败: {e}")
        raise
```

---

## 常见问题

### Q: 任务没有被自动加载？

A: 检查以下几点:
1. 任务目录是否正确: `app/tasks/`
2. `__init__.py` 文件是否存在
3. 文件名不要以下划线开头
4. 配置中 `SCHEDULER_ENABLED=true`

### Q: 多实例部署时任务重复执行？

A: 使用 `@distributed_task` 装饰器，并确保 Redis 已正确配置。

### Q: 如何调试任务？

A: 使用日志和手动触发:

```python
# 在开发环境手动触发任务
@app.post("/debug/run-task/{task_name}")
async def debug_task(request: Request, task_name: str):
    scheduler = request.app.state.scheduler_manager.scheduler
    job = scheduler.get_job(task_name)

    if job:
        # 手动执行
        await job.func()
        return {"status": "ok"}
    else:
        return {"status": "error", "message": "任务不存在"}
```

---

## 参考资料

- [APScheduler 官方文档](https://apscheduler.readthedocs.io/)
- [Cron 表达式在线生成](https://crontab.guru/)
- [项目示例](../examples/scheduler_usage.py)
