Metadata-Version: 2.1
Name: mqttxx
Version: 3.2.2
Summary: 基于 aiomqtt 的高级 MQTT 客户端和 RPC 框架
Author: MQTTX Team
License: MIT
Keywords: mqtt,rpc,async,iot,messaging
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: aiomqtt<3.0.0,>=2.0.0
Requires-Dist: loguru>=0.7.0
Provides-Extra: dev
Requires-Dist: pytest; extra == "dev"
Requires-Dist: pytest-asyncio; extra == "dev"
Requires-Dist: pytest-rich; extra == "dev"
Requires-Dist: ruff; extra == "dev"
Requires-Dist: build; extra == "dev"
Requires-Dist: twine; extra == "dev"

# MQTTX

[![PyPI version](https://img.shields.io/badge/version-3.2.0-blue.svg)](https://pypi.org/project/mqttxx/)
[![Python](https://img.shields.io/badge/python-3.10+-blue.svg)](https://www.python.org/downloads/)
[![License](https://img.shields.io/badge/license-MIT-green.svg)](LICENSE)

基于 [aiomqtt](https://github.com/sbtinstruments/aiomqtt) 的高级 MQTT 客户端和 RPC 框架。

## 核心特性

- ✅ **纯 async/await** - 无回调，代码清晰
- ✅ **高性能消息处理** - Queue + Worker 模式，可控并发，背压机制
- ✅ **自动重连** - 订阅队列化，断线重连
- ✅ **双向对等 RPC** - 带权限控制、超时管理
- ✅ **约定式 RPC** - 零配置，自动订阅、自动注入 reply_to
- ✅ **Event Channel** - 高吞吐事件广播，支持通配符订阅
- ✅ **TLS/SSL 支持** - 安全连接、双向认证
- ✅ **完善异常系统** - 统一错误码、清晰的异常层次
- ✅ **生产级可靠性** - 修复所有 P0 缺陷（任务泄漏、资源泄漏、并发竞态）

---

## 目录

- [架构原则](#架构原则)
- [安装](#安装)
- [快速开始](#快速开始)
  - [MQTT 基础用法](#1-mqtt-基础用法)
  - [约定式 RPC（推荐）](#2-约定式-rpc零配置)
  - [Event Channel（事件广播）](#3-event-channel事件广播)
  - [传统 RPC](#4-rpc-基础用法传统模式)
  - [RPC 权限控制](#5-rpc-权限控制)
  - [TLS/SSL 和认证](#6-tlsssl-和认证)
- [架构与实现](#架构与实现)
- [API 速查](#api-速查)
- [配置对象](#配置对象)
- [异常系统](#异常系统)
- [RPC 消息协议](#rpc-消息协议)
- [版本变更](#版本变更)
- [开发](#开发)
- [示例项目](#示例项目)
- [常见问题](#常见问题)
- [贡献](#贡献)

---

## 架构原则

MQTTX 遵循严格的分层架构，确保职责清晰、代码简洁、易于扩展：

### 1. MQTTClient（传输层）
- **职责**：只负责 bytes 传输
- **约束**：
  - 不导入 `json` 或 `protocol` 模块
  - 不理解消息内容
  - `publish()` 只接受 `bytes`
  - `subscribe()` handler 接收 `bytes`

### 2. Protocol（协议层）
- **职责**：定义消息格式和 JSON 编解码
- **核心**：
  - `encode()` 方法：object → bytes（使用 JSON）
  - `decode()` 方法：bytes → object（使用 JSON）
  - 所有 JSON 操作只在这一层

### 3. RPCManager/EventChannelManager（应用层）
- **职责**：业务逻辑处理
- **约束**：
  - 内部永远使用类型化对象（RPCRequest/RPCResponse/EventMessage）
  - 调用 `encode()`/`decode()` 处理编解码
  - 不直接使用 `json.dumps()`/`json.loads()`

**设计理念**：固定使用 JSON 编解码，简化架构，降低复杂度。如需自定义协议，可在传输层直接使用 `MQTTClient.subscribe()` 处理 bytes。

---

## 安装

```bash
pip install mqttxx
```

**要求**:
- Python >= 3.10
- aiomqtt >= 2.0.0
- loguru >= 0.7.0

---

## 快速开始

### 1. MQTT 基础用法

```python
import asyncio
from mqttxx import MQTTClient, MQTTConfig

async def main():
    config = MQTTConfig(
        broker_host="localhost",
        broker_port=1883,
        client_id="device_123"
    )

    async with MQTTClient(config) as client:
        # 订阅主题
        def on_message(topic, message):
            print(f"{topic}: {message}")

        client.subscribe("sensors/#", on_message)

        # 发布消息
        await client.publish("sensors/temperature", "25.5", qos=1)

        await asyncio.sleep(60)

asyncio.run(main())
```

---

### 2. 约定式 RPC（零配置）

**推荐**：使用 `RPCManager` 的 `my_topic` 参数，自动订阅 + 自动注入 reply_to。

```python
from mqttxx import MQTTClient, MQTTConfig, RPCManager

# 边缘设备
async def edge_device():
    client_id = "device_123"
    config = MQTTConfig(
        broker_host="localhost",
        client_id=client_id,
    )

    async with MQTTClient(config) as client:
        # 自动订阅 edge/device_123
        rpc = RPCManager(client, my_topic=f"edge/{client_id}")

        @rpc.register("get_status")
        async def get_status(params):
            return {"status": "online"}

        # 调用云端（自动注入 reply_to="edge/device_123"）
        config = await rpc.call("cloud/config-service", "get_device_config")
        print(config)

        await asyncio.sleep(60)

# 云端服务
async def cloud_service():
    client_id = "config-service"
    config = MQTTConfig(
        broker_host="localhost",
        client_id=client_id,
    )

    async with MQTTClient(config) as client:
        # 自动订阅 cloud/config-service
        rpc = RPCManager(client, my_topic=f"cloud/{client_id}")

        @rpc.register("get_device_config")
        async def get_device_config(params):
            return {"update_interval": 60, "servers": ["s1", "s2"]}

        # 调用边缘设备（自动注入 reply_to="cloud/config-service"）
        status = await rpc.call("edge/device_123", "execute_command", params={"cmd": "restart"})
        print(status)

        await asyncio.sleep(60)

# 运行边缘设备或云端
asyncio.run(edge_device())  # 或 asyncio.run(cloud_service())
```

**对比传统 RPC：**

| 场景 | 传统 RPC | 约定式 RPC |
|-----|---------|-----------|
| 初始化 | `rpc = RPCManager(client)`<br>`client.subscribe("edge/123", rpc.handle_rpc_message)` | `rpc = RPCManager(client, my_topic="edge/123")`<br>→ 自动订阅 |
| 调用 | `await rpc.call(topic="cloud/svc", method="get", reply_to="edge/123")` | `await rpc.call("cloud/svc", "get")` |
| 代码量 | 100% | **60%** ↓ |

---

### 3. Event Channel（事件广播）

**Event Channel 是高吞吐、低耦合、无返回值的事件广播通道**，适用于：
- 传感器数据流（温度、湿度、位置）
- 系统监控指标（CPU、内存、网络）
- 设备状态心跳
- 日志流

**关键特性**：
- ✅ **单向广播** - 发布即忘，无返回值（与 RPC 形成对比）
- ✅ **通配符订阅** - 支持 MQTT 通配符（`+` 单级，`#` 多级）
- ✅ **混合模式** - 支持结构化事件（EventMessage）和原始 dict
- ✅ **装饰器订阅** - 简洁的 API

```python
from mqttxx import MQTTClient, MQTTConfig, EventChannelManager, EventMessage

async def main():
    config = MQTTConfig(broker_host="localhost", client_id="device_001")

    async with MQTTClient(config) as client:
        events = EventChannelManager(client)

        # 订阅事件（支持通配符）
        @events.subscribe("sensors/+/temperature")
        async def on_temperature(topic, message):
            print(f"[温度] {topic}: {message}")

        @events.subscribe("sensors/#")
        async def on_all_sensors(topic, message):
            print(f"[所有传感器] {topic}")

        # 发布结构化事件
        await events.publish(
            "sensors/room1/temperature",
            EventMessage(
                event_type="temperature.changed",
                data={"value": 25.5, "unit": "C"},
                source="sensor_001"
            )
        )

        # 发布原始消息（零开销）
        await events.publish(
            "sensors/room1/humidity",
            {"value": 60.2, "unit": "%"}
        )

        await asyncio.sleep(60)

asyncio.run(main())
```

**Event Channel vs RPC**：

| 特性 | Event Channel | RPC |
|------|--------------|-----|
| 模式 | 发布-订阅（Pub-Sub） | 请求-响应（Request-Response） |
| 通信 | 单向，一对多广播 | 双向，点对点 |
| 返回值 | ❌ 无返回值 | ✅ 等待返回结果 |
| 用途 | 高频事件流、监控数据 | 远程方法调用、配置查询 |
| 通配符 | ✅ 支持 `+/#` | ❌ 精确匹配 |

**混合使用 RPC + Event**：

```python
# 同时使用 RPC 和 Event Channel
rpc = RPCManager(client, my_topic="device/device_001")
events = EventChannelManager(client)

# RPC: 获取配置（需要返回值）
config = await rpc.call("server/config", "get_device_config")

# Event: 发布心跳（无返回值）
await events.publish(
    "device/device_001/heartbeat",
    EventMessage(
        event_type="heartbeat",
        data={"cpu": 45.2, "memory": 78.5}
    )
)
```

#### 延迟注册（可选）

如果需要在模块导入时注册事件处理器，可以使用模块级 `event_subscribe` 函数：

```python
# handlers.py
from mqttxx.events import event_subscribe

@event_subscribe("sensors/+/temperature")
async def on_temperature(topic, message):
    print(f"温度: {message}")

# main.py
import handlers  # 触发装饰器执行

events = EventChannelManager(client)  # 自动导入所有延迟注册的处理器
```

**使用对比**：
- `@events.subscribe(pattern)` - 立即订阅，需要 EventChannelManager 实例
- `@event_subscribe(pattern)` - 延迟订阅，模块级函数，不需要实例

适用场景：
- 事件处理器分散在多个模块
- 需要在模块导入时自动注册
- 插件化架构

注意：延迟注册使用全局注册表，建议单例模式使用。

---

### 4. RPC 基础用法（传统模式）

需要手动订阅和传递 `reply_to`，适用于需要精细控制的场景。

```python
from mqttxx import MQTTClient, MQTTConfig, RPCManager

async def main():
    config = MQTTConfig(broker_host="localhost", client_id="device_001")

    async with MQTTClient(config) as client:
        rpc = RPCManager(client)

        # 注册本地方法
        @rpc.register("get_status")
        async def get_status(params):
            return {"status": "online", "cpu": 45.2}

        # 订阅 RPC 主题
        client.subscribe(
            "server/device_001",
            rpc.handle_rpc_message
        )

        # 调用远程方法
        result = await rpc.call(
            topic="bots/device_002",
            method="get_data",
            reply_to="server/device_001",
            timeout=5
        )
        print(result)  # {"data": [1, 2, 3]}

        await asyncio.sleep(60)

asyncio.run(main())
```

---

### 5. RPC 权限控制

```python
from mqttxx import RPCManager, RPCRequest

async def auth_check(caller_id: str, method: str, request: RPCRequest) -> bool:
    # 敏感方法只允许管理员
    if method in ["delete_user", "reset_system"]:
        return caller_id in ["admin_001", "admin_002"]
    return True

rpc = RPCManager(client, auth_callback=auth_check)

@rpc.register("delete_user")
async def delete_user(params):
    return {"result": "user deleted"}

# 未授权调用会返回 "Permission denied"
```

---

### 6. TLS/SSL 和认证

```python
from mqttxx import MQTTConfig, TLSConfig, AuthConfig
from pathlib import Path

config = MQTTConfig(
    broker_host="secure.mqtt.example.com",
    broker_port=8883,
    tls=TLSConfig(
        enabled=True,
        ca_certs=Path("ca.crt"),
        certfile=Path("client.crt"),
        keyfile=Path("client.key"),
    ),
    auth=AuthConfig(
        username="mqtt_user",
        password="mqtt_password",
    ),
)

async with MQTTClient(config) as client:
    await client.publish("secure/topic", "encrypted message")
```

---

## 架构与实现

深入了解 MQTTX 的技术实现细节：

- **[消息路由实现逻辑](docs/routing_flow.md)** - 使用 Mermaid 图表说明从 MQTT Broker 到最终处理器的完整消息流程、订阅注册机制、通配符匹配算法，以及关键代码位置
- **[约定式 RPC 用法详解](docs/约定式RPC用法.md)** - 深入解析零配置 RPC 调用的设计原理、自动订阅机制、reply_to 自动注入，以及与传统 RPC 的对比

**推荐阅读顺序**：先通过上方的"快速开始"掌握基本用法，再深入技术文档了解底层实现。

---

## API 速查

### MQTTClient

```python
class MQTTClient:
    def __init__(self, config: MQTTConfig)
    async def connect(self) -> None
    async def disconnect(self) -> None
    def subscribe(self, topic: str, handler: Callable) -> None
    async def publish(self, topic: str, payload: str, qos: int = 0) -> None

    @property
    def is_connected(self) -> bool
```

---

### RPCManager（传统 RPC）

```python
class RPCManager:
    def __init__(
        self,
        client: MQTTClient,
        my_topic: Optional[str] = None,
        config: RPCConfig = None,
        auth_callback: AuthCallback = None
    )

    def register(self, method_name: str)  # 装饰器
    def unregister(self, method_name: str) -> None
    def handle_rpc_message(self, topic: str, message: RPCRequest | RPCResponse) -> None

    async def call(
        self,
        topic: str,
        method: str,
        params: Any = None,
        reply_to: str = None,  # 必填
        timeout: float = None,
    ) -> Any
```

---

### RPCManager（约定式用法）

```python
class RPCManager:
    def __init__(
        self,
        client: MQTTClient,
        my_topic: Optional[str] = None,  # 本节点 topic（自动订阅，自动注入到 reply_to）
        config: RPCConfig = None,
        auth_callback: AuthCallback = None,
    )

    async def call(
        self,
        topic: str,        # 对方的 topic
        method: str,
        params: Any = None,
        timeout: float = None,
        reply_to: Optional[str] = None,  # 可选，默认使用 my_topic
    ) -> Any

    # 属性
    my_topic: Optional[str]      # 当前 topic（只读）
```

**使用示例：**

```python
# 边缘设备
rpc = RPCManager(client, my_topic="edge/device_123")
config = await rpc.call("cloud/config-service", "get_config")

# 云端服务
rpc = RPCManager(client, my_topic="cloud/config-service")
status = await rpc.call("edge/device_123", "execute_command")

# 微服务
rpc = RPCManager(client, my_topic="auth-service")
user = await rpc.call("user-service", "get_user", params={"id": 123})
```

---

### EventChannelManager（Event Channel）

```python
class EventChannelManager:
    def __init__(self, client: MQTTClient)

    def subscribe(
        self,
        pattern: str,           # MQTT topic 模式（支持通配符 +/#）
        handler: Optional[EventHandler] = None
    ) -> Callable  # 装饰器或直接注册

    def unsubscribe(self, pattern: str, handler: EventHandler) -> None

    async def publish(
        self,
        topic: str,             # MQTT topic
        message: EventMessage | dict | Any,  # 消息（支持多种格式）
        qos: int = 0
    ) -> None  # 无返回值（单向广播）
```

**EventMessage（可选的结构化格式）**：

```python
@dataclass
class EventMessage:
    type: str = "event"
    event_type: str         # 事件类型（如 "temperature.changed"）
    data: Any               # 事件数据
    timestamp: float        # 时间戳（自动生成）
    source: str = ""        # 事件源

    def to_dict(self) -> dict
    @staticmethod
    def from_dict(data: dict) -> EventMessage
```

**使用示例：**

```python
events = EventChannelManager(client)

# 订阅（装饰器模式）
@events.subscribe("sensors/+/temperature")
async def on_temp(topic, message):
    print(f"{topic}: {message}")

# 发布结构化事件
await events.publish(
    "sensors/room1/temperature",
    EventMessage(
        event_type="temperature.changed",
        data={"value": 25.5}
    )
)

# 发布原始消息（零开销）
await events.publish("sensors/room1/humidity", {"value": 60.2})
```

---

## 配置对象

### MQTTConfig

```python
@dataclass
class MQTTConfig:
    broker_host: str
    broker_port: int = 1883
    client_id: str = ""                    # 空字符串 = 自动生成
    keepalive: int = 60
    clean_session: bool = False
    tls: TLSConfig = field(default_factory=TLSConfig)
    auth: AuthConfig = field(default_factory=AuthConfig)
    reconnect: ReconnectConfig = field(default_factory=ReconnectConfig)
    max_queued_messages: int = 0           # 0 = 无限
    max_payload_size: int = 1024 * 1024    # 1MB

    # 高性能消息处理（v3.0+）
    message_queue_maxsize: int = 100_000   # 消息队列大小（保险丝，防止 OOM）
    num_workers: Optional[int] = None       # Worker 数量（None = CPU核数×2，适合 IO-bound）
```

**高性能配置说明**：

- **message_queue_maxsize**：消息队列容量限制
  - 默认 100,000（"几乎无限"，仅作保险丝）
  - 队列满时阻塞等待（背压信号）
  - 触发背压时：CPU/延迟升高 → 扩容信号
  - Python 字面量分隔符：`100_000 = 100000`（下划线仅用于可读性）

- **num_workers**：消息处理 Worker 数量
  - `None`（默认）：CPU核数 × 2（适合 IO-bound 负载）
  - 自定义值：根据 handler 类型调整
    - CPU-bound handler：设为 CPU核数
    - IO-bound handler：设为 CPU核数 × 2~4

### TLSConfig

```python
@dataclass
class TLSConfig:
    enabled: bool = False
    ca_certs: Optional[Path] = None
    certfile: Optional[Path] = None
    keyfile: Optional[Path] = None
    verify_mode: str = "CERT_REQUIRED"     # CERT_REQUIRED | CERT_OPTIONAL | CERT_NONE
    check_hostname: bool = True
```

### AuthConfig

```python
@dataclass
class AuthConfig:
    username: Optional[str] = None
    password: Optional[str] = None
```

### ReconnectConfig

```python
@dataclass
class ReconnectConfig:
    enabled: bool = True
    interval: int = 5                      # 初始重连间隔（秒）
    max_attempts: int = 0                  # 0 = 无限重试
    backoff_multiplier: float = 1.5        # 指数退避倍数
    max_interval: int = 60                 # 最大重连间隔（秒）
```

### RPCConfig

```python
@dataclass
class RPCConfig:
    default_timeout: float = 30.0          # 默认超时时间（秒）
    max_concurrent_calls: int = 100        # 最大并发调用数
```

---

## 异常系统

```python
# 基础异常
class MQTTXError(Exception)
class ConnectionError(MQTTXError)
class MessageError(MQTTXError)
class RPCError(MQTTXError)

# RPC 异常
class RPCTimeoutError(RPCError)            # RPC 调用超时
class RPCRemoteError(RPCError)             # 远程方法执行失败
class RPCMethodNotFoundError(RPCError)     # 方法未找到
class PermissionDeniedError(RPCError)      # 权限拒绝
class TooManyConcurrentCallsError(RPCError)  # 并发调用超限

# 错误码
class ErrorCode(IntEnum):
    NOT_CONNECTED = 1001
    RPC_TIMEOUT = 3002
    PERMISSION_DENIED = 4001
    # ... 更多错误码见源码
```

**使用示例：**

```python
from mqttxx import RPCTimeoutError, RPCRemoteError

try:
    result = await rpc.call_bot("456", "get_data", timeout=5)
except RPCTimeoutError:
    print("调用超时")
except RPCRemoteError as e:
    print(f"远程方法执行失败: {e}")
```

---

## RPC 消息协议

### 请求

```json
{
  "type": "rpc_request",
  "request_id": "uuid-string",
  "method": "get_status",
  "params": {"id": 123},
  "reply_to": "server/device_001",
  "caller_id": "device_002"
}
```

### 响应（成功）

```json
{
  "type": "rpc_response",
  "request_id": "uuid-string",
  "result": {"status": "online"}
}
```

### 响应（错误）

```json
{
  "type": "rpc_response",
  "request_id": "uuid-string",
  "error": "Permission denied"
}
```

---

## 版本变更

### v3.2.0（当前版本）

**架构简化**：
- ✅ **移除可插拔编解码器** - 固定使用 JSON 编解码，简化架构设计
- ✅ **整合约定式 RPC** - `my_topic` 参数集成到 `RPCManager`，移除独立的 `ConventionalRPCManager`
- ✅ **整合传输层 API** - `RawAPI` 功能整合到 `MQTTClient`，统一接口

**迁移指南（从 v3.1.x）**：
1. **移除 Codec 相关代码** - 不再支持自定义编解码器（如 MessagePack），统一使用 JSON
   ```python
   # ❌ 旧代码（不再支持）
   rpc = RPCManager(client, codec=MessagePackCodec)

   # ✅ 新代码（固定 JSON）
   rpc = RPCManager(client)
   ```
2. **使用约定式 RPC** - 推荐使用 `my_topic` 参数简化代码
   ```python
   # ✅ 推荐写法
   rpc = RPCManager(client, my_topic="edge/device_123")
   result = await rpc.call("cloud/server", "get_config")  # 自动注入 reply_to
   ```

---

### v2.0.0 重大变更

从 v2.0.0 开始，完全重写为基于 aiomqtt（纯 async/await），**不兼容** v0.x.x（gmqtt）。

**主要变化：**
- ✅ aiomqtt 替代 gmqtt
- ✅ 原生 dataclass 替代 python-box（性能提升 6 倍）
- ✅ **修复所有 P0 缺陷**（详见下方）
- ✅ 新增约定式 RPC（`RPCManager` 的 `my_topic` 参数）
- ✅ 新增权限控制（`auth_callback`）
- ✅ 新增 TLS/SSL 支持

**P0 缺陷修复（v3.0）**：
1. **任务泄漏 + 消息处理模型** - Queue + Worker 模式，可控并发
2. **取消订阅功能缺失** - 实现完整的取消订阅功能
3. **重连竞态条件** - 快照模式避免并发修改
4. **连接资源泄漏** - 显式关闭连接

**并发模型说明**：
- ✅ **单 Event Loop 设计**：所有方法（subscribe/unsubscribe/handler）必须在同一 asyncio loop 中调用
- ✅ **Handler 顺序执行**：同一消息的多个 handlers 按注册顺序顺序调用（非并发）
- ❌ **不支持多线程/多 loop 并发**：如需多线程，请为每个线程创建独立的 MQTTClient 实例

**迁移关键点：**
1. 使用 `MQTTConfig` 配置对象
2. 使用 `async with` 上下文管理器
3. `publish_message()` → `publish()`
4. 移除 `EventEmitter`（改用 dict）
5. （可选）配置 `message_queue_maxsize` 和 `num_workers` 以优化性能

---

## 开发

```bash
# 克隆项目
git clone <repository-url>
cd mqttx

# 安装开发依赖
pip install -e ".[dev]"

# 运行测试
pytest tests/ -v

# 代码检查和格式化
make lint    # 代码检查
make format  # 代码格式化

# 构建和发布
make build   # 构建分发包
make version # 查看当前版本
```

**测试覆盖**:
- 单元测试 (`test_*`)
- 集成测试 (`test_integration`)
- P0 缺陷修复验证 (`test_p0_fixes`)
- 性能测试 (`test_performance`)

---

## 示例项目

查看 [examples/](examples/) 目录获取完整示例:

- **conventional_rpc_generic.py** - 约定式 RPC 完整示例
  - 边缘设备与云端服务通信
  - 微服务之间的 RPC 调用
  - 展示自动订阅和自动注入 reply_to

- **event_channel_basic.py** - Event Channel 基础示例
  - 订阅事件（支持通配符）
  - 发布结构化事件和原始消息
  - 一个 topic 多个订阅者

- **rpc_event_mixed.py** - RPC 和 Event Channel 混合使用
  - 同一客户端同时使用 RPC 和 Event Channel
  - RPC 调用（请求-响应）
  - Event 广播（单向，无返回值）

运行示例:
```bash
# RPC 示例
# 终端 1: 运行边缘设备
python examples/conventional_rpc_generic.py edge

# 终端 2: 运行云端服务
python examples/conventional_rpc_generic.py cloud

# Event Channel 示例
python examples/event_channel_basic.py

# RPC + Event 混合使用
# 终端 1: 运行设备端
python examples/rpc_event_mixed.py device

# 终端 2: 运行服务器端
python examples/rpc_event_mixed.py server
```

---

## 常见问题

### Q: 如何处理断线重连？
A: MQTTClient 默认启用自动重连，配置参数在 `ReconnectConfig` 中：
```python
config = MQTTConfig(
    broker_host="localhost",
    reconnect=ReconnectConfig(
        enabled=True,
        interval=5,           # 初始重连间隔
        max_attempts=0,       # 0 = 无限重试
        backoff_multiplier=1.5,
        max_interval=60
    )
)
```

### Q: RPC 调用超时了怎么办？
A: 捕获 `RPCTimeoutError` 异常并处理：
```python
from mqttxx import RPCTimeoutError

try:
    result = await rpc.call("topic", "method", timeout=5)
except RPCTimeoutError:
    print("RPC 调用超时，请检查远程服务是否在线")
```

### Q: 如何实现双向认证（mTLS）？
A: 配置 TLS 证书和密钥：
```python
config = MQTTConfig(
    broker_host="secure.mqtt.example.com",
    broker_port=8883,
    tls=TLSConfig(
        enabled=True,
        ca_certs=Path("ca.crt"),      # CA 证书
        certfile=Path("client.crt"),  # 客户端证书
        keyfile=Path("client.key"),   # 客户端私钥
    )
)
```

### Q: 约定式 RPC 和传统 RPC 有什么区别？
A:
- **约定式 RPC**: 自动订阅 `my_topic`，自动注入 `reply_to`，代码量减少 40%
- **传统 RPC**: 需要手动订阅、手动传递 `reply_to`，适合需要精细控制的场景

推荐大多数场景使用约定式 RPC。

### Q: 如何优化高吞吐场景的性能？
A: 根据实际负载调整消息处理配置：
```python
import os

config = MQTTConfig(
    broker_host="localhost",
    # 队列容量（默认 100k 适合大多数场景）
    message_queue_maxsize=100_000,

    # Worker 数量（根据 handler 类型调整）
    num_workers=os.cpu_count() * 2,  # IO-bound（默认）
    # num_workers=os.cpu_count(),    # CPU-bound
)
```

**性能监控**：
- 监控队列深度：如果队列长期接近满载，考虑增加 workers
- 监控 CPU/延迟：队列满时会触发背压，CPU/延迟会升高
- Handler 并发：默认顺序执行，如需并发请在 handler 内使用 `asyncio.create_task()`

### Q: Handlers 是并发执行的吗？
A: 不是。同一消息的多个 handlers 按注册顺序**顺序执行**（非并发）：
```python
# 顺序处理（默认）
async def handler1(topic, payload):
    await process_data(payload)  # 阻塞后续 handlers

# 并发处理（手动）
async def handler2(topic, payload):
    asyncio.create_task(process_async(payload))  # 不阻塞
```

这是有意的设计，确保消息处理的可预测性。如需并发，请在 handler 内部创建 task。

---

## 贡献

欢迎提交 Issue 和 Pull Request！

在提交 PR 之前，请确保:
1. 通过所有测试 (`pytest tests/ -v`)
2. 代码通过检查 (`make lint`)
3. 代码已格式化 (`make format`)

---

## License

MIT
