Metadata-Version: 2.4
Name: lmp-sdk
Version: 2.0.9
Summary: 推理服务 Python SDK
Home-page: https://github.com/lixiang/lmp-sdk-python
Author: LMP SDK Team
Project-URL: Homepage, https://github.com/lixiang/lmp-sdk-python
Project-URL: Bug Tracker, https://github.com/lixiang/lmp-sdk-python/issues
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.7
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: License :: OSI Approved :: MIT License
Classifier: Operating System :: OS Independent
Requires-Python: >=3.7
Description-Content-Type: text/markdown
Requires-Dist: requests>=2.25.0
Requires-Dist: lpai_asset>=2.3.38
Provides-Extra: dev
Requires-Dist: pytest>=6.0; extra == "dev"
Requires-Dist: pytest-cov>=2.0; extra == "dev"
Requires-Dist: black>=21.0; extra == "dev"
Requires-Dist: flake8>=3.9; extra == "dev"
Requires-Dist: mypy>=0.900; extra == "dev"
Dynamic: home-page
Dynamic: requires-python

# LMP SDK - Python SDK

[![Python Version](https://img.shields.io/badge/python-3.7%2B-blue.svg)](https://www.python.org/downloads/)
[![License](https://img.shields.io/badge/license-MIT-green.svg)](LICENSE)
[![Version](https://img.shields.io/badge/version-2.0.7-orange.svg)](https://pypi.org/project/lmp-sdk/)

LMP SDK 是lpai-llm平台的官方 Python SDK，提供简洁易用的接口来访问平台所部署的公共推理服务，支持文本、图片和视频的多模态输入。

## ✨ 特性

- 🚀 **简单易用**：提供两种使用模式，满足不同场景需求
- 🔄 **异步处理**：支持异步任务提交和自动结果轮询
- 📦 **批量处理**：高效的批量请求处理能力
- 🛡️ **稳定可靠**：内置重试机制、任务持久化和优雅关闭
- 🎯 **多模态支持**：支持文本、图片URL、视频URL多种输入类型
- 🔧 **灵活配置**：丰富的配置选项，适应各种使用场景

## 📦 安装

### 使用 pip 安装

```bash
pip install lmp-sdk==2.0.7
```


### 要求

- Python 3.7+
- lpai_asset >= 2.3.38 (批量推理功能必需)
- requests >= 2.25.0

### 安装 Asset SDK（批量推理功能必需）

批量推理功能依赖 lpai_asset SDK，需要从指定源安装：

```bash
pip install lpai_asset -U -i https://artifactory.ep.chehejia.com/artifactory/api/pypi/liauto-pypi-l5/simple
```

- requests >= 2.25.0

## 🚀 快速开始

### 基础示例 - 简单模式

适合快速提交任务并手动查询结果的场景：

```python
from src import InferClient, PostAsyncInferRequest, Content, ContentType

# 1. 创建客户端
client = InferClient(
    token="your-api-token",
    timeout=3600
)

# 2. 提交推理任务
request = PostAsyncInferRequest(
    contents=[
        Content(type=ContentType.TEXT, text="请描述这张图片"),
        Content(
            type=ContentType.IMAGE_URL,
            image_url={"url": "datasets/images/example.png"}
        )
    ],
    model="qwen__qwen2_5-vl-72b-instruct"
)

response = client.post_async_infer(request)
print(f"任务已提交，task_id: {response.data.task_id}")

# 3. 查询任务结果
from src import GetAsyncInferRequest

result = client.get_async_infer_res(
    GetAsyncInferRequest(task_id=response.data.task_id)
)
print(f"任务状态: {result.data.status}")
print(f"任务结果: {result.data.task_output}")
```

### 高级示例 - 自动模式

适合需要自动处理任务队列和获取结果通知的场景：

```python
from src import AsyncInfer, PostAsyncInferRequest, Content, ContentType, QueueMonitor

# 1. 定义结果回调函数
def on_task_complete(response):
    print(f"任务完成: {response.data.task_id}")
    print(f"结果: {response.data.task_output}")

# 2. 创建自动处理客户端
infer = AsyncInfer(
    token="your-api-token",
    worker_num=100,          # 并发工作线程数
    callback=on_task_complete  # 任务完成回调
)

# 3. 批量提交任务
batch_requests = [
    PostAsyncInferRequest(
        contents=[Content(type=ContentType.TEXT, text=f"问题 {i}")],
        model="qwen__qwen2_5-vl-72b-instruct"
    )
    for i in range(10)
]

responses = infer.post_async_infer_batch(batch_requests, max_workers=10)
print(f"已提交 {len(responses)} 个任务")

# 4. 监控队列直到全部完成
monitor = QueueMonitor(infer, max_duration=3600)
exit_reason = monitor.monitor()  # 阻塞直到队列为空或超时
print(f"队列处理完成，原因: {exit_reason}")

### 批量推理示例

适合需要处理大量数据的场景，支持JSONL格式的批量推理任务：

```python
from src import BatchClient, CreateBatchAsyncInferRequest

# 1. 创建批量推理客户端
batch_client = BatchClient(
    token="your-api-token",
    env="prod"  # 或 "ontest"
)

# 2. 创建批量任务（SDK会自动上传本地文件）
request = CreateBatchAsyncInferRequest(
    name="批量推理任务",
    description="处理客户反馈数据",
    model_service="qwen__qwen2_5-vl-72b-instruct",
    dataset="/local/path/to/input.jsonl",  # 本地文件，SDK自动上传
    output_dataset="datasets/my-output/versions/v1",  # 输出数据集
    max_waiting_hour=24,
    apikey="your-api-token"
)

response = batch_client.create_batch(request, auto_upload=True)
batch_id = response.data
print(f"批量任务已创建: {batch_id}")

# 3. 等待任务完成
final_detail = batch_client.wait_for_completion(
    batch_id=batch_id,
    poll_interval=30,  # 每30秒查询一次
    max_wait_time=3600  # 最多等待1小时
)

# 4. 下载结果
batch_client.download_results(
    batch_id=batch_id,
    local_dir="./batch_results",
    download_error_file=True  # 同时下载错误文件
)

print(f"结果已下载到: ./batch_results/")
batch_client.close()
```

**批量推理数据格式（JSONL）**：

输入文件 `input.jsonl` 示例：
```jsonl
{"custom_id": "request-1", "method": "POST", "url": "/v1/chat/completions", "body": {"model": "qwen__qwen2_5-vl-72b-instruct", "messages": [{"role": "user", "content": "你好"}]}}
{"custom_id": "request-2", "method": "POST", "url": "/v1/chat/completions", "body": {"model": "qwen__qwen2_5-vl-72b-instruct", "messages": [{"role": "user", "content": "世界"}]}}
```

```

## 📖 核心概念

### 两种使用模式

#### 1. 简单模式 (InferClient)

**特点**：
- 直接提交任务，立即返回 task_id
- 需要手动查询任务结果
- 适合对结果获取时机有精确控制需求的场景

**工作流程**：
```
提交任务 → 获取task_id → 手动轮询 → 获取结果
```

#### 2. 自动模式 (AsyncInfer)

**特点**：
- 自动管理任务队列
- 后台线程自动轮询任务状态
- 通过回调函数通知结果
- 支持任务持久化，程序重启后恢复

**工作流程**：
```
提交任务 → 自动加入队列 → 后台轮询 → 回调通知结果
```

### 数据模型

#### Content - 内容类型

```python
from src import Content, ContentType

# 文本内容
text_content = Content(
    type=ContentType.TEXT,
    text="你好，世界！"
)

# 图片内容
image_content = Content(
    type=ContentType.IMAGE_URL,
    image_url={"url": "path/to/image.jpg"}
)

# 视频内容
video_content = Content(
    type=ContentType.VIDEO_URL,
    video_url={"url": "path/to/video.mp4"}
)
```

#### PostAsyncInferRequest - 推理请求

```python
from src import PostAsyncInferRequest

request = PostAsyncInferRequest(
    contents=[text_content, image_content],  # 内容列表（必填）
    model="qwen__qwen2_5-vl-72b-instruct",  # 模型名称（必填）
    temperature=0.7,                         # 温度参数（可选，默认0.000001）
    max_tokens=2048,                         # 最大token数（可选）
    top_p=0.9,                              # Top-p采样（可选）
    frequency_penalty=1.05,                  # 频率惩罚（可选，默认1.05）
    presence_penalty=0.0,                    # 存在惩罚（可选）
    lpai_callback="https://your-callback-url",  # 回调URL（可选）
    lpai_max_request_retries=5,              # 最大重试次数（可选，默认5）
    role="user"                              # 角色（可选，默认"user"）
)
```

#### TaskStatus - 任务状态

```python
from src import TaskStatus

# 任务状态枚举
TaskStatus.PENDING    # 排队中
TaskStatus.RUNNING    # 运行中
TaskStatus.SUCCEEDED  # 成功
TaskStatus.FAILED     # 失败
TaskStatus.UNKNOWN    # 未知
```

## 🔧 API 参考

### InferClient

简单模式客户端，适合快速集成。

#### 初始化

```python
client = InferClient(
    token: str,                           # API Token（必填）
    endpoint: str = "默认端点",            # API端点（可选）
    worker_num: int = 100,                # 连接池大小（可选）
    timeout: int = 3600                   # 请求超时时间（秒，可选）
)
```

#### 方法

##### post_async_infer()

提交单个异步推理任务。

```python
response = client.post_async_infer(
    request: PostAsyncInferRequest
) -> PostAsyncInferResponse
```

**返回值**：
- `response.errno`: 错误码（0表示成功）
- `response.msg`: 消息
- `response.data.task_id`: 任务ID
- `response.data.queue_length`: 队列长度
- `response.data.estimated_scheduled_time`: 预计调度时间（秒）

##### post_async_infer_batch()

批量提交推理任务（并发执行）。

```python
responses = client.post_async_infer_batch(
    requests: List[PostAsyncInferRequest],
    max_workers: int = 10                 # 并发线程数
) -> List[PostAsyncInferResponse]
```

##### get_async_infer_res()

查询任务结果。

```python
result = client.get_async_infer_res(
    request: GetAsyncInferRequest
) -> TaskResponse
```

**返回值**：
- `result.errno`: 错误码
- `result.msg`: 消息
- `result.data.task_id`: 任务ID
- `result.data.status`: 任务状态
- `result.data.task_output`: 任务输出结果
- `result.data.failed_reason`: 失败原因（如果失败）
- `result.data.e2e_latency`: 端到端延迟（毫秒）

---

### AsyncInfer

自动模式客户端，提供完整的任务管理能力。

#### 初始化

```python
infer = AsyncInfer(
    token: str,                           # API Token（必填）
    endpoint: str = "默认端点",            # API端点（可选）
    worker_num: int = 100,                # 后台工作线程数（可选）
    polling_interval: int = 10,           # 轮询间隔（秒，可选）
    max_wait_time: int = 86400,          # 最大等待时间（秒，可选）
    max_queue_size: int = 100000,        # 最大队列大小（可选）
    timeout: int = 3600,                  # 请求超时（秒，可选）
    callback: Callable = None             # 任务完成回调（可选）
)
```

#### 方法

##### post_async_infer()

提交单个任务（自动加入队列）。

```python
response = infer.post_async_infer(
    request: PostAsyncInferRequest
) -> PostAsyncInferResponse
```

##### post_async_infer_batch()

批量提交任务（自动加入队列）。

```python
responses = infer.post_async_infer_batch(
    requests: List[PostAsyncInferRequest],
    max_workers: int = 10
) -> List[PostAsyncInferResponse]
```

---

### QueueMonitor

队列监控器，用于监控和管理任务队列。

#### 初始化

```python
monitor = QueueMonitor(
    infer: AsyncInfer,                    # AsyncInfer实例（必填）
    max_duration: int = 3600,             # 最大运行时间（秒，可选）
    check_interval: int = 30              # 检查间隔（秒，可选）
)
```

#### 方法

##### monitor()

阻塞监控队列，直到队列为空或超时。

```python
exit_reason = monitor.monitor() -> str
# 返回值: "empty"（队列为空） 或 "timeout"（超时）
```

##### get_elapsed_time()

获取已运行时间。

```python
elapsed = monitor.get_elapsed_time() -> float  # 返回秒数
```

##### get_queue_size()

获取当前队列大小。

```python
size = monitor.get_queue_size() -> int
```

---

### BatchClient

批量推理客户端，用于处理大规模数据的批量推理任务。

#### 初始化

```python
batch_client = BatchClient(
    token: str,                           # JWT认证token（必填）
    base_url: str = "默认URL",             # API基础URL（可选）
    env: str = "prod",                    # 运行环境（可选）
    timeout: int = 60                     # 请求超时时间（秒，可选）
)
```

#### 方法

##### create_batch()

创建批量异步推理任务。

```python
response = batch_client.create_batch(
    request: CreateBatchAsyncInferRequest,
    auto_upload: bool = True              # 是否自动上传本地文件
) -> CreateBatchAsyncInferResponse
```

**返回值**：
- `response.errno`: 错误码（0表示成功）
- `response.msg`: 消息
- `response.data`: 批量任务ID

##### list_batches()

获取批量推理任务列表。

```python
response = batch_client.list_batches(
    request: ListBatchAsyncInferRequest = None
) -> ListBatchAsyncInferResponse
```

##### get_batch_detail()

获取批量推理任务详情。

```python
response = batch_client.get_batch_detail(
    batch_id: str
) -> GetBatchAsyncInferDetailResponse
```

**返回值包含**：
- `status`: 任务状态（validating/failed/in_progress/finalizing/completed/expired/cancelling/cancelled）
- `progress`: 任务进度（0.0-1.0）
- `request_num`: 总请求数
- `finished_request_num`: 已完成请求数
- `succeeded_request_num`: 成功请求数
- `failed_request_num`: 失败请求数

##### cancel_batch()

取消批量推理任务。

```python
response = batch_client.cancel_batch(
    batch_id: str
) -> CancelBatchAsyncInferResponse
```

##### download_results()

下载批量推理任务的结果文件。

```python
batch_client.download_results(
    batch_id: str,                        # 批量任务ID
    local_dir: str,                       # 本地下载目录
    download_error_file: bool = True,     # 是否下载错误文件
    overwrite: bool = False               # 是否覆盖已存在文件
) -> None
```

##### wait_for_completion()

等待批量任务完成。

```python
final_detail = batch_client.wait_for_completion(
    batch_id: str,
    poll_interval: int = 30,              # 轮询间隔（秒）
    max_wait_time: int = 86400            # 最大等待时间（秒）
) -> GetBatchAsyncInferDetailResponse
```


## 🎯 使用场景

### 场景1：单次推理查询

```python
from src import InferClient, PostAsyncInferRequest, Content, ContentType

client = InferClient(token="your-token")

# 提交任务
response = client.post_async_infer(
    PostAsyncInferRequest(
        contents=[Content(type=ContentType.TEXT, text="什么是人工智能？")],
        model="qwen__qwen2_5-vl-72b-instruct"
    )
)

# 等待一段时间后查询
import time
time.sleep(30)

result = client.get_async_infer_res(
    GetAsyncInferRequest(task_id=response.data.task_id)
)
```

### 场景2：批量数据处理

```python
from src import AsyncInfer, PostAsyncInferRequest, Content, ContentType

results = []

def collect_result(response):
    results.append(response.data.task_output)

infer = AsyncInfer(token="your-token", callback=collect_result)

# 批量提交1000个任务
requests = [
    PostAsyncInferRequest(
        contents=[Content(type=ContentType.TEXT, text=data)],
        model="qwen__qwen2_5-vl-72b-instruct"
    )
    for data in your_data_list  # 假设有1000条数据
]

infer.post_async_infer_batch(requests, max_workers=50)

# 等待全部完成
from src import QueueMonitor
monitor = QueueMonitor(infer, max_duration=7200)
monitor.monitor()

print(f"处理完成，共 {len(results)} 个结果")
```

### 场景3：图文混合推理

```python
from src import InferClient, PostAsyncInferRequest, Content, ContentType

client = InferClient(token="your-token")

# 多模态输入
response = client.post_async_infer(
    PostAsyncInferRequest(
        contents=[
            Content(type=ContentType.TEXT, text="请分析这张图片中的内容"),
            Content(
                type=ContentType.IMAGE_URL,
                image_url={"url": "datasets/images/analysis.jpg"}
            ),
            Content(type=ContentType.TEXT, text="并提供详细说明")
        ],
        model="qwen__qwen2_5-vl-72b-instruct",
        temperature=0.7,
        max_tokens=2048
    )
)
```

### 场景4：带回调URL的异步处理

```python
from src import InferClient, PostAsyncInferRequest, Content, ContentType

client = InferClient(token="your-token")

response = client.post_async_infer(
    PostAsyncInferRequest(
        contents=[Content(type=ContentType.TEXT, text="任务内容")],
        model="qwen__qwen2_5-vl-72b-instruct",
        lpai_callback="https://your-service.com/callback"  # 结果会POST到这个URL
    )
)

### 场景5：大规模批量推理

适合处理成千上万条数据的场景：

```python
from src import BatchClient, CreateBatchAsyncInferRequest

# 创建批量推理客户端
batch_client = BatchClient(token="your-token", env="prod")

# 创建批量任务（支持本地文件自动上传）
request = CreateBatchAsyncInferRequest(
    name="客户反馈批量分析",
    description="分析10000条客户反馈",
    model_service="qwen__qwen2_5-vl-72b-instruct",
    dataset="/local/path/to/10000_feedbacks.jsonl",  # 本地文件
    output_dataset="datasets/feedback-analysis/versions/v1",
    max_waiting_hour=48,  # 允许48小时完成
    apikey="your-token"
)

# 创建任务（SDK自动上传文件到lmp-request数据集）
response = batch_client.create_batch(request, auto_upload=True)
batch_id = response.data

# 等待任务完成
final_detail = batch_client.wait_for_completion(
    batch_id=batch_id,
    poll_interval=60,  # 每分钟查询一次
    max_wait_time=172800  # 最多等待48小时
)

# 下载结果
batch_client.download_results(
    batch_id=batch_id,
    local_dir="./results",
    download_error_file=True
)

print(f"批量任务完成: {final_detail.data.succeeded_request_num}/{final_detail.data.request_num} 成功")
```

```

## ⚙️ 配置说明

### 环境变量

可以通过环境变量配置默认参数：

```bash
export LMP_API_TOKEN="your-token"
export LMP_API_ENDPOINT="https://custom-endpoint.com/api"
export LMP_DEFAULT_MODEL="your-model"
```

### 任务持久化

AsyncInfer 模式下，任务会自动持久化到本地文件：

- 默认路径：`~/.lmp/task_queue.json`
- 程序异常退出时自动保存
- 重启后自动恢复未完成的任务

## 🔍 错误处理

### 异常类型

```python
from src import LMPException, APIError, TaskTimeoutError, TaskFailedError, QueueFullError

try:
    response = client.post_async_infer(request)
except APIError as e:
    print(f"API错误: {e.status_code} - {e.message}")
except TaskTimeoutError as e:
    print(f"任务超时: {e}")
except TaskFailedError as e:
    print(f"任务失败: {e}")

### 批量推理专用异常

```python
from src import (
    BatchCreationError,
    BatchNotFoundError,
    DatasetPathError,
    AssetDownloadError,
    AssetUploadError
)

try:
    response = batch_client.create_batch(request)
except BatchCreationError as e:
    print(f"批量任务创建失败: {e}")
except DatasetPathError as e:
    print(f"数据集路径错误: {e}")
except AssetUploadError as e:
    print(f"文件上传失败: {e}")
```

except QueueFullError as e:
    print(f"队列已满: {e}")
except LMPException as e:
    print(f"SDK错误: {e}")
```

### 重试机制

SDK 内置了多层重试机制：

1. **HTTP层重试**：网络请求失败自动重试3次（指数退避）
2. **任务层重试**：可通过 `lpai_max_request_retries` 参数配置（默认5次）

### 日志配置

```python
import logging

# 配置SDK日志级别
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

# 调试模式
logging.getLogger('src').setLevel(logging.DEBUG)
```

## 🧪 测试

```bash
# 安装测试依赖
pip install -e ".[dev]"

# 运行测试
pytest tests/

# 生成覆盖率报告
pytest --cov=src tests/
```

## 📊 性能优化

### 连接池配置

```python
# 高并发场景建议配置
client = InferClient(
    token="your-token",
    worker_num=200,      # 增加连接池大小
    timeout=7200         # 增加超时时间
)
```

### 批量处理优化

```python
# 使用批量接口
responses = infer.post_async_infer_batch(
    requests,
    max_workers=50  # 根据服务器性能调整并发数
)
```

### 内存优化

对于大规模任务处理，建议分批提交：

```python
batch_size = 1000
for i in range(0, len(all_requests), batch_size):
    batch = all_requests[i:i+batch_size]
    responses = infer.post_async_infer_batch(batch)
    # 等待这批完成后再提交下一批
    monitor.monitor()
```

## 🔒 安全建议

1. **Token管理**：不要在代码中硬编码token，使用环境变量或配置文件
2. **HTTPS**：生产环境必须使用HTTPS端点
3. **权限控制**：使用最小权限原则，只授予必要的API权限
4. **日志脱敏**：避免在日志中输出敏感信息

## 📚 更多资源

- [平台使用文档](https://li.feishu.cn/wiki/X8yxwluDCiRci0k5e85cp3cxndg?from=from_copylink)
- [示例代码](./examples)


## 📝 版本发布

### 更新版本号

修改以下文件中的版本号：
- `pyproject.toml`：`version = "x.x.x"`
- `setup.py`：`version = "x.x.x"`

### 构建和发布

```bash
# 清理旧构建
rm -rf dist/ build/ *.egg-info

# 构建分发包
python -m build

# 上传到PyPI
twine upload dist/*
```

---

**Made with ❤️ by LMP SDK Team**
