Metadata-Version: 2.4
Name: llm-datagen
Version: 1.0.0
Summary: 极简高性能流式数据加工库
Home-page: https://github.com/your-org/llm-datagen
Author: llm-datagen Team
Author-email: 
License: MIT
Keywords: data processing,pipeline,llm,streaming
Classifier: Development Status :: 3 - Alpha
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Requires-Python: >=3.8
Description-Content-Type: text/markdown
Requires-Dist: openai>=1.0.0
Provides-Extra: dev
Requires-Dist: pytest>=7.0.0; extra == "dev"
Requires-Dist: pytest-cov>=4.0.0; extra == "dev"
Requires-Dist: black>=23.0.0; extra == "dev"
Requires-Dist: flake8>=6.0.0; extra == "dev"
Dynamic: home-page
Dynamic: requires-python

# llm-datagen: 极简高性能流式数据加工库

> **如果你在处理百万级 LLM 数据时曾遭遇过 OOM、磁盘 I/O 锁死，或因程序崩溃导致昂贵的 API 费用重跑，那么 llm-datagen 正是为你设计的。**

llm-datagen 是一个专为大规模数据加工、LLM 语料清洗以及数据生成设计的轻量级 Python 框架。它在保持极致简单的 API 的同时，提供了工业级的可靠性（断点续传）和极致的并发性能（流式处理与异步 I/O）。

---

## 1. 为什么选择 llm-datagen？

*   💰 **保护钱包**：采用“最多一次”语义与物理位点记录，确保 LLM 调用绝不重跑，崩溃后秒级恢复。
*   🧠 **保护内存**：内置“传输级背压”机制，上游自动根据下游消费速度降速，彻底告别 OOM。
*   ⚡ **榨干性能**：单写者异步总线解耦计算与 I/O，支持批次间并行与批次内并发的双层火力。
*   🧩 **算子自适应**：算子只需实现 `process_item` 或 `process_batch`，框架自动完成高性能并发封装。
*   🔌 **协议中立**：一套代码无缝切换 `JSONL`、`CSV`、`Memory` 或本地/远程存储。

---

## 2. 核心特性

*   🚀 **极致并发**：支持“批次间并行”与“批次内并发”的双层火力，自动适配 CPU 密集与 I/O 密集任务。
*   🛠️ **算子多态适配**：开发者只需编写 `process_item` 或 `process_batch`，框架自动完成高性能并行封装与 1:N 爆炸分发。
*   💾 **断点续传 (Recovery)**：物理级封条机制 (`.done`) 与镜像级状态快照，确保任务在崩溃后能从中断点精准恢复，绝不重复扣费（LLM 友好）。
*   🌪️ **流式总线 (Streaming)**：全链路流式传输，上游处理一条，下游立即消费，内存占用恒定，不随数据量增加。
*   🚥 **内置背压 (Backpressure)**：通过信号量与有界异步队列双重保护，彻底终结大规模任务下的 OOM（内存溢出）噩梦。
*   🔌 **协议中立**：一套代码无缝切换 `JSONL`、`CSV`、`Memory` 或远程存储。

---

## 2. 快速开始 (Usage Demo)

### 3.1 安装

```bash
pip install llm-datagen
```

### 3.2 极简管道示例

只需几行代码，即可构建一个带断点续传能力的加工管道。

```python
from llm_datagen import UnifiedPipeline, FunctionOperator

# 1. 定义你的业务算子 (只需关注单条逻辑)
def my_process(item):
    return {"text": item["text"].upper(), "len": len(item["text"])}

# 2. 构建并运行管道
pipeline = UnifiedPipeline(operators=[
    FunctionOperator(my_process)
])

# 3. 执行任务 (支持 JSONL/CSV 协议)
pipeline.create(
    pipeline_id="hello_llm_datagen",
    input_uri="jsonl://input.jsonl",
    output_uri="jsonl://output.jsonl",
    parallel_size=5  # 开启 5 并发
)
pipeline.run()
```

### 3.3 恢复运行

如果程序中途崩溃，只需调用 `resume`：

```python
pipeline = UnifiedPipeline(operators=[...])
pipeline.resume(pipeline_id="hello_llm_datagen")
pipeline.run() # 自动从断点继续
```

---

## 4. 开发者定义 (Developer Guide)

### 4.1 定义算子 (Operator)

llm-datagen 支持三种方式定义算子：

1.  **函数包装**：使用 `FunctionOperator(func)`。
2.  **继承 `BaseOperator` (推荐)**：实现 `process_item`（简单）或 `process_batch`（高性能，适合 LLM 批量调用）。

```python
from llm_datagen.core.operators import BaseOperator

class MyLLMOperator(BaseOperator):
    def process_batch(self, items, ctx=None):
        # 批量调用 LLM API
        results = call_llm_api([it["prompt"] for it in items])
        return [{"output": res} for res in results]
```

### 4.2 高级配置 (`WriterConfig`)

你可以通过 `WriterConfig` 压榨 I/O 性能或控制背压：

```python
from llm_datagen import WriterConfig

writer_config = WriterConfig(
    async_mode=True,       # 开启异步写入
    queue_size=1000,       # 背压队列大小
    flush_batch_size=100   # 聚合 100 条写入一次磁盘
)

pipeline.create(..., writer_config=writer_config)
```

---

## 4. 架构与进阶文档

llm-datagen 提供了完善的文档矩阵，帮助你从入门到精通：

*   💡 **[业务使用指南 (USER_GUIDE.md)](docs/USER_GUIDE.md)**：面向使用者。涵盖算子开发、管道配置、断点恢复与实战模式。
*   ⚙️ **[扩展开发指南 (DEVELOPER_GUIDE.md)](docs/DEVELOPER_GUIDE.md)**：面向二次开发。涵盖自定义 Bus、协议扩展与深度钩子集成。
*   📖 **[架构详解 (DETAIL.md)](docs/DETAIL.md)**：深层原理。深入了解 Node、Bus 和 Pipeline 的生命周期与物理协议。
*   ❓ **[FAQ / 避坑指南 (QUESTION.md)](docs/QUESTION.md)**：血泪教训。涵盖了死锁幽灵、位点幻读、背压控制等核心问题的解决方案。
*   🏗️ **[架构白皮书 (DEVELOPMENT.md)](DEVELOPMENT.md)**：顶层设计。界定了项目在整个生态系统中的位置与设计哲学。

---

## 许可证

MIT License
