Metadata-Version: 2.4
Name: rickcommons
Version: 0.1.0
Summary: 一个Python工具库，提供Redis队列功能
Author-email: li2810081 <276069869@qq.com>
Requires-Python: >=3.8.6
Requires-Dist: pytest>=8.3.5
Requires-Dist: redis>=4.5.0
Description-Content-Type: text/markdown

# Qyscripts

一个Python工具库，提供Redis队列功能。

## 功能特性

- Redis队列管理器
- Redis流式消费者
- Redis队列生产者
- 支持消息重试机制
- 支持批量消息发送
- 消费者组管理

## 安装

```bash
pip install qyscripts
```

## 使用示例

### 基本使用

查看 `examples/basic_usage.py` 文件了解完整的使用示例。

```python
# 基本配置
from qyscripts.redis_queue import RedisQueueConfig, RedisQueueManager

config = RedisQueueConfig(
    host='localhost',
    port=6379,
    password=None,
    db=0
)

manager = RedisQueueManager(config)
```

### 快速开始

1. 安装依赖：
   ```bash
   pip install redis
   ```

2. 运行示例：
   ```bash
   python examples/basic_usage.py
   ```

### 主要功能示例

- **生产者发送消息**
- **消费者消费消息**
- **批量操作**
- **持续消费模式**

详细示例代码请参考 `examples/basic_usage.py`。

## 快速开始

### 基本配置

```python
from qyscripts.redis_queue import RedisQueueConfig, RedisQueueManager

# 创建配置
config = RedisQueueConfig(
    host='localhost',
    port=6379,
    password=None,  # 如果没有密码
    db=0
)

# 创建队列管理器
manager = RedisQueueManager(config)
```

### 使用队列生产者

```python
from qyscripts.redis_queue import RedisQueueProducer

# 创建生产者
producer = RedisQueueProducer(manager)

# 发送单条消息
success = producer.send_message("my_queue", "Hello, World!")
print(f"消息发送成功: {success}")

# 发送带键的消息
success = producer.send_message("my_queue", "Message with key", key="message_key")

# 批量发送消息
messages = ["msg1", "msg2", "msg3"]
success = producer.send_batch_messages("my_queue", messages)
```

### 使用流式消费者

```python
from qyscripts.redis_queue import RedisStreamsConsumer

# 创建消费者
consumer = RedisStreamsConsumer(
    manager=manager,
    stream_name="my_stream",
    consumer_group="my_group",
    consumer_name="consumer_1"
)

# 消费单条消息
message = consumer.consume_message(timeout=1000)  # 1秒超时
if message:
    print(f"收到消息: {message}")
    # 确认消息
    consumer.ack_message(message['id'])

# 批量消费消息
messages = consumer.consume_batch_messages(count=10, timeout=1000)
for msg in messages:
    print(f"批量消息: {msg}")
    consumer.ack_message(msg['id'])

# 持续消费消息
consumer.start_consuming(
    message_handler=lambda msg: print(f"处理消息: {msg}"),
    batch_size=5,
    timeout=1000
)
```

### 完整的示例

```python
import time
from qyscripts.redis_queue import (
    RedisQueueConfig, 
    RedisQueueManager, 
    RedisQueueProducer, 
    RedisStreamsConsumer
)

# 配置Redis连接
config = RedisQueueConfig(
    host='localhost',
    port=6379,
    password=None,
    db=0
)

# 创建管理器
manager = RedisQueueManager(config)

# 创建生产者
producer = RedisQueueProducer(manager)

# 创建消费者
consumer = RedisStreamsConsumer(
    manager=manager,
    stream_name="test_stream",
    consumer_group="test_group",
    consumer_name="test_consumer"
)

def process_message(message):
    """消息处理函数"""
    print(f"处理消息: {message['data']}")
    # 确认消息
    consumer.ack_message(message['id'])

# 发送测试消息
producer.send_message("test_stream", "测试消息1")
producer.send_message("test_stream", "测试消息2")

# 消费消息
print("开始消费消息...")
consumer.start_consuming(
    message_handler=process_message,
    batch_size=2,
    timeout=2000
)

# 运行一段时间后停止
time.sleep(5)
consumer.stop_consuming()

# 关闭连接
manager.close()
```

## API 文档

### RedisQueueConfig

Redis队列配置类。

**参数:**
- `host`: Redis服务器地址
- `port`: Redis端口
- `password`: Redis密码
- `db`: Redis数据库编号

### RedisQueueManager

Redis队列管理器，负责管理Redis连接。

**方法:**
- `get_redis_client()`: 获取Redis客户端
- `close()`: 关闭Redis连接

### RedisQueueProducer

Redis队列生产者，负责发送消息。

**方法:**
- `send_message(queue_name, message, key=None)`: 发送单条消息
- `send_batch_messages(queue_name, messages)`: 批量发送消息

### RedisStreamsConsumer

Redis流式消费者，负责消费消息。

**方法:**
- `consume_message(timeout=1000)`: 消费单条消息
- `consume_batch_messages(count=10, timeout=1000)`: 批量消费消息
- `start_consuming(message_handler, batch_size=1, timeout=1000)`: 开始持续消费
- `stop_consuming()`: 停止消费
- `ack_message(message_id)`: 确认消息

## 测试

运行测试：

```bash
pytest tests/redis_queue_test.py -v
```

## 许可证

MIT License