Metadata-Version: 2.4
Name: sskw-tasks
Version: 0.4.2
Summary: sskw shared celery task declaration
Author-email: liwei <491520313@qq.com>
License-Expression: MIT
Project-URL: Homepage, https://tasks.sensearray.com/docs
Requires-Python: >=3.8
Description-Content-Type: text/markdown
Requires-Dist: celery>=5.5.3
Requires-Dist: pydantic>=2.10.6

# SSKW Tasks - 共享Celery任务声明包

## 项目简介

`sskw-tasks` 是一个共享的Python包，用于定义和管理Celery任务接口。该包提供了统一的任务声明规范，使得业务端和Worker端可以基于相同的接口定义进行任务投递和具体实现。

## 特性

- 🔄 统一的任务接口声明
- 📋 基于Pydantic的数据模型验证
- 🔧 灵活的Celery配置
- 📦 易于集成和扩展

## 安装

```bash
pip install sskw-tasks
```

## 环境配置

在使用前，需要设置以下环境变量：

```bash
# Redis/RabbitMQ 作为消息代理
export CELERY_BROKER_URL="redis://localhost:6379/0"
# 结果存储后端
export CELERY_RESULT_BACKEND="redis://localhost:6379/0"
```

## 业务端使用方法

业务端负责投递任务到队列，可以按以下方式使用：

### 1. 导入和配置

```python
from sskw.tasks.celery_app import app
from sskw.tasks.models import AudioTranscriptionRequest, AudioFormatConversionRequest
from sskw.tasks.audio_transcription import audio_transcription_paraformer_cpu

# 配置Celery（可选，如果需要自定义配置）
app.conf.update(
    task_serializer='json',
    result_serializer='json',
    accept_content=['json'],
    result_expires=3600,
)
```

### 2. 投递任务

```python
# 构造输入数据
input_data = AudioTranscriptionRequest(
    audio_urls=[
        "https://example.com/audio1.wav",
        "https://example.com/audio2.wav"
    ],
    data_id="meeting_20250109_001"
)

# 异步投递任务
result = audio_transcription_paraformer_cpu.delay(input_data.model_dump())

# 获取任务ID
task_id = result.id
print(f"任务已投递，ID: {task_id}")

# 获取任务结果（阻塞）
try:
    task_result = result.get(timeout=300)
    print(f"任务完成，结果: {task_result}")
except Exception as e:
    print(f"任务执行失败: {e}")
```

### 3. 检查任务状态

```python
# 检查任务状态
status = result.status
print(f"任务状态: {status}")

# 非阻塞获取结果
if result.ready():
    if result.successful():
        print(f"任务成功: {result.result}")
    else:
        print(f"任务失败: {result.traceback}")
```

## Worker端使用方法

Worker端负责实现具体的任务逻辑：

### 1. 实现任务

根据是否需要任务实例功能，有两种实现方式：

#### 方式一：简单实现（不需要 self）

```python
# worker_implementation.py
from sskw.tasks.celery_app import app
from sskw.tasks.models import AudioTranscriptionRequest, AudioTranscriptionResult

@app.task(name="audio.transcription.paraformer.cpu")
def audio_transcription_paraformer_cpu_impl(input_data: dict) -> dict:
    """
    音频转录任务的简单实现

    参数:
    - input_data (dict): 输入数据字典

    返回:
    - dict: 处理结果
    """
    # 验证输入数据
    audio_input = AudioTranscriptionRequest(**input_data)

    # 执行具体的音频转录逻辑
    segments = []
    for i, audio_url in enumerate(audio_input.audio_urls):
        # 示例转录逻辑（实际应调用 paraformer 模型）
        segment = {
            "text": f"这是第{i+1}段音频的转录结果",
            "start": str(i * 10),
            "end": str((i + 1) * 10)
        }
        segments.append(segment)

    # 构造返回结果
    result = AudioTranscriptionResult(
        audio_urls=audio_input.audio_urls,
        data_id=audio_input.data_id,
        segments=segments,
        merged_audio_url="https://example.com/merged_audio.wav"
    )

    return result.model_dump()
```

#### 方式二：高级实现（需要 self，支持状态更新）

```python
# worker_implementation.py
from sskw.tasks.celery_app import app
from sskw.tasks.models import AudioTranscriptionRequest, AudioTranscriptionResult

@app.task(bind=True, name="audio.transcription.paraformer.cpu")
def audio_transcription_paraformer_cpu_impl(self, input_data: dict) -> dict:
    """
    音频转录任务的高级实现（支持进度更新和错误处理）

    参数:
    - self: 任务实例，提供状态更新等功能
    - input_data (dict): 输入数据字典

    返回:
    - dict: 处理结果
    """
    try:
        # 验证输入数据
        audio_input = AudioTranscriptionRequest(**input_data)

        # 执行具体的音频转录逻辑
        segments = []
        total_audios = len(audio_input.audio_urls)

        for i, audio_url in enumerate(audio_input.audio_urls):
            # 示例转录逻辑（实际应调用 paraformer 模型）
            segment = {
                "text": f"这是第{i+1}段音频的转录结果",
                "start": str(i * 10),
                "end": str((i + 1) * 10)
            }
            segments.append(segment)

            # 更新任务进度（可选）
            self.update_state(
                state='PROGRESS',
                meta={
                    'processed': i + 1,
                    'total': total_audios,
                    'percentage': int((i + 1) / total_audios * 100)
                }
            )

        # 构造返回结果
        result = AudioTranscriptionResult(
            audio_urls=audio_input.audio_urls,
            data_id=audio_input.data_id,
            segments=segments,
            merged_audio_url="https://example.com/merged_audio.wav"
        )

        return result.model_dump()
        
    except Exception as e:
        # 记录错误并重新抛出
        self.update_state(
            state='FAILURE',
            meta={'error': str(e), 'task_id': self.request.id}
        )
        raise
```

#### 选择哪种方式？

- **方式一（不带 self）**：适用于简单任务，不需要进度更新、状态管理等功能
- **方式二（带 self）**：适用于复杂任务，需要：
  - 进度更新 (`self.update_state()`)
  - 手动重试 (`self.retry()`)
  - 访问任务信息 (`self.request.id`, `self.request.retries`)
  - 任务替换 (`self.replace()`)

#### self 参数提供的功能

```python
# 任务重试
@app.task(bind=True, autoretry_for=(Exception,))
def retry_task(self, data):
    try:
        # 任务逻辑
        pass
    except SomeException as e:
        # 手动重试
        raise self.retry(countdown=60, max_retries=3)

# 访问任务信息
@app.task(bind=True)
def info_task(self, data):
    task_id = self.request.id
    retries = self.request.retries
    print(f"任务ID: {task_id}, 重试次数: {retries}")
```

### 2. 启动Worker

```python
# worker.py
from celery import Celery
from sskw.tasks.celery_app import app

# 导入具体实现
import worker_implementation

if __name__ == '__main__':
    # 启动worker
    app.start(['worker', '--loglevel=info'])
```

或者使用命令行启动：

```bash
# 在项目根目录执行
celery -A sskw.tasks.celery_app worker --loglevel=info --concurrency=4
```

## 扩展任务类型

可以轻松扩展新的任务类型：

### 1. 定义新的数据模型

```python
# 在models.py中添加
class ImageInput(BaseModel):
    image_url: str
    operations: List[str]
    quality: int = 80
```

### 2. 声明新的任务接口

```python
# 创建tasks/image_processing.py
from sskw.tasks.celery_app import app
from sskw.tasks.models import ImageInput

@app.task(name="image.process")
def process_image(input: ImageInput) -> dict:
    """
    图像处理任务接口
    
    参数:
    - input (ImageInput): 图像处理输入
    
    返回:
    - dict: 处理结果
    """
    raise NotImplementedError("此任务必须在具体实现中完成")
```

### 任务重试配置

```python
@app.task(bind=True, autoretry_for=(Exception,), retry_kwargs={'max_retries': 3, 'countdown': 60})
def robust_task(self, input_data):
    # 任务实现
    pass
```

## 监控和管理

### 使用Flower监控

```bash
# 安装flower
pip install flower

# 启动监控界面
celery -A sskw.tasks.celery_app flower
```

### 使用Celery命令行工具

```bash
# 查看活跃任务
celery -A sskw.tasks.celery_app inspect active

# 查看worker状态
celery -A sskw.tasks.celery_app inspect stats

# 清空队列
celery -A sskw.tasks.celery_app purge
```

## 最佳实践

1. **数据验证**: 始终使用Pydantic模型验证输入数据
2. **错误处理**: 在任务实现中添加适当的异常处理
3. **进度更新**: 对于长时间运行的任务，定期更新进度状态
4. **资源管理**: 合理设置Worker并发数和内存限制
5. **监控日志**: 配置详细的日志记录用于问题排查

## 版本信息

- 当前版本: 0.0.1
- Python要求: >= 3.8
- Celery版本: >= 5.5.3

---


## 修改历史
### v0.4.2  2025-07-10 liwei
- 添加 meeting.text.refine / meeting.text.summery 接口

### v0.4.1  2025-07-08 liwei
- 添加 audio.transcription.paraformer / audio.transcription.sensevoice 接口

### v0.4.0  2025-07-08 liwei   
- 添加语音转文字任务 audio.transcription.paraformer.cpu / gpu  audio.transcription.sensevoice.cpu /gpu
- 添加语音格式转换任务 audio.format.conversion
