Skip to content

Job Queue

The Queue Service handles background processing for long-running tasks.

antilogix.services.queue.QueueService

Source code in antilogix\services\queue.py
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
class QueueService:
    _instance = None
    DB_NAME = "sentinel_metrics.db" # Reusing the same DB file

    @classmethod
    def get_instance(cls):
        if cls._instance is None:
            cls._instance = QueueService()
        return cls._instance

    async def init_db(self):
        """Creates the jobs table."""
        async with aiosqlite.connect(self.DB_NAME) as db:
            await db.execute("""
                CREATE TABLE IF NOT EXISTS jobs (
                    job_id TEXT PRIMARY KEY,
                    status TEXT, -- PENDING, PROCESSING, COMPLETED, FAILED
                    input_data TEXT,
                    result TEXT,
                    created_at REAL
                )
            """)
            await db.commit()

    async def push(self, data: Dict[str, Any]) -> str:
        """Adds a job to the queue."""
        job_id = str(uuid.uuid4())
        async with aiosqlite.connect(self.DB_NAME) as db:
            await db.execute(
                "INSERT INTO jobs (job_id, status, input_data, created_at) VALUES (?, ?, ?, CURRENT_TIMESTAMP)",
                (job_id, "PENDING", json.dumps(data))
            )
            await db.commit()
        return job_id

    async def fetch_pending(self) -> Optional[Dict]:
        """Gets the next job to process."""
        async with aiosqlite.connect(self.DB_NAME) as db:
            db.row_factory = aiosqlite.Row
            cursor = await db.execute("SELECT * FROM jobs WHERE status = 'PENDING' LIMIT 1")
            row = await cursor.fetchone()

            if row:
                # Mark as processing immediately so no other worker picks it up
                await db.execute("UPDATE jobs SET status = 'PROCESSING' WHERE job_id = ?", (row['job_id'],))
                await db.commit()
                return dict(row)
        return None

    async def complete(self, job_id: str, result: str):
        async with aiosqlite.connect(self.DB_NAME) as db:
            await db.execute("UPDATE jobs SET status = 'COMPLETED', result = ? WHERE job_id = ?", (result, job_id))
            await db.commit()

    async def get_status(self, job_id: str):
        async with aiosqlite.connect(self.DB_NAME) as db:
            db.row_factory = aiosqlite.Row
            cursor = await db.execute("SELECT * FROM jobs WHERE job_id = ?", (job_id,))
            row = await cursor.fetchone()
            return dict(row) if row else None

fetch_pending() async

Gets the next job to process.

Source code in antilogix\services\queue.py
42
43
44
45
46
47
48
49
50
51
52
53
54
async def fetch_pending(self) -> Optional[Dict]:
    """Gets the next job to process."""
    async with aiosqlite.connect(self.DB_NAME) as db:
        db.row_factory = aiosqlite.Row
        cursor = await db.execute("SELECT * FROM jobs WHERE status = 'PENDING' LIMIT 1")
        row = await cursor.fetchone()

        if row:
            # Mark as processing immediately so no other worker picks it up
            await db.execute("UPDATE jobs SET status = 'PROCESSING' WHERE job_id = ?", (row['job_id'],))
            await db.commit()
            return dict(row)
    return None

init_db() async

Creates the jobs table.

Source code in antilogix\services\queue.py
17
18
19
20
21
22
23
24
25
26
27
28
29
async def init_db(self):
    """Creates the jobs table."""
    async with aiosqlite.connect(self.DB_NAME) as db:
        await db.execute("""
            CREATE TABLE IF NOT EXISTS jobs (
                job_id TEXT PRIMARY KEY,
                status TEXT, -- PENDING, PROCESSING, COMPLETED, FAILED
                input_data TEXT,
                result TEXT,
                created_at REAL
            )
        """)
        await db.commit()

push(data) async

Adds a job to the queue.

Source code in antilogix\services\queue.py
31
32
33
34
35
36
37
38
39
40
async def push(self, data: Dict[str, Any]) -> str:
    """Adds a job to the queue."""
    job_id = str(uuid.uuid4())
    async with aiosqlite.connect(self.DB_NAME) as db:
        await db.execute(
            "INSERT INTO jobs (job_id, status, input_data, created_at) VALUES (?, ?, ?, CURRENT_TIMESTAMP)",
            (job_id, "PENDING", json.dumps(data))
        )
        await db.commit()
    return job_id