Metadata-Version: 2.4
Name: pypabhiveagent
Version: 1.0.0
Summary: A Python package for querying Hive data and processing with AIGC applications
Home-page: https://github.com/Nevernamed/pypabhiveagent
Author: pypabhiveagent
Author-email: pypabhiveagent <kerwinchou@yeah.net>
License-Expression: MIT
Project-URL: Homepage, https://github.com/Nevernamed/pypabhiveagent
Project-URL: Bug Reports, https://github.com/Nevernamed/pypabhiveagent/issues
Project-URL: Source, https://github.com/Nevernamed/pypabhiveagent
Project-URL: Documentation, https://github.com/Nevernamed/pypabhiveagent#readme
Project-URL: Changelog, https://github.com/Nevernamed/pypabhiveagent/blob/main/CHANGELOG.md
Keywords: hive,aigc,data-processing,llm
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.7
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Requires-Python: >=3.7
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: pandas<3.0.0,>=1.3.0
Requires-Dist: requests<3.0.0,>=2.26.0
Requires-Dist: openpyxl<4.0.0,>=3.0.9
Provides-Extra: hive
Requires-Dist: pyspark<4.0.0,>=3.0.0; extra == "hive"
Requires-Dist: pyhive>=0.6.0; extra == "hive"
Provides-Extra: dev
Requires-Dist: pytest>=7.0.0; extra == "dev"
Requires-Dist: pytest-cov>=3.0.0; extra == "dev"
Requires-Dist: requests-mock>=1.9.3; extra == "dev"
Requires-Dist: black>=22.0.0; extra == "dev"
Requires-Dist: flake8>=4.0.0; extra == "dev"
Requires-Dist: mypy>=0.950; extra == "dev"
Dynamic: author
Dynamic: home-page
Dynamic: license-file
Dynamic: requires-python

# pypabhiveagent

一个用于查询 Hive 数据并通过 AIGC 应用处理的 Python 包。

pypabhiveagent 提供了一个简单易用的接口，用于从 Hive 表查询数据，将数据发送到大模型应用（Prompt 或 Agent 类型）进行处理，解析结果并将处理后的数据存储回 Hive 表或本地 Excel 文件。

## 功能特性

- ✅ **Hive 数据查询**：使用标准 SQL 查询 Hive 表数据
- ✅ **DataFrame 输入**：支持直接传入 pandas DataFrame 作为数据源
- ✅ **DataFrame 输出**：查询结果包含 DataFrame 对象，便于后续处理
- ✅ **大小写不敏感**：列名匹配支持大小写不敏感，提高易用性
- ✅ **AIGC 应用集成**：支持 Prompt 和 Agent 两种应用类型
- ✅ **智能结果解析**：自动解析 JSON 或字符串格式的结果
- ✅ **思维链支持**：可选择获取 AIGC 的推理过程
- ✅ **多种存储方式**：支持保存到 Hive 表或 Excel 文件
- ✅ **并发处理**：使用多线程提高大数据集处理效率
- ✅ **完善的错误处理**：详细的错误日志和重试机制
- ✅ **灵活的配置**：支持链式调用和丰富的配置选项

## 安装

### 基础安装

```bash
pip install pypabhiveagent
```

### 完整安装（包含 Hive 支持）

如果需要使用 Hive 查询功能，需要额外安装 Hive 支持：

```bash
# 安装包含 PySpark 支持
pip install pypabhiveagent[hive]
```

### 从源码安装

```bash
git clone https://github.com/Nevernamed/pypabhiveagent.git
cd pypabhiveagent
pip install -e .  # 基础安装
# 或
pip install -e .[hive]  # 包含 Hive 支持
```

### 依赖说明

- **基础安装**：只包含核心功能（pandas, requests, openpyxl），支持 DataFrame 输入输出
- **Hive 支持**：需要额外安装 `pypabhiveagent[hive]` 来获得 PySpark 支持，用于 Hive 查询和存储
- **注意**：如果只使用 DataFrame 作为数据源，不需要安装 Hive 支持

## 快速开始

### 基本使用示例

```python
from pypabhiveagent import HiveAgent

# 1. 创建 HiveAgent 实例
agent = HiveAgent()

# 2. 配置连接参数
agent.set_config(
    # AIGC 服务配置
    url="https://aigc-api.example.com/prompt",
    app_id="your_app_id",
    token="your_token",
    aigc_app_id="prompt_app_12345"
)

# 3. 执行查询和处理
result = agent.query(
    # SQL 查询语句
    sql="SELECT id, question FROM question_table WHERE dt='20231201' LIMIT 10",
    
    # 应用类型：'prompt' 或 'agent'
    app_type='prompt',
    
    # 字段映射：将 Hive 字段映射到 AIGC 参数
    field_mapping={'question': None},
    
    # 是否包含思维链结果
    include_reasoning=False,
    
    # 存储选项
    cache_to_excel=True,
    save_to_hive=True,
    result_table_name='question_answer_result'
)

# 4. 查看结果
print(f"处理状态: {result['success']}")
print(f"处理消息: {result['message']}")
print(f"处理行数: {result['rows_processed']}")

# 获取结果 DataFrame 用于后续处理
if result.get('df') is not None:
    result_df = result['df']
    print(f"结果 DataFrame: {len(result_df)} 行 x {len(result_df.columns)} 列")

if result.get('excel_path'):
    print(f"Excel 文件: {result['excel_path']}")
if result.get('hive_table'):
    print(f"Hive 表: {result['hive_table']}")
```

### 链式调用示例

```python
from pypabhiveagent import HiveAgent

result = (
    HiveAgent()
    .set_config(
        url="https://aigc-api.example.com/prompt",
        app_id="your_app_id",
        token="your_token",
        aigc_app_id="prompt_app_12345"
    )
    .query(
        sql="SELECT id, text FROM my_table WHERE dt='20231201'",
        app_type='prompt',
        field_mapping={'text': None},
        cache_to_excel=True,
        save_to_hive=False
    )
)

print(result['message'])

# 获取结果 DataFrame
result_df = result['df']
```

## 配置说明

### 必需配置参数

使用 `set_config()` 方法配置以下必需参数：

| 参数 | 类型 | 说明 |
|------|------|------|
| `url` | str | AIGC 服务 URL |
| `app_id` | str | 应用 ID |
| `token` | str | 认证 token |
| `aigc_app_id` | str | AIGC 应用 ID |

### 可选配置参数

| 参数 | 类型 | 默认值 | 说明 |
|------|------|--------|------|
| `timeout` | int | 60 | AIGC 请求超时时间（秒） |
| `max_retries` | int | 3 | 最大重试次数 |
| `sleep` | float | 2 | 请求间隔时间（秒），用于避免 API 限流 |

### 配置示例

```python
agent = HiveAgent()
agent.set_config(
    # 必需参数
    url="https://aigc-api.example.com/prompt",
    app_id="your_app_id",
    token="your_token",
    aigc_app_id="prompt_app_12345",
    
    # 可选参数
    timeout=60,        # 增加超时时间到 60 秒
    max_retries=5,     # 增加重试次数到 5 次
    sleep=0.5          # 每个请求间隔 0.5 秒
)
```

## 使用指南

### 应用类型

pypabhiveagent 支持两种 AIGC 应用类型：

#### 1. Prompt 应用 (`app_type="prompt"`)

基于提示词的应用，适合模板化的查询场景。

**单参数输入示例：**

```python
result = agent.query(
    sql="SELECT id, question FROM qa_table WHERE dt='20231201'",
    app_type='prompt',
    field_mapping={'question': None},  # 单参数
    cache_to_excel=True,
    save_to_hive=True,
    result_table_name='qa_result'
)
```

**多参数输入示例：**

```python
result = agent.query(
    sql="""
        SELECT user_name, product_name, purchase_amount 
        FROM purchase_table 
        WHERE dt='20231201'
    """,
    app_type='prompt',
    field_mapping={
        'user_name': 'userName',
        'product_name': 'productName',
        'purchase_amount': 'amount'
    },  # 多参数
    cache_to_excel=True,
    save_to_hive=True,
    result_table_name='purchase_analysis'
)
```

#### 2. Agent 应用 (`app_type="agent"`)

基于智能体的应用，适合复杂推理任务。

**单参数输入示例：**

```python
result = agent.query(
    sql="SELECT id, content FROM content_table WHERE dt='20231201'",
    app_type='agent',
    field_mapping={'content': None},  # 单参数
    include_reasoning=True,  # 包含思维链
    cache_to_excel=True,
    save_to_hive=True,
    result_table_name='content_analysis'
)
```

**多参数输入示例：**

```python
result = agent.query(
    sql="""
        SELECT customer_name, order_status, order_amount 
        FROM order_table 
        WHERE dt='20231201'
    """,
    app_type='agent',
    field_mapping={
        'customer_name': 'customerName',
        'order_status': 'status',
        'order_amount': 'amount'
    },  # 多参数
    include_reasoning=True,
    cache_to_excel=True,
    save_to_hive=True,
    result_table_name='order_analysis'
)
```

### 字段映射

字段映射用于指定 Hive 查询结果字段与 AIGC 应用参数的对应关系。

#### 单参数映射

当只有一个输入参数时：

```python
field_mapping = {'question': None}
# 或
field_mapping = {'question': 'query'}
```

- **Prompt 应用**：值会放入请求体的 `query` 字段
- **Agent 应用**：值会放入请求体的 `message` 字段（格式：`{'content': value, 'content_type': 'text'}`）

#### 多参数映射

当有多个输入参数时：

```python
field_mapping = {
    'field1': 'param1',
    'field2': 'param2',
    'field3': 'param3'
}
```

- **Prompt 应用**：键值对会放入请求体的 `dynamicColumMap` 字段
- **Agent 应用**：键值对会放入请求体的 `args` 字段

#### 映射规则

- **键**：Hive 查询结果中的字段名
- **值**：发送到 AIGC 应用的参数名（可以为 `None`，表示使用字段名）

### 结果处理

pypabhiveagent 会自动解析 AIGC 返回的结果并合并到原始数据中。

#### JSON 结果

如果 AIGC 返回 JSON 格式：

```json
{"answer": "这是答案", "confidence": 0.95}
```

会自动解析并添加为新列：

| id | question | answer | confidence |
|----|----------|--------|------------|
| 1  | 问题1    | 这是答案 | 0.95       |

#### 字符串结果

如果 AIGC 返回非 JSON 字符串，会创建 `result` 列：

| id | question | result |
|----|----------|--------|
| 1  | 问题1    | 这是答案 |

#### 思维链结果

当 `include_reasoning=True` 时，会添加 `reasoning_content` 列：

| id | question | answer | reasoning_content |
|----|----------|--------|-------------------|
| 1  | 问题1    | 这是答案 | 推理过程...        |

### 存储选项

#### Excel 缓存

设置 `cache_to_excel=True` 将结果保存到本地 Excel 文件：

```python
result = agent.query(
    sql="SELECT * FROM my_table",
    app_type='prompt',
    field_mapping={'text': None},
    cache_to_excel=True  # 保存到 Excel
)

print(f"Excel 文件: {result['excel_path']}")
# 输出: Excel 文件: prompt_app_12345-20231201143025.xlsx
```

文件命名格式：`{aigcAppId}-{年月日时分秒}.xlsx`

#### Hive 存储

设置 `save_to_hive=True` 将结果保存到 Hive 表：

```python
result = agent.query(
    sql="SELECT * FROM my_table WHERE dt='20231201'",
    app_type='prompt',
    field_mapping={'text': None},
    save_to_hive=True,
    result_table_name='my_result_table',  # 必需
    write_mode='append'  # 'append' 或 'overwrite'
)

print(f"Hive 表: {result['hive_table']}")
# 输出: Hive 表: my_result_table
```

**自动表管理：**

- 如果结果表不存在，会自动创建
- 如果源表有 `dt` 字段，会创建分区表（按 `dt` 分区）
- 自动推断字段类型

**写入模式：**

- `append`（默认）：追加数据到表中
- `overwrite`：覆盖表中的数据

### 并发处理

使用 `max_workers` 参数控制并发线程数：

```python
result = agent.query(
    sql="SELECT * FROM large_table WHERE dt='20231201'",
    app_type='prompt',
    field_mapping={'text': None},
    max_workers=10  # 使用 10 个并发线程
)
```

**性能建议：**

- 默认值为 1，确保稳定性
- 如果 API 支持高并发，可以适当增加 `max_workers`（如 5-10）
- 如果遇到 API 限流，降低 `max_workers` 或增加 `sleep` 参数
- 建议每次处理 100-1000 行数据

### DataFrame 输入输出

#### 使用 DataFrame 作为数据源

除了使用 SQL 查询，pypabhiveagent 还支持直接传入 pandas DataFrame 作为数据源：

```python
import pandas as pd
from pypabhiveagent import HiveAgent

# 创建或加载 DataFrame
data = {
    'id': [1, 2, 3, 4, 5],
    'Question': ['问题1', '问题2', '问题3', '问题4', '问题5'],  # 注意大小写
    'dt': ['20231201'] * 5
}
df = pd.DataFrame(data)

# 使用 DataFrame 作为数据源
agent = HiveAgent()
agent.set_config(
    url="https://aigc-api.example.com/prompt",
    app_id="your_app_id",
    token="your_token",
    aigc_app_id="prompt_app_12345"
)

result = agent.query(
    df=df,  # 传入 DataFrame 而不是 SQL
    app_type='prompt',
    field_mapping={'question': None},  # 大小写不敏感，会匹配到 'Question'
    cache_to_excel=True,
    save_to_hive=True,
    result_table_name='result_table'
)
```

**使用场景：**
- 从 CSV、Excel 等文件加载数据
- 从 API 获取的数据
- 已经在内存中处理过的数据
- 需要预处理或筛选的数据

#### 获取结果 DataFrame

查询结果中包含处理后的 DataFrame，可以直接用于后续处理：

```python
result = agent.query(
    sql="SELECT id, question FROM qa_table WHERE dt='20231201'",
    app_type='prompt',
    field_mapping={'question': None},
    cache_to_excel=False,
    save_to_hive=False
)

# 获取结果 DataFrame
if result['success'] and result['df'] is not None:
    result_df = result['df']
    
    # 进行后续处理
    print(f"处理了 {len(result_df)} 行数据")
    print(f"列名: {list(result_df.columns)}")
    
    # 数据分析
    print(result_df.describe())
    
    # 保存到其他格式
    result_df.to_csv('output.csv', index=False)
    result_df.to_json('output.json', orient='records')
    
    # 继续处理
    filtered_df = result_df[result_df['score'] > 0.8]
```

**使用场景：**
- 需要对结果进行进一步分析
- 保存到多种格式（CSV、JSON、Parquet 等）
- 与其他数据处理流程集成
- 数据质量检查和验证

#### 大小写不敏感的列名匹配

field_mapping 中的列名匹配是大小写不敏感的，提高了易用性：

```python
# DataFrame 的列名可能是各种大小写
df = pd.DataFrame({
    'ID': [1, 2, 3],
    'UserName': ['张三', '李四', '王五'],
    'ProductName': ['产品A', '产品B', '产品C'],
    'DT': ['20231201'] * 3
})

# field_mapping 中可以使用任意大小写
result = agent.query(
    df=df,
    app_type='prompt',
    field_mapping={
        'username': 'userName',      # 会匹配到 'UserName'
        'productname': 'productName'  # 会匹配到 'ProductName'
    }
)
```

**优势：**
- 不需要担心列名的大小写
- 提高代码的健壮性
- 减少因大小写不匹配导致的错误

### 错误处理

pypabhiveagent 提供完善的错误处理机制。当部分行处理失败时：

**处理策略：**
- ✅ 保留原始数据
- ✅ 结果字段填充 None 值
- ✅ 保持与成功数据的结构一致
- ✅ 打印失败统计和提醒信息

```python
result = agent.query(
    sql="SELECT * FROM my_table",
    app_type='prompt',
    field_mapping={'text': None}
)

# 查看处理结果
print(f"处理状态: {result['success']}")
print(f"处理消息: {result['message']}")

# 获取结果 DataFrame
if result['df'] is not None:
    result_df = result['df']
    
    # 识别失败的行（结果列为 None）
    if 'answer' in result_df.columns:
        failed_rows = result_df[result_df['answer'].isna()]
        success_rows = result_df[result_df['answer'].notna()]
        
        print(f"成功: {len(success_rows)} 行")
        print(f"失败: {len(failed_rows)} 行")
        
        # 查看失败详情
        if result.get('errors'):
            for error in result['errors'][:5]:
                print(f"行 {error['row_index']}: {error['error']}")
```

**失败时的输出示例：**
```
================================================================================
⚠️  PROCESSING COMPLETED WITH ERRORS
================================================================================
✓ Successfully processed: 8 rows
✗ Failed to process: 2 rows

📌 Failed rows are included in the result with:
   - Original data preserved
   - Result columns filled with None values
   - Result columns: answer, confidence

💡 Tip: Filter failed rows using: df[df['result_column'].isna()]
================================================================================
```

**数据结构示例：**
```python
# 成功的行
id | question | answer | confidence
1  | 问题1    | 答案1  | 0.95

# 失败的行（结果列为 None）
id | question | answer | confidence
2  | 问题2    | None   | None
```

**错误类型：**

- `ConfigurationError`：配置错误
- `HiveQueryError`：Hive 查询错误
- `AIGCRequestError`：AIGC 请求错误
- `ResultParseError`：结果解析错误
- `StorageError`：存储错误

详细的错误处理指南请参考 [docs/ERROR_HANDLING_GUIDE.md](docs/ERROR_HANDLING_GUIDE.md)

### 日志配置

pypabhiveagent 使用 Python 标准 logging 模块：

```python
from pypabhiveagent import configure_logging, set_log_level
import logging

# 配置日志
configure_logging(
    level=logging.INFO,
    log_file='pypabhiveagent.log'
)

# 或者只设置日志级别
set_log_level(logging.DEBUG)
```

## 完整示例

查看 `examples/` 目录获取更多示例：

- `basic_usage.py`：基本使用示例
- `prompt_app_example.py`：Prompt 应用完整示例
- `agent_app_example.py`：Agent 应用完整示例
- `dataframe_usage_example.py`：DataFrame 输入输出示例
- `error_handling_example.py`：错误处理示例

或者使用 `demo()` 方法查看示例代码：

```python
from pypabhiveagent import HiveAgent

# 直接打印完整的使用指南
HiveAgent.demo()
```

## 开发

### 安装开发依赖

```bash
pip install -r requirements-dev.txt
```

### 运行测试

```bash
pytest
```

### 代码格式化

```bash
black pypabhiveagent/
```

### 类型检查

```bash
mypy pypabhiveagent/
```

### 代码检查

```bash
flake8 pypabhiveagent/
```

## 常见问题

### 1. 如何避免 API 限流？

使用 `sleep` 参数设置请求间隔：

```python
agent.set_config(
    url="...",
    app_id="...",
    token="...",
    aigc_app_id="...",
    sleep=0.5  # 每个请求间隔 0.5 秒
)
```

或者降低并发数：

```python
result = agent.query(
    sql="...",
    app_type='prompt',
    field_mapping={'text': None},
    max_workers=3  # 降低并发数
)
```

### 2. 如何处理大数据集？

建议分批处理：

```python
# 按日期分批处理
dates = ['20231201', '20231202', '20231203']

for date in dates:
    result = agent.query(
        sql=f"SELECT * FROM my_table WHERE dt='{date}'",
        app_type='prompt',
        field_mapping={'text': None},
        result_table_name='my_result_table',
        write_mode='append'  # 追加模式
    )
    print(f"处理 {date}: {result['message']}")
```

### 3. 如何自定义超时时间？

使用 `timeout` 参数：

```python
agent.set_config(
    url="...",
    app_id="...",
    token="...",
    aigc_app_id="...",
    timeout=60  # 超时时间 60 秒
)
```

### 4. 如何只保存到 Excel 不保存到 Hive？

```python
result = agent.query(
    sql="...",
    app_type='prompt',
    field_mapping={'text': None},
    cache_to_excel=True,
    save_to_hive=False  # 不保存到 Hive
)
```

### 5. 如何使用 DataFrame 作为数据源？

```python
import pandas as pd

# 准备 DataFrame
df = pd.DataFrame({
    'id': [1, 2, 3],
    'question': ['问题1', '问题2', '问题3'],
    'dt': ['20231201'] * 3
})

# 使用 DataFrame 作为数据源
result = agent.query(
    df=df,  # 传入 DataFrame
    app_type='prompt',
    field_mapping={'question': None}
)
```

### 6. 如何获取结果 DataFrame 进行后续处理？

```python
result = agent.query(
    sql="SELECT * FROM my_table",
    app_type='prompt',
    field_mapping={'text': None}
)

# 获取结果 DataFrame
if result['success'] and result['df'] is not None:
    result_df = result['df']
    
    # 进行后续处理
    result_df.to_csv('output.csv', index=False)
    filtered_df = result_df[result_df['score'] > 0.8]
```

### 7. 列名大小写不匹配怎么办？

不用担心！pypabhiveagent 支持大小写不敏感的列名匹配：

```python
# DataFrame 列名是 'UserName'，field_mapping 中可以使用 'username'
df = pd.DataFrame({'UserName': ['张三', '李四']})

result = agent.query(
    df=df,
    app_type='prompt',
    field_mapping={'username': None}  # 自动匹配到 'UserName'
)
```

## 版本信息

当前版本：**1.0.0**

主要特性：
- 支持 Hive SQL 查询和 DataFrame 输入
- 支持 Prompt 和 Agent 两种 AIGC 应用类型
- 大小写不敏感的列名匹配
- 完善的错误处理机制
- 灵活的存储选项（Excel 和 Hive）

## 许可证

MIT License - 详见 [LICENSE](LICENSE) 文件。

## 贡献

欢迎贡献！请随时提交 Pull Request。

## 支持

如有问题或建议，请提交 [Issue](https://github.com/Nevernamed/pypabhiveagent/issues)。
