⚡ Async & Queues
For long-running tasks (like generating a report, scraping a website, or processing a PDF), you should never block the API response. Antilogix provides a built-in Job Queue to handle these tasks in the background.
The Queue Architecture
- Client sends a request to the API.
- API pushes a job to the Queue (SQLite-backed) and returns a job_id instantly.
- Background Worker picks up the job and executes it.
- Client polls the API to check the status.
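The lifecycle above can be sketched with only the Python standard library. This illustrates the flow and is not Antilogix's actual implementation: the `jobs` table schema and the helper names `push`, `claim_next`, `finish`, and `get_status` are assumptions made for the example, and the sketch assumes a single worker.

```python
import json
import sqlite3
import uuid

# Hypothetical schema: one row per job; status moves
# PENDING -> PROCESSING -> COMPLETED or FAILED.
conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
conn.execute(
    """CREATE TABLE jobs (
           job_id TEXT PRIMARY KEY,
           status TEXT NOT NULL,
           data   TEXT NOT NULL,
           result TEXT
       )"""
)

def push(job_data: dict) -> str:
    """API side: store the job and return its id right away."""
    job_id = uuid.uuid4().hex
    conn.execute(
        "INSERT INTO jobs (job_id, status, data) VALUES (?, 'PENDING', ?)",
        (job_id, json.dumps(job_data)),
    )
    conn.commit()
    return job_id

def claim_next():
    """Worker side: take the oldest pending job and mark it PROCESSING."""
    row = conn.execute(
        "SELECT job_id, data FROM jobs WHERE status = 'PENDING' "
        "ORDER BY rowid LIMIT 1"
    ).fetchone()
    if row is None:
        return None
    conn.execute(
        "UPDATE jobs SET status = 'PROCESSING' WHERE job_id = ?",
        (row["job_id"],),
    )
    conn.commit()
    return row["job_id"], json.loads(row["data"])

def finish(job_id: str, result, ok: bool = True) -> None:
    """Worker side: record the outcome of the job."""
    conn.execute(
        "UPDATE jobs SET status = ?, result = ? WHERE job_id = ?",
        ("COMPLETED" if ok else "FAILED", json.dumps(result), job_id),
    )
    conn.commit()

def get_status(job_id: str):
    """Client polling: look up the current state of a job."""
    row = conn.execute(
        "SELECT job_id, status, result FROM jobs WHERE job_id = ?", (job_id,)
    ).fetchone()
    if row is None:
        return None
    job = dict(row)
    if job["result"] is not None:
        job["result"] = json.loads(job["result"])
    return job

# Walk one job through the whole lifecycle.
job_id = push({"task_type": "finance_report", "payload": {"topic": "Q3"}})
print(get_status(job_id)["status"])  # PENDING
claimed_id, data = claim_next()
finish(claimed_id, f"report on {data['payload']['topic']}")
print(get_status(job_id)["status"])  # COMPLETED
```

Because the job row is written before the API responds, the client always has a job_id to poll, even if the worker has not started yet.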
1. Pushing a Job
Use the QueueService to add tasks.
from fastapi import APIRouter
from sentinel_core.services.queue import QueueService

router = APIRouter()
queue = QueueService.get_instance()

@router.post("/generate-report")
async def start_report_generation(topic: str):
    # Push job to queue
    job_id = await queue.push({
        "task_type": "finance_report",
        "payload": {"topic": topic}
    })
    return {
        "status": "queued",
        "job_id": job_id,
        "message": "Report generation started."
    }
2. Processing Jobs (The Worker)
Antilogix runs a background worker process automatically when you start main.py. You need to define how to handle the jobs.
File: app/jobs/worker.py (or wherever your worker logic lives)
# This function is called by the framework when a job is picked up
async def process_job(job_data: dict):
    task_type = job_data.get("task_type")
    payload = job_data.get("payload")

    if task_type == "finance_report":
        # Call your agent or logic here
        return await generate_finance_report(payload["topic"])

    return "Unknown Task"
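As the number of task types grows, the `if` chain in `process_job` can be replaced with a lookup table. The sketch below is one way to do that. The `HANDLERS` registry, the `handles` decorator, and the stub `finance_report` handler are hypothetical names used for illustration and are not part of Antilogix.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict

Handler = Callable[[dict], Awaitable[Any]]

# Hypothetical registry mapping task_type -> coroutine that handles it.
HANDLERS: Dict[str, Handler] = {}

def handles(task_type: str):
    """Decorator that registers a coroutine as the handler for task_type."""
    def register(func: Handler) -> Handler:
        HANDLERS[task_type] = func
        return func
    return register

@handles("finance_report")
async def finance_report(payload: dict) -> str:
    # Stand-in for the real report logic (for example, a call to your agent).
    return f"Finance report for {payload['topic']}"

async def process_job(job_data: dict):
    handler = HANDLERS.get(job_data.get("task_type"))
    if handler is None:
        # Same fallback as the if-based version.
        return "Unknown Task"
    return await handler(job_data.get("payload") or {})

job = {"task_type": "finance_report", "payload": {"topic": "Q3"}}
print(asyncio.run(process_job(job)))  # Finance report for Q3
```

Adding a new task type then means writing one decorated coroutine, with no changes to `process_job` itself.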
3. Checking Status
You can create an endpoint to check if the job is done.
from fastapi import HTTPException

@router.get("/jobs/{job_id}")
async def get_job_status(job_id: str):
    job = await queue.get_status(job_id)
    if not job:
        # Respond with 404 rather than a 200 that carries an error body
        raise HTTPException(status_code=404, detail="Job not found")
    return {
        "job_id": job["job_id"],
        "status": job["status"],  # PENDING, PROCESSING, COMPLETED, FAILED
        "result": job["result"]   # The output of your worker function
    }
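On the client side, polling usually means calling the status endpoint at an interval until the job reaches a terminal state. A minimal sketch follows. `fetch_status` is a hypothetical callable standing in for an HTTP GET to `/jobs/{job_id}`, `wait_for_job` is an illustrative helper rather than an Antilogix API, and the interval and timeout values are arbitrary.

```python
import asyncio
import time
from typing import Awaitable, Callable

TERMINAL_STATES = {"COMPLETED", "FAILED"}

async def wait_for_job(
    fetch_status: Callable[[str], Awaitable[dict]],
    job_id: str,
    interval: float = 1.0,
    timeout: float = 60.0,
) -> dict:
    """Poll fetch_status until the job completes, fails, or the timeout passes."""
    deadline = time.monotonic() + timeout
    while True:
        job = await fetch_status(job_id)
        if job["status"] in TERMINAL_STATES:
            return job
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Job {job_id} still {job['status']} after {timeout}s")
        await asyncio.sleep(interval)

async def demo() -> dict:
    # Fake status source that advances one state per poll.
    states = iter(["PENDING", "PROCESSING", "COMPLETED"])

    async def fake_fetch(job_id: str) -> dict:
        status = next(states)
        result = "done" if status == "COMPLETED" else None
        return {"job_id": job_id, "status": status, "result": result}

    return await wait_for_job(fake_fetch, "abc123", interval=0.01)

print(asyncio.run(demo())["status"])  # COMPLETED
```

In a real client, `fetch_status` would wrap your HTTP library of choice, and a longer interval or a backoff keeps polling from loading the API.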