Metadata-Version: 2.4
Name: kk-mcp
Version: 0.1.2
Summary: Kafka MCP Server - A Model Context Protocol server for Apache Kafka integration
Project-URL: Homepage, https://github.com/wangyajun/kk-mcp
Project-URL: Documentation, https://github.com/wangyajun/kk-mcp#readme
Project-URL: Repository, https://github.com/wangyajun/kk-mcp
Project-URL: Issues, https://github.com/wangyajun/kk-mcp/issues
Author: wangyajun
License: MIT
Keywords: fastmcp,kafka,mcp,messaging,model-context-protocol
Classifier: Development Status :: 3 - Alpha
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.13
Classifier: Topic :: Software Development :: Libraries
Requires-Python: >=3.13
Requires-Dist: confluent-kafka>=2.3.0
Requires-Dist: mcp[cli]>=1.26.0
Requires-Dist: pyyaml>=6.0
Provides-Extra: dev
Requires-Dist: pytest-asyncio>=0.23.0; extra == 'dev'
Requires-Dist: pytest>=8.0.0; extra == 'dev'
Description-Content-Type: text/markdown

# MCP + Kafka 集成项目

一个集成了 Apache Kafka 的模型上下文协议（MCP）服务器，用于消息生产、消费和管理。

## 功能特性

- **消息生产**: 向 Kafka 主题发送单条消息或批量消息
- **消息消费**: 从 Kafka 主题消费消息，支持可配置的消费者
- **主题管理**: 创建、列出、描述和删除 Kafka 主题
- **消费者组管理**: 列出和描述消费者组
- **系统状态**: 通过 MCP 资源监控集群状态
- **异步操作**: 完整的异步支持，非阻塞操作
- **配置管理**: 基于 YAML 的配置管理和验证
- **连接池**: 生产者连接池，提升性能
- **演示模式**: 无需 Kafka 实例即可测试的演示版本

## 安装

1. 安装依赖:
   ```bash
   uv sync
   ```

2. 安装 Kafka（可选）:

   使用 Docker:
   ```bash
   # 首先拉取镜像
   docker pull confluentinc/cp-kafka:latest

   # 启动 Kafka
   docker run -d -p 9092:9092 \
     -e KAFKA_BROKER_ID=1 \
     -e KAFKA_ZOOKEEPER_CONNECT=0.0.0.0:2181 \
     -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
     -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT \
     -e KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT \
     --name kafka-server confluentinc/cp-kafka:latest
   ```

3. 配置 Kafka 设置，编辑 `config/kafka_config.yaml`

## 使用方法

### 运行服务器

#### 方式一：完整版本（需要 Kafka）
```bash
python3 main.py
```
或
```bash
uv run main.py
```

#### 方式二：演示版本（无需 Kafka）
如果不想安装 Kafka，可以使用演示版本进行测试：
```bash
python3 main_demo.py
```

演示版本提供了模拟的 Kafka 功能，所有数据存储在内存中，适合快速测试和学习。

### 可用工具

#### 生产者工具
- `create_producer` - 创建生产者实例
- `send_message` - 向主题发送单条消息
- `send_messages_batch` - 批量发送消息
- `close_producer` - 关闭生产者实例

#### 消费者工具
- `create_consumer` - 创建消费者实例
- `consume_messages` - 开始从主题消费消息
- `close_consumer` - 关闭消费者实例

#### 主题管理工具
- `list_topics` - 列出集群中的所有主题
- `create_topic` - 创建新主题
- `describe_topic` - 获取主题详细信息
- `delete_topic` - 删除主题

#### 消费者组工具
- `list_consumer_groups` - 列出所有消费者组
- `describe_consumer_group` - 获取消费者组详细信息

#### 演示模式工具（仅演示版本）
- `create_topic` - 创建主题
- `send_message` - 发送消息
- `list_topics` - 列出所有主题
- `consume_messages` - 消费消息
- `add` - 简单的计算器

### 可用资源

#### 完整版本
- `kafka://status` - 获取系统状态
- `kafka://topics` - 获取所有主题列表
- `kafka://topics/{topic}` - 获取特定主题的详细信息
- `kafka://consumers` - 获取所有消费者组列表
- `kafka://consumers/{group}` - 获取特定消费者组的详细信息

#### 演示版本
- `demo://status` - 获取演示服务器状态

### 可用提示

#### 演示版本
- `help_prompt` - 获取演示服务器的使用帮助

## 配置

Kafka 配置通过 `config/kafka_config.yaml` 管理，示例配置如下：

