Metadata-Version: 2.4
Name: isagellm-comm
Version: 0.5.2.18
Summary: Communication Layer for sageLLM distributed inference
Author-email: IntelliStream Team <shuhao_zhang@hust.edu.cn>
License: Private
Project-URL: Homepage, https://github.com/intellistream/sagellm-comm
Project-URL: Repository, https://github.com/intellistream/sagellm-comm
Project-URL: Issues, https://github.com/intellistream/sagellm-comm/issues
Keywords: llm,inference,communication,nccl,hccl,domestic-hardware
Classifier: Development Status :: 3 - Alpha
Classifier: Intended Audience :: Developers
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Requires-Python: ==3.11.*
Description-Content-Type: text/markdown
Requires-Dist: isagellm-protocol<0.6.0,>=0.5.2.11
Requires-Dist: pydantic>=2.0.0
Provides-Extra: dev
Requires-Dist: pytest>=7.0.0; extra == "dev"
Requires-Dist: pytest-cov>=4.0.0; extra == "dev"
Requires-Dist: ruff>=0.1.0; extra == "dev"
Requires-Dist: isage-pypi-publisher>=0.2.0.0; extra == "dev"

# sagellm-comm

## Protocol Compliance (Mandatory)

- MUST follow Protocol v0.1: https://github.com/intellistream/sagellm-docs/blob/main/docs/specs/protocol_v0.1.md
- Any globally shared definitions (fields, error codes, metrics, IDs, schemas) MUST be added to Protocol first.

[![CI](https://github.com/intellistream/sagellm-comm/actions/workflows/ci.yml/badge.svg)](https://github.com/intellistream/sagellm-comm/actions/workflows/ci.yml)
[![PyPI version](https://badge.fury.io/py/isagellm-comm.svg)](https://badge.fury.io/py/isagellm-comm)
[![Python Version](https://img.shields.io/pypi/pyversions/isagellm-comm.svg)](https://pypi.org/project/isagellm-comm/)
[![codecov](https://codecov.io/gh/intellistream/sagellm-comm/branch/main/graph/badge.svg)](https://codecov.io/gh/intellistream/sagellm-comm)

**Communication Hardware Abstraction Layer**: provides distributed communication capabilities (NCCL/HCCL/Gloo) for sageLLM

## Overview

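sagellm-comm is sageLLM's **communication hardware abstraction layer**. It sits **parallel** to sagellm-backend (the compute hardware abstraction layer) and focuses on the communication needs of distributed inference.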

> ⚠️ **Architecture note**: This package does **not depend** on sagellm-backend; the two are parallel L1 abstractions. For compute-related operations, use [sagellm-backend](https://github.com/intellistream/sagellm-backend).

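### Feature Overview

| Feature | Task | Description |
|------|------|------|
| **Topology discovery** | Task1.1 | Automatically discovers nodes, GPUs, and interconnect topology |
| **Collective operations** | Task1.2 | AllReduce, AllGather, ReduceScatter, etc. |
| **Compute/communication overlap** | Task1.4, 1.8 | Multi-stream overlap, pipelining |
| **Domestic interconnect adaptation** | Task1.5, 1.6 | CXL/UB/RDMA adapters |
| **Cross-node communication** | Task1.7 | Cross-node collective optimization |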
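The AllReduce, ReduceScatter, and AllGather operations listed above are closely related: the textbook ring all-reduce is a reduce-scatter followed by an all-gather. The sketch below simulates that classic algorithm over in-memory lists (one list per "rank") so the data movement can be followed without GPUs. It is an illustration only, not sagellm-comm's implementation, and `ring_all_reduce` is a hypothetical name.

```python
from typing import List


def ring_all_reduce(buffers: List[List[float]]) -> List[List[float]]:
    """Simulate a sum all-reduce over len(buffers) in-memory "ranks" using a ring.

    Phase 1 (reduce-scatter): after n-1 steps, rank r owns the fully summed
    chunk (r + 1) % n. Phase 2 (all-gather): n-1 more steps circulate those
    reduced chunks so every rank ends with the complete result.
    """
    n = len(buffers)
    length = len(buffers[0])
    if any(len(b) != length for b in buffers):
        raise ValueError("all ranks must contribute buffers of equal length")
    data = [list(b) for b in buffers]  # copy so the caller's lists are untouched
    bounds = [c * length // n for c in range(n + 1)]  # chunk c = [bounds[c], bounds[c+1])

    def indices(c: int) -> range:
        return range(bounds[c], bounds[c + 1])

    def ring_step(offset: int, accumulate: bool) -> None:
        # Rank r sends chunk (r + offset) % n to its right neighbour (r + 1) % n.
        # Payloads are snapshotted first so all sends in one step are "simultaneous".
        sends = [(r, (r + offset) % n) for r in range(n)]
        payloads = [[data[r][i] for i in indices(c)] for r, c in sends]
        for (r, c), payload in zip(sends, payloads):
            dst = data[(r + 1) % n]
            for i, value in zip(indices(c), payload):
                dst[i] = dst[i] + value if accumulate else value

    for step in range(n - 1):  # reduce-scatter: accumulate partial sums
        ring_step(-step, accumulate=True)
    for step in range(n - 1):  # all-gather: overwrite with fully reduced chunks
        ring_step(1 - step, accumulate=False)
    return data


print(ring_all_reduce([[1, 2, 3, 4], [10, 20, 30, 40], [100, 200, 300, 400]]))
```

Each rank sends 2(n-1) chunks of roughly 1/n of the buffer, which is why the ring variant is bandwidth-efficient for large messages.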

> **Note**: Task1.3 (KV Transfer) has moved to the `sagellm-kv-cache` repository; this package provides the underlying `CommBackend` that it uses.

### Architectural Position

```text
┌─────────────────────────────────────────────────────────────────────────────┐
│                              sagellm-core (L2)                              │
│        Engine layer: LLMEngine / Scheduler / Executor / ModelRunner         │
│                                                                             │
│         ▼ compute calls                     ▼ communication calls           │
├─────────────────────────────────┬───────────────────────────────────────────┤
│  sagellm-backend (L1)           │  sagellm-comm (L1) ← this repo            │
│  Compute hardware abstraction   │  Communication hardware abstraction       │
│                                 │                                           │
│  • Device / Stream / Event      │  • CommBackend abstraction                │
│  • Memory Allocator             │  • Topology discovery                     │
│  • Kernel Registry              │  • Collective ops (all_reduce, etc.)      │
│  • Attention Backend            │  • P2P ops (send/recv)                    │
│  • KV Block basic ops           │  • CommGroup management                   │
│                                 │  • Compute/communication overlap          │
│  Providers:                     │                                           │
│  CUDA│Ascend│Kunlun│DCU│CPU     │  Backends: NCCL│HCCL│RCCL│Gloo            │
├─────────────────────────────────┴───────────────────────────────────────────┤
│                            sagellm-protocol (L0)                            │
│                Protocol definitions: Schema / Errors / Types                │
└─────────────────────────────────────────────────────────────────────────────┘
```

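### Responsibility Boundaries

| Responsibility | sagellm-comm | sagellm-backend |
|------|--------------|-----------------|
| Communication ops (all_reduce) | ✅ | ❌ |
| Topology discovery | ✅ | ❌ |
| P2P communication (send/recv) | ✅ | ❌ |
| Communication group management | ✅ | ❌ |
| Compute/communication overlap | ✅ | ❌ |
| Device/Stream/Event | ❌ | ✅ |
| Memory allocation and management | ❌ | ✅ |
| Kernel registration/selection | ❌ | ✅ |

**Key constraints**:
- ✅ **Owned by this repo**: communication backend abstraction, topology discovery, collective operations, P2P communication, communication group management
- ❌ **Does not depend on**: sagellm-backend (the two are parallel L1 layers)
- ❌ **Not responsible for**: device memory management (handled by backend)
- 🔗 **Used by**: sagellm-core (distributed inference), sagellm-kv-cache (KV transfer)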

### Working with sagellm-backend

In distributed inference, the core layer uses both backend and comm:

```python
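from sagellm_backend import get_provider
from sagellm_comm import CommBackend, ReduceOp
from sagellm_protocol import DType  # assumed location of DType; adjust if it lives elsewhere

# backend: compute-related work (device memory allocation)
backend = get_provider("cuda")
tensor = backend.allocate(1024, DType.FP16)

# comm: communication-related work (collectives across ranks)
comm = CommBackend.create("nccl")
comm.init_process_group(world_size=4, rank=0)
comm.all_reduce(tensor, op=ReduceOp.SUM)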
```

### 📦 Responsibility Boundary Diagram

```text
┌─────────────────────────────────────────────────────────────────────┐
│                            sagellm-core                             │
│             (distributed inference: TP/PP parallelism)              │
└────────────────────────────┬────────────────────────────────────────┘
                             │ uses CommBackend for tensor communication
                             ▼
┌─────────────────────────────────────────────────────────────────────┐
│                      sagellm-comm (this repo)                       │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐ │
│  │  Topology   │  │ Collective  │  │   Overlap   │  │  Domestic   │ │
│  │  (Task1.1)  │  │  (Task1.2)  │  │ (Task1.4/8) │  │  (Task1.5)  │ │
│  └─────────────┘  └─────────────┘  └─────────────┘  └─────────────┘ │
│                        CommBackend Interface                        │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐             │
│  │   NCCL   │  │   HCCL   │  │   RCCL   │  │   Gloo   │             │
│  └──────────┘  └──────────┘  └──────────┘  └──────────┘             │
└─────────────────────────────────────────────────────────────────────┘
                             ▲
                             │ KV Transfer uses CommBackend
┌────────────────────────────┴────────────────────────────────────────┐
│                          sagellm-kv-cache                           │
│            (KV Transfer uses this package's networking)             │
└─────────────────────────────────────────────────────────────────────┘
```
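sagellm-core runs TP/PP parallelism on top of communication groups. The sketch below shows one common way to partition a flat set of global ranks into tensor-parallel and pipeline-parallel groups, i.e. which ranks each `CommGroup` might cover. The layout (TP groups as blocks of consecutive ranks, so TP traffic tends to stay on fast intra-node links) is an assumption for illustration, and `build_parallel_groups` is a hypothetical helper, not part of sagellm-comm.

```python
from typing import Dict, List


def build_parallel_groups(world_size: int, tp_size: int) -> Dict[str, List[List[int]]]:
    """Partition global ranks into tensor-parallel (TP) and pipeline-parallel (PP) groups.

    Assumed layout: each TP group is a block of tp_size consecutive ranks, and
    each PP group takes the rank at the same position from every TP block.
    """
    if world_size % tp_size != 0:
        raise ValueError(f"world_size={world_size} is not divisible by tp_size={tp_size}")
    pp_size = world_size // tp_size
    # TP groups: [0..tp-1], [tp..2tp-1], ... (one per pipeline stage)
    tp_groups = [list(range(s * tp_size, (s + 1) * tp_size)) for s in range(pp_size)]
    # PP groups: ranks with the same position inside their TP block
    pp_groups = [list(range(t, world_size, tp_size)) for t in range(tp_size)]
    return {"tp": tp_groups, "pp": pp_groups}


groups = build_parallel_groups(world_size=8, tp_size=2)
print(groups["tp"])  # 4 TP groups of 2 ranks
print(groups["pp"])  # 2 PP groups of 4 stages
```

Every rank belongs to exactly one TP group and one PP group, so each rank would join two sub-groups in addition to the world group.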

### 🔍 Research Context

**sagellm-comm** is conceptually similar to the **Transfer Engine** in [Mooncake](https://github.com/kvcache-ai/Mooncake):

| Aspect | Mooncake Transfer Engine | sagellm-comm |
|--------|-------------------------|--------------|
| **Core Function** | KV cache data movement | Network communication layer |
| **Scope** | Cross-node KV transfer | Topology + collectives + overlap |
| **Focus** | RDMA/NVLink optimization | Hardware-agnostic abstraction |
| **KV Transfer** | Integrated | Provided to sagellm-kv-cache |

**Key differences**:
- **sagellm-comm** provides a **unified communication layer** that sits alongside sageLLM's compute backend abstraction (without depending on it), supporting NCCL, HCCL, and domestic interconnects (CXL/UB/RDMA)
- **Compute/communication overlap** (Task1.4/1.8) is a first-class design goal
- **Adapter pattern** ensures zero vendor lock-in: swappable backends without core logic changes
- **KV Transfer (Task1.3)** is implemented in sagellm-kv-cache, using this package's `CommBackend` for data-aware optimization

## Installation

```bash
# Install from PyPI (dependencies are installed automatically)
pip install isagellm-comm
```

## 🚀 Developer Quick Start

```bash
git clone git@github.com:intellistream/sagellm-comm.git
cd sagellm-comm
./quickstart.sh   # One-step dev environment setup (including dependencies)

# Or install manually
pip install -e ".[dev]"
```

Run the tests:
```bash
pytest tests/ -v
```

> 💡 The runtime dependencies (`isagellm-protocol` and `pydantic`) are installed automatically from PyPI.

## Quick Start

```python
from sagellm_comm import CommGroup, Topology, CollectiveOps

# Discover topology
topology = Topology.discover()

# Create communication group
group = CommGroup.create(world_size=4, rank=0)

# Collective operations (for distributed inference).
# `tensor` is a device tensor allocated elsewhere, e.g. through sagellm-backend.
CollectiveOps.all_reduce(tensor, group=group)
CollectiveOps.all_gather(tensor, group=group)
```

> **Note**: For KV block transfer, use `KVTransferEngine` from the `sagellm-kv-cache` package, which uses this package's `CommBackend` internally.
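`Topology.discover()` hides how topology information is gathered. As a hedged illustration of what environment-based discovery can involve (not how `Topology.discover()` is actually implemented), the sketch below reads the standard variables that `torchrun` exports (`RANK`, `WORLD_SIZE`, `LOCAL_RANK`, `LOCAL_WORLD_SIZE`) and derives the node count and node index, assuming every node runs the same number of processes. `EnvTopology` and `topology_from_env` are hypothetical names.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class EnvTopology:
    rank: int
    world_size: int
    local_rank: int
    local_world_size: int

    @property
    def num_nodes(self) -> int:
        # Assumes a homogeneous cluster: every node runs local_world_size processes.
        return self.world_size // self.local_world_size

    @property
    def node_index(self) -> int:
        return self.rank // self.local_world_size


def topology_from_env(env: Optional[Mapping[str, str]] = None) -> EnvTopology:
    """Build a topology from torchrun-style variables; defaults describe one process."""
    env = os.environ if env is None else env
    world_size = int(env.get("WORLD_SIZE", "1"))
    return EnvTopology(
        rank=int(env.get("RANK", "0")),
        world_size=world_size,
        local_rank=int(env.get("LOCAL_RANK", "0")),
        local_world_size=int(env.get("LOCAL_WORLD_SIZE", str(world_size))),
    )


print(topology_from_env())  # single-process defaults unless launched via torchrun
```

Passing an explicit mapping instead of `os.environ` keeps the function easy to unit-test.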

## Benchmarking

### Quick Benchmark

Run the all-reduce micro-benchmark with default settings:

```bash
# Single process (world_size=1)
python examples/benchmark_all_reduce.py

# Multi-process with torchrun (2 processes)
torchrun --nproc_per_node=2 examples/benchmark_all_reduce.py
```

### CLI Tool

Use the CLI for more control:

```bash
# Run with custom message sizes
torchrun --nproc_per_node=2 -m benchmark.run_bench --message-sizes 1024 4096 16384

# Specify backend (default: gloo)
torchrun --nproc_per_node=4 -m benchmark.run_bench --backend gloo

# Adaptive selection (default): backend + algorithm auto policy
torchrun --nproc_per_node=4 -m benchmark.run_bench --backend auto --algo auto

# Explicit algorithm
torchrun --nproc_per_node=4 -m benchmark.run_bench --algo tree

# Custom selector policy config
python -m benchmark.run_bench --selector-config ./selector_policy.json

# Custom warmup and benchmark iterations
python -m benchmark.run_bench --warmup 20 --iters 200

# Specify output directory
python -m benchmark.run_bench --output-dir ./my_results
```
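With `--algo auto`, each result records a `selection_rule_id` and a `selection_reason`. One way such a rule-based selector could work is a first-match table keyed on message size, loaded from a JSON file like the one passed to `--selector-config`. The JSON schema, thresholds, and function names below are hypothetical and are not the actual format of `selector_policy.json`; the "tree for small messages, ring for large ones" split is simply a common heuristic.

```python
import json
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Rule:
    rule_id: str
    max_bytes: Optional[int]  # None means "no upper bound"
    algo: str
    reason: str


def load_rules(text: str) -> List[Rule]:
    """Parse a hypothetical policy: {"rules": [{"id", "max_bytes", "algo", "reason"}, ...]}."""
    return [
        Rule(r["id"], r.get("max_bytes"), r["algo"], r["reason"])
        for r in json.loads(text)["rules"]
    ]


def select_algo(rules: List[Rule], message_bytes: int) -> Rule:
    # First matching rule wins, so rules are ordered from the smallest threshold up.
    for rule in rules:
        if rule.max_bytes is None or message_bytes <= rule.max_bytes:
            return rule
    raise LookupError(f"no rule matches {message_bytes} bytes")


POLICY = """{"rules": [
  {"id": "small-msg-tree", "max_bytes": 65536, "algo": "tree",
   "reason": "latency-bound message: tree needs fewer sequential steps"},
  {"id": "large-msg-ring", "max_bytes": null, "algo": "ring",
   "reason": "bandwidth-bound message: ring spreads traffic evenly"}
]}"""

rules = load_rules(POLICY)
chosen = select_algo(rules, 1 << 20)
print(chosen.rule_id, chosen.algo, chosen.reason)
```

Returning the whole matched rule, rather than only the algorithm name, is what makes it easy to log a rule ID and a human-readable reason next to each result.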

### Output Format

Benchmark results are exported in both JSON and CSV formats:

```text
.benchmarks/collective/
├── all_reduce_ws2_20260217_120000.json  # JSON format
└── all_reduce_ws2_20260217_120000.csv   # CSV format
```
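The export step can be sketched as follows. The file-name pattern `<op>_ws<world_size>_<YYYYmmdd_HHMMSS>` is inferred from the example tree above rather than taken from the package source, and `export_results` is a hypothetical helper, so treat this as an illustration of the JSON + CSV layout only.

```python
import csv
import json
from datetime import datetime
from pathlib import Path
from typing import Dict, List


def export_results(
    records: List[Dict], out_dir: Path, op: str, world_size: int, now: datetime
) -> Dict[str, Path]:
    """Write the same records as <stem>.json and <stem>.csv and return both paths."""
    out_dir.mkdir(parents=True, exist_ok=True)
    stem = f"{op}_ws{world_size}_{now.strftime('%Y%m%d_%H%M%S')}"
    json_path = out_dir / f"{stem}.json"
    csv_path = out_dir / f"{stem}.csv"
    json_path.write_text(json.dumps(records, indent=2))
    with csv_path.open("w", newline="") as f:
        # Column order follows the keys of the first record.
        fieldnames = list(records[0].keys()) if records else []
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(records)
    return {"json": json_path, "csv": csv_path}
```

Writing both formats from one list of dicts keeps JSON (for tooling) and CSV (for spreadsheets) in sync.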

**Output fields**:
- `bytes`: Message size in bytes
- `latency_ms`: Latency in milliseconds
- `bandwidth_gbps`: Bandwidth in GB/s
- `algo_id`: Algorithm identifier (e.g., "ring")
- `backend_kind`: Backend type (e.g., "gloo", "nccl")
- `topology_source`: Topology source (e.g., "env", "cuda")
- `world_size`: Number of processes
- `op_type`: Operation type (e.g., "all_reduce")
- `data_type`: Tensor data type (e.g., "float32")
- `warmup_iters`: Number of warmup iterations
- `benchmark_iters`: Number of benchmark iterations
- `selection_rule_id`: Rule ID used by adaptive selector
- `selection_reason`: Human-readable selection reason for traceability
- `timestamp`: Benchmark timestamp
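To relate `bytes`, `latency_ms`, `bandwidth_gbps`, and `data_type`, the sketch below shows the usual arithmetic. It assumes `bandwidth_gbps` is payload bytes divided by latency in decimal GB/s (1 GB = 10^9 bytes); the README does not state the exact formula, so check the benchmark source before comparing numbers. The Python API takes element counts, so the helper also converts elements to bytes using a small, assumed dtype table. The all-reduce "bus bandwidth" factor 2(n-1)/n is the convention used by NVIDIA's nccl-tests and is shown for comparison only.

```python
DTYPE_BYTES = {"float32": 4, "float16": 2, "bfloat16": 2}  # assumed subset of dtypes


def message_bytes(num_elements: int, data_type: str) -> int:
    """Convert an element count (as passed to message_sizes) into a payload size in bytes."""
    return num_elements * DTYPE_BYTES[data_type]


def algo_bandwidth_gbs(num_bytes: int, latency_ms: float) -> float:
    """Algorithm bandwidth in GB/s: payload size divided by elapsed time."""
    return num_bytes / (latency_ms / 1e3) / 1e9


def allreduce_bus_bandwidth_gbs(num_bytes: int, latency_ms: float, world_size: int) -> float:
    """nccl-tests-style bus bandwidth for all-reduce: algbw * 2(n-1)/n."""
    factor = 2 * (world_size - 1) / world_size
    return algo_bandwidth_gbs(num_bytes, latency_ms) * factor


size = message_bytes(16384, "float32")
print(size, algo_bandwidth_gbs(size, 0.05), allreduce_bus_bandwidth_gbs(size, 0.05, 2))
```

Bus bandwidth is useful when comparing runs with different `world_size`, because it normalizes for the extra traffic an all-reduce generates as ranks are added.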

### Python API

```python
from sagellm_comm import get_comm_backend
from sagellm_comm.benchmark import run_all_reduce_benchmark

# Initialize backend
backend = get_comm_backend("gloo")
backend.init(rank=0, world_size=1, master_addr="localhost", master_port=29500)

# Run benchmark
results = run_all_reduce_benchmark(
    backend=backend,
    message_sizes=[1024, 4096, 16384],  # Element counts
    output_dir=".benchmarks/collective",
    warmup_iters=10,
    benchmark_iters=100,
)

# Results are automatically exported to JSON + CSV
print(f"Completed {len(results)} benchmark runs")
```

### Overlap Benchmark

Measure communication-computation overlap efficiency:

```bash
# Run overlap benchmark with default settings
python -m sagellm_comm.benchmark.run_overlap_bench

# Custom communication sizes and compute durations
python -m sagellm_comm.benchmark.run_overlap_bench \
    --comm-bytes 1024 4096 16384 \
    --compute-ms 10 50 100

# Custom iterations
python -m sagellm_comm.benchmark.run_overlap_bench --warmup 10 --iters 20

# Specify output directory
python -m sagellm_comm.benchmark.run_overlap_bench --output-dir ./.benchmarks/overlap
```
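The `comm_submit_ms`, `compute_ms`, and `comm_wait_ms` fields correspond to a submit / compute / wait pattern. The sketch below reproduces that pattern with a worker thread, where `time.sleep` stands in for both a collective and a compute kernel (a real implementation would use CUDA streams or async collective handles instead). `fake_comm`, `fake_compute`, and `run_overlapped` are hypothetical names, and the real benchmark may structure this differently.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Dict


def fake_comm(duration_s: float) -> str:
    time.sleep(duration_s)  # stands in for a network transfer
    return "done"


def fake_compute(duration_s: float) -> None:
    time.sleep(duration_s)  # stands in for a compute kernel


def run_overlapped(comm_s: float, compute_s: float) -> Dict[str, float]:
    with ThreadPoolExecutor(max_workers=1) as pool:
        t0 = time.perf_counter()
        future = pool.submit(fake_comm, comm_s)  # 1. submit communication (non-blocking)
        t1 = time.perf_counter()
        fake_compute(compute_s)                  # 2. compute while communication is in flight
        t2 = time.perf_counter()
        future.result()                          # 3. block until communication finishes
        t3 = time.perf_counter()
    return {
        "comm_submit_ms": (t1 - t0) * 1e3,
        "compute_ms": (t2 - t1) * 1e3,
        "comm_wait_ms": (t3 - t2) * 1e3,
        "total_ms": (t3 - t0) * 1e3,
    }


print(run_overlapped(comm_s=0.1, compute_s=0.1))
```

With equal 100 ms phases, `total_ms` should land near 100 ms rather than 200 ms, and `comm_wait_ms` near zero, because the communication finished while the computation ran.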

**Output fields**:
- `comm_bytes`: Communication payload size in bytes
- `compute_duration_ms`: Configured computation duration in milliseconds
- `comm_submit_ms`: Time to submit communication operation
- `comm_wait_ms`: Time waiting for communication to complete
- `compute_ms`: Time for computation
- `total_ms`: Total elapsed time
- `overlap_ratio`: Fraction of overlap (0-1); higher is better
- `overlap_efficiency`: Actual overlap efficiency vs theoretical (0-1)
- `theoretical_sequential_ms`: Expected time without overlap
- `theoretical_overlap_ms`: Expected time with perfect overlap
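The two theoretical fields follow directly from the measured phases: without overlap the phases add up, and with perfect overlap the shorter phase is completely hidden behind the longer one. The ratio and efficiency formulas below are one plausible reading of the field descriptions, not confirmed definitions from the benchmark source: `overlap_ratio` is taken as the fraction of communication time that was hidden, and `overlap_efficiency` as hidden time relative to the maximum hideable time. `overlap_metrics` is a hypothetical helper.

```python
from typing import Dict


def _clamp01(x: float) -> float:
    return min(1.0, max(0.0, x))


def overlap_metrics(comm_ms: float, compute_ms: float, total_ms: float) -> Dict[str, float]:
    sequential = comm_ms + compute_ms   # theoretical time with no overlap
    ideal = max(comm_ms, compute_ms)    # theoretical time with perfect overlap
    hidden = sequential - total_ms      # time saved by overlapping
    return {
        "theoretical_sequential_ms": sequential,
        "theoretical_overlap_ms": ideal,
        # Assumed definition: share of communication time hidden behind compute.
        "overlap_ratio": _clamp01(hidden / comm_ms) if comm_ms > 0 else 0.0,
        # Assumed definition: hidden time vs. the most that could be hidden.
        "overlap_efficiency": _clamp01(hidden / (sequential - ideal)) if sequential > ideal else 1.0,
    }


print(overlap_metrics(comm_ms=100.0, compute_ms=40.0, total_ms=110.0))
```

In this example 30 ms of the 140 ms sequential time was saved: 30% of the communication was hidden, which is 75% of the 40 ms that perfect overlap could have saved.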

**Python API**:

```python
from sagellm_comm.benchmark import run_overlap_benchmark

# Run overlap benchmark
results = run_overlap_benchmark(
    comm_bytes=[1024, 4096, 16384],  # Communication sizes
    compute_durations_ms=[10, 50, 100],  # Compute durations
    output_dir=".benchmarks/overlap",
    warmup_iters=5,
    benchmark_iters=10,
)

# Results are automatically exported to JSON + CSV
for r in results:
    print(f"Overlap ratio: {r.overlap_ratio:.2%}, Efficiency: {r.overlap_efficiency:.2%}")
```


## Supported Backends

- NCCL (NVIDIA)
- HCCL (Huawei Ascend)
- RCCL (AMD ROCm)
- Gloo (CPU fallback)

## Dependencies

- `isagellm-protocol>=0.5.2.11,<0.6.0` - Protocol definitions
- `pydantic>=2.0.0` - Data validation

## Development

### Setup

```bash
# Install dev dependencies
pip install -e ".[dev]"

# Install pre-commit hooks
pip install pre-commit
pre-commit install
```

### Pre-commit Hooks

This project uses [pre-commit](https://pre-commit.com/) to ensure code quality:

```bash
# Run on all files
pre-commit run --all-files

# Run on staged files (automatic on git commit)
git commit

# Skip hooks temporarily (not recommended)
git commit --no-verify
```

Configured hooks:
- Ruff linter and formatter
- MyPy type checking
- Trailing whitespace, end-of-file fixer
- YAML/TOML/JSON validation

### Testing

```bash
# Run all tests
pytest tests/ -v

# Run with coverage
pytest tests/ -v --cov=sagellm_comm --cov-report=html

# Run specific test file
pytest tests/test_imports.py -v
```

### Code Quality

```bash
# Format code
ruff format .

# Lint code
ruff check . --fix

# Type check
mypy src/
```

Contribution workflow: open an Issue first, develop on a branch and make sure tests and lint pass, then open a PR.

## Version Information

- Current version: 0.5.2.18
- Changelog: see [CHANGELOG.md](CHANGELOG.md)

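## Migrating from sagellm-backend

If you previously called communication APIs through sagellm-backend, migrate as follows:

### Before Migration (v0.3.x)
```python
# ❌ Old: communication ops lived in backend (deprecated)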
from sagellm_backend import get_provider
backend = get_provider("cuda")
backend.all_reduce(tensor, op="sum")
```

### After Migration (v0.4.0+)
```python
# ✅ New: use sagellm-comm for communication ops
from sagellm_comm import CommBackend, ReduceOp

comm = CommBackend.create("nccl")  # or "hccl"/"gloo"
comm.init_process_group(world_size=4, rank=0)
comm.all_reduce(tensor, op=ReduceOp.SUM)
```

### API Mapping

| Old API (backend) | New API (comm) | Description |
|------------------|---------------|------|
| `backend.all_reduce()` | `comm.all_reduce()` | All-reduce (collective reduction) |
| `backend.all_gather()` | `comm.all_gather()` | All-gather (collective gather) |
| `backend.broadcast()` | `comm.broadcast()` | Broadcast |
| `backend.send()` | `comm.send()` | P2P send |
| `backend.recv()` | `comm.recv()` | P2P receive |
| N/A | `Topology.discover()` | Topology discovery (new) |
| N/A | `CommGroup.create()` | Communication group management (new) |
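If a large codebase cannot switch every call site at once, a thin shim can translate the old string-based calls from the table above into the new enum-based ones while emitting a `DeprecationWarning`. This is a hypothetical migration aid, not something sagellm-comm ships: `ReduceOpSketch`, `LegacyCommShim`, and `RecordingComm` are illustrative names, and the real `ReduceOp` members may differ.

```python
import warnings
from enum import Enum
from typing import Any, List, Tuple


class ReduceOpSketch(Enum):
    """Hypothetical stand-in for sagellm_comm.ReduceOp."""
    SUM = "sum"
    MAX = "max"
    MIN = "min"


class LegacyCommShim:
    """Accepts old backend-style calls (string ops) and forwards them to a new-style comm."""

    def __init__(self, comm: Any) -> None:
        self._comm = comm

    def all_reduce(self, tensor: Any, op: str = "sum") -> Any:
        warnings.warn(
            "backend.all_reduce() is deprecated; use comm.all_reduce() from sagellm-comm",
            DeprecationWarning,
            stacklevel=2,
        )
        # Map the legacy string (case-insensitive) onto the enum expected by the new API.
        return self._comm.all_reduce(tensor, op=ReduceOpSketch(op.lower()))


class RecordingComm:
    """Test double that records calls instead of communicating."""

    def __init__(self) -> None:
        self.calls: List[Tuple[Any, ReduceOpSketch]] = []

    def all_reduce(self, tensor: Any, op: ReduceOpSketch) -> Any:
        self.calls.append((tensor, op))
        return tensor


shim = LegacyCommShim(RecordingComm())
shim.all_reduce([1.0, 2.0], op="sum")
```

The other rows of the table (`all_gather`, `broadcast`, `send`, `recv`) would follow the same forward-and-warn pattern.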

### Detailed Migration Guide

For the complete migration guide, see:
- [sagellm-docs: Backend vs Comm Boundary](https://github.com/intellistream/sagellm-docs/blob/main/docs/BACKEND_VS_COMM_BOUNDARY.md)

## License

Private - IntelliStream Research Project
