Metadata-Version: 2.4
Name: ai4love-tools
Version: 0.0.9
Summary: 为爱觉醒工具库
Author-email: AI4Love <499878323@qq.com>
License: MIT
Keywords: ai4love
Classifier: Development Status :: 3 - Alpha
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.12
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Requires-Python: >=3.12
Description-Content-Type: text/markdown
Requires-Dist: fastapi>=0.115.0
Requires-Dist: sqlalchemy>=1.4.0
Provides-Extra: dev
Requires-Dist: pytest>=7.0.0; extra == "dev"
Requires-Dist: pytest-asyncio>=0.21.0; extra == "dev"
Requires-Dist: black>=23.0.0; extra == "dev"
Requires-Dist: ruff>=0.1.0; extra == "dev"
Requires-Dist: mypy>=1.0.0; extra == "dev"

# AI4Love 通用工具库

面向多项目复用的 Python 通用工具库，当前提供租户隔离模块，后续持续扩展更多模块。

## 模块概览

- `tenant`：租户隔离与上下文管理

## 租户模块功能特性

- **租户ID和用户ID提取**：从 HTTP 请求头中自动提取租户ID和用户ID
- **上下文存储**：使用 `contextvars` 实现线程安全的上下文变量存储
- **装饰器支持**：提供 `@extract_tenant_id` 和 `@with_tenant_context` 装饰器
- **SQL 自动封装**：自动为 SQL 语句附加租户条件
- **SQLAlchemy 集成**：在不改动业务代码的前提下，自动为 ORM 查询追加租户过滤

## 安装

```bash
pip install -e .
```

## 快速开始

### 1. 使用装饰器自动提取租户ID（推荐）

```python
from fastapi import FastAPI, Request
from ai4love_tools.tenant.decorators import extract_tenant_id
from ai4love_tools.tenant.context import get_tenant_id, get_user_id

app = FastAPI()

@app.get("/users")
@extract_tenant_id  # 自动从请求头提取租户ID和用户ID
async def get_users(request: Request):
    tenant_id = get_tenant_id()  # 从上下文获取租户ID
    user_id = get_user_id()  # 从上下文获取用户ID
    return {"tenant_id": tenant_id, "user_id": user_id}
```

### 2. 手动提取和设置

```python
from ai4love_tools.tenant.extractor import extract_tenant_id_from_request, extract_user_id_from_request
from ai4love_tools.tenant.context import set_tenant_id, set_user_id

def manual_extract(request):
    tenant_id = extract_tenant_id_from_request(request)
    user_id = extract_user_id_from_request(request)
    set_tenant_id(tenant_id)
    set_user_id(user_id)
```

### 3. 多线程/后台任务上下文传递

```python
from ai4love_tools.tenant.decorators import with_tenant_context
from ai4love_tools.tenant.context import get_tenant_id, get_user_id
from concurrent.futures import ThreadPoolExecutor

@with_tenant_context
def background_task(data):
    # 在新线程中，租户上下文已自动传递
    tenant_id = get_tenant_id()
    user_id = get_user_id()
    process_data(data)

executor = ThreadPoolExecutor()
executor.submit(background_task, data)
```

### 4. SQL 封装使用

```python
from ai4love_tools.tenant.sql import tenant_execute
from ai4love_tools.tenant.context import set_tenant_id

# 设置租户ID到上下文
set_tenant_id("tenant_123")

def list_users(conn):
    # 业务侧只关心业务条件
    base_sql = "SELECT id, name FROM users WHERE status = :status"
    params = {"status": "active"}

    # 底层自动从上下文获取租户ID，并追加 tenant 条件
    def executor(sql: str, params: dict):
        with conn.cursor() as cursor:
            cursor.execute(sql, params)
            return cursor.fetchall()

    rows = tenant_execute(executor, base_sql, params)
    return rows
```

### 5. SQLAlchemy ORM 自动租户过滤

#### 同步 SQLAlchemy

```python
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
from ai4love_tools.tenant.sqlalchemy_integration import enable_sqlalchemy_tenant_isolation
from your_models import User

# 在应用启动时启用
engine = create_engine("sqlite:///db.sqlite")
enable_sqlalchemy_tenant_isolation(engine)

# 业务代码无需改动，自动追加租户过滤
with Session(engine) as session:
    # 自动追加 tenant_id 条件
    users = session.query(User).filter(User.status == "active").all()
```

#### 异步 SQLAlchemy（推荐）

```python
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from sqlalchemy.orm import Session  # 必须导入 Session
from sqlalchemy import select
from ai4love_tools.tenant.sqlalchemy_integration import enable_sqlalchemy_tenant_isolation
from ai4love_tools.tenant.decorators import extract_tenant_id
from ai4love_tools.tenant.context import get_tenant_id
from your_models import User

# 1. 创建异步引擎
async_engine = create_async_engine("mysql+aiomysql://user:pass@localhost/db")

# 2. 启用租户隔离（在应用启动时调用一次）
enable_sqlalchemy_tenant_isolation(async_engine, require_tenant=True)

# 3. 创建 async_sessionmaker（重要：必须指定 sync_session_class=Session）
async_session_factory = async_sessionmaker(
    async_engine,
    expire_on_commit=False,
    class_=AsyncSession,
    sync_session_class=Session  # 必须指定这个参数，才能让 do_orm_execute 事件生效
)

# 4. 在 FastAPI 路由中使用
from fastapi import FastAPI, Depends

app = FastAPI()

async def get_db():
    """获取数据库会话的依赖项"""
    async with async_session_factory() as session:
        yield session
        await session.commit()

@app.get("/users")
@extract_tenant_id(required_tenant_id=True)  # 从请求头提取租户ID
async def get_users(db: AsyncSession = Depends(get_db)):
    # 租户ID已自动从请求头提取并设置到上下文
    # 查询时会自动添加 tenant_id 条件
    stmt = select(User).where(User.status == "active")
    result = await db.execute(stmt)
    users = result.scalars().all()
    return users
```

#### 完整示例：MySQLManager 集成

```python
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from sqlalchemy.orm import Session  # 必须导入
from sqlalchemy import event
import time
import logging
from ai4love_tools.tenant.sqlalchemy_integration import enable_sqlalchemy_tenant_isolation

logger = logging.getLogger(__name__)

class MySQLManager:
    def __init__(self, url: str):
        # 1. 创建异步引擎
        self.engine = create_async_engine(
            url,
            echo=False,
            pool_size=25,
            max_overflow=500,
            pool_timeout=15,
            pool_recycle=3600,
            pool_pre_ping=True,
            connect_args={
                "charset": "utf8mb4",
                "ssl": False,
                "autocommit": False,
                "connect_timeout": 10
            },
            future=True
        )

        # 2. 设置 SQL 日志（使用 sync_engine）
        self._setup_sql_logging()

        # 3. 启用租户隔离（在引擎创建后立即调用）
        enable_sqlalchemy_tenant_isolation(
            engine_or_session_factory=self.engine,
            require_tenant=True  # 强制要求租户ID
        )

        # 4. 创建 async_sessionmaker（重要：必须指定 sync_session_class=Session）
        self.async_session_factory = async_sessionmaker(
            self.engine,
            expire_on_commit=False,
            class_=AsyncSession,
            sync_session_class=Session  # 必须指定，才能让 do_orm_execute 事件生效
        )

    def _setup_sql_logging(self):
        """设置SQL执行日志监听器"""
        @event.listens_for(self.engine.sync_engine, "before_cursor_execute")
        def before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
            conn.info.setdefault('query_start_time', []).append(time.time())

        @event.listens_for(self.engine.sync_engine, "after_cursor_execute")
        def after_cursor_execute(conn, cursor, statement, parameters, context, executemany):
            if conn.info.get('query_start_time'):
                total = time.time() - conn.info['query_start_time'].pop(-1)
                sql = ' '.join(str(statement).strip().split())
                rows = cursor.rowcount if cursor.rowcount >= 0 else 0
                elapsed_ms = int(total * 1000)
                logger.info(f"SQL: {sql} (耗时: {elapsed_ms}ms) | rows={rows}")

    def get_session(self) -> AsyncSession:
        """获取数据库会话"""
        return self.async_session_factory()
```

## API 文档

### 上下文管理

- `get_tenant_id()`: 获取当前上下文中的租户ID
- `get_user_id()`: 获取当前上下文中的用户ID
- `set_tenant_id(tenant_id)`: 设置租户ID到上下文
- `set_user_id(user_id)`: 设置用户ID到上下文

### 装饰器

- `@extract_tenant_id`: 自动从请求头提取租户ID和用户ID并保存到上下文
- `@with_tenant_context`: 自动复制租户上下文到新线程/任务

### SQL 封装

- `tenant_execute(executor, sql, params, ...)`: 执行包含租户条件的 SQL 语句
- `build_tenant_sql(sql, ...)`: 构建包含租户条件的 SQL 语句
- `build_tenant_params(params, tenant_id, ...)`: 构建包含租户ID的参数

### SQLAlchemy 集成

- `enable_sqlalchemy_tenant_isolation(engine_or_session_factory, ...)`: 启用 SQLAlchemy ORM 自动租户过滤

## 依赖要求

- Python >= 3.12
- fastapi >= 0.115.0
- sqlalchemy >= 1.4.0

## 注意事项

- contextvars 无法跨进程传递，进程池场景需要显式传递租户ID作为参数
- 装饰器是给其他项目使用的，library 只负责提供装饰器函数，不负责具体的任务调度逻辑
- library 本身不包含业务逻辑，只提供工具函数和装饰器
- 尽量保持框架无关性，支持多种 Web 框架

## 运行测试

```bash
# 安装开发依赖
pip install -e ".[dev]"

# 运行所有测试
pytest

# 运行特定测试文件
pytest tests/test_context.py

# 运行测试并显示覆盖率
pytest --cov=src/ai4love_tools --cov-report=html
```

## 测试覆盖

测试覆盖以下模块：
- `test_context.py` - 上下文存储管理测试
- `test_extractor.py` - 租户ID和用户ID提取器测试
- `test_decorators.py` - 装饰器功能测试
- `test_sql.py` - SQL 封装函数测试
- `test_sqlalchemy_integration.py` - SQLAlchemy 集成测试
- `test_integration.py` - 集成测试（多模块协同工作）

## 许可证

MIT