```yaml
kafka:
  # Kafka 引导服务器
  bootstrap_servers: "localhost:9092"

  # 安全配置
  security:
    protocol: "PLAINTEXT"

  # 主题配置
  topics:
    default:
      partitions: 3
      replication_factor: 1

    notifications:
      partitions: 6
      replication_factor: 2
      configs:
        retention.ms: "604800000"  # 7 天
        cleanup.policy: "delete"

  # 消费者配置
  consumers:
    default:
      group_id: "mcp-consumer"
      auto_offset_reset: "earliest"
      enable_auto_commit: true
      auto_commit_interval_ms: 1000
      session_timeout_ms: 30000
      heartbeat_interval_ms: 3000
      max_poll_records: 500
      max_poll_interval_ms: 300000

    analytics:
      group_id: "analytics-consumer"
      auto_offset_reset: "latest"
      enable_auto_commit: false
      max_poll_records: 1000

  # 生产者配置
  producers:
    default:
      acks: "all"
      retries: 3
      batch_size: 16384
      linger_ms: 5
      buffer_memory: 33554432
      compression_type: "snappy"

    high-throughput:
      acks: "1"
      retries: 0
      batch_size: 32768
      linger_ms: 10
      buffer_memory: 67108864
      compression_type: "lz4"
```

## 测试

运行测试脚本验证实现：

```bash
python3 test_kafka_integration.py
```

测试内容包括：
- 模块导入测试
- 配置加载测试
- Kafka 模块创建测试
- 错误处理测试

## 项目架构

```
kk-mcp/
├── main.py                 # 主服务器文件（需要 Kafka）
├── main_demo.py           # 演示服务器文件（无需 Kafka）
├── kafka_integration.py     # Kafka 集成模块
├── kafka_config.py         # 配置管理
├── kafka_errors.py         # 错误处理
├── kafka_tools.py          # 工具定义
├── kafka_resources.py      # 资源定义
├── kafka_clients.py        # Kafka 客户端封装
├── kafka_prompts.py        # 提示模板
├── config/kafka_config.yaml # 配置文件
├── test_kafka_integration.py # 集成测试
└── README.md              # 项目文档
```

## 使用示例

### 发送消息（完整版本）

```python
{
  "tool": "send_message",
  "arguments": {
    "topic": "my-topic",
    "value": "Hello, Kafka!",
    "key": "greeting",
    "headers": "{\"source\": \"mcp\", \"timestamp\": \"2024-01-01T00:00:00Z\"}"
  }
}
```

### 创建主题（完整版本）

```python
{
  "tool": "create_topic",
  "arguments": {
    "topic_name": "orders",
    "partitions": 6,
    "replication_factor": 2
  }
}
```

### 消费消息（完整版本）

```python
{
  "tool": "consume_messages",
  "arguments": {
    "topics": ["my-topic"],
    "config_name": "default"
  }
}
```

### 演示版本使用示例

```python
# 1. 创建主题
{
  "tool": "create_topic",
  "arguments": {
    "topic_name": "my-topic",
    "partitions": 3
  }
}

# 2. 发送消息
{
  "tool": "send_message",
  "arguments": {
    "topic": "my-topic",
    "value": "Hello, Demo!",
    "key": "greeting"
  }
}

# 3. 列出主题
{
  "tool": "list_topics"
}

# 4. 消费消息
{
  "tool": "consume_messages",
  "arguments": {
    "topic": "my-topic"
  }
}
```

## 常见问题

### MCP error -32000

**问题**: 工具调用时出现 error -32000 错误

**原因**: Kafka 服务器未运行或无法连接到 localhost:9092

**解决方案**:
1. 使用演示版本（无需 Kafka）:
   ```bash
   python3 main_demo.py
   ```
2. 或启动 Kafka 服务器后运行完整版本

### Kafka 连接失败

**问题**: 无法连接到 Kafka 服务器

**解决方案**:
1. 检查 Kafka 是否正在运行:
   ```bash
   docker ps  # 查看容器是否运行
   ```
2. 检查配置文件中的 bootstrap_servers 设置
3. 确认端口 9092 未被占用

### 导入错误

**问题**: 模块导入失败

**解决方案**:
1. 确保在项目根目录运行
2. 安装所有依赖:
   ```bash
   uv sync
   ```

## 前置要求

### 完整版本
- Python 3.13+
- Apache Kafka 实例（本地或远程）
- confluent-kafka Python 包
- 可选: Docker 用于运行 Kafka

### 演示版本
- Python 3.13+
- 无需 Kafka 实例
- 适合快速测试和学习

## 技术栈

- **MCP**: Model Context Protocol 服务器
- **FastMCP**: MCP 服务器框架
- **confluent-kafka**: 高性能 Kafka 客户端
- **PyYAML**: 配置文件解析
- **asyncio**: 异步操作支持

## 贡献

1. Fork 仓库
2. 创建功能分支
3. 为新功能添加测试
4. 运行测试套件
5. 提交 Pull Request

## 许可证

本项目采用 MIT 许可证。

## 相关链接

- [MCP 文档](https://modelcontextprotocol.io/)
- [FastMCP 文档](https://github.com/jlowin/fastmcp)
- [Apache Kafka 文档](https://kafka.apache.org/documentation/)
- [confluent-kafka Python 客户端](https://github.com/confluentinc/confluent-kafka-python)