Metadata-Version: 2.1
Name: ff-arq
Version: 0.1.0
Summary: A simple arq library.
Home-page: https://github.com/itsanr-oris/ff-arq
Author: foris.feng
Author-email: us@f-oris.me
License: MIT
Project-URL: Bug Tracker, https://github.com/itsanr-oris/ff-arq/issues
Project-URL: Source Code, https://github.com/itsanr-oris/ff-arq
Keywords: arq,redis-queue
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.6
Classifier: Programming Language :: Python :: 3.7
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: License :: OSI Approved :: MIT License
Classifier: Operating System :: OS Independent
Requires-Python: >=3.6
Description-Content-Type: text/markdown
Requires-Dist: arq
Requires-Dist: redis>=4.2.0
Requires-Dist: typing_extensions; python_version < "3.10"
Provides-Extra: tests
Requires-Dist: faker; extra == "tests"
Requires-Dist: pytest; extra == "tests"
Requires-Dist: pytest-cov; extra == "tests"
Requires-Dist: pytest-asyncio; extra == "tests"

# ff-arq

一个简洁易用的 arq 封装库，简化arq使用流程。

## 主要功能

- 支持任务/定时任务装饰器
- 支持自动加载指定目录下的任务
- 支持arq worker服务重启后，任务快速重启

## 安装

```bash
pip install ff-arq
```

## 快速开始

在`tasks/__init__.py`文件内定义好任务

```python
import asyncio
import logging
from ff_arq import task, cron_task

@task()
async def test_task(message: str):
    """
    测试任务
    """
    await asyncio.sleep(5)
    logging.debug(f"test_task completed, message: {message}")


@cron_task(second={0, 5, 10, 15, 20, 25, 30, 35, 40, 45, 50,55})  # 每分钟执行一次
async def test_cron_task():
    """
    每隔5秒，输出日志 test_cron_task executed
    """
    logging.debug("test_cron_task executed")
```

在 `main.py` 文件内编写一下代码

```python
import asyncio
import logging
from ff_arq import TaskManager


logging.basicConfig(level=logging.DEBUG)

async def start_task_queue():
    """
    启动任务
    """
    # 创建任务队列和任务管理器
    config = {
        "redis_dsn": "redis://localhost:6379/0",
        "task_dir_path": "src/tasks", # auto load arq task from dir
        "queues": {
            # queue_name: queue_config
            "default": {
                "concurrency": 10,
                "task_timeout": 300,
                "keep_result": 3600,
                "retry_jobs": True,
                "max_tries": 5,
                "handle_signals": False,
            }
        }
    }
    await TaskManager.init(**config).run()

async def run_test_task():
    """
    执行task
    """
    from tasks import test_task
    queue_job = await test_task(message="hello ff-arq", enqueue=True)
    logging.debug(f"queue job: {queue_job}")
    # 等待5秒后，在日志里面可以看到：test_task completed, message: hello ff-arq

async def main():

    tasks = [
        start_task_queue(),
        run_test_task()
    ]
    await asyncio.gather(*tasks)

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
```

## 更多示例

请参考 `tests` 目录下的用例。

## 贡献指南

欢迎提交 Issue 或 PR，完善功能或修复问题。请确保代码风格与项目保持一致，并补充必要的测试。

## License

MIT
