Skip to content

Simqueue

FIFO and priority queues.

PriorityQueue

Bases: Queue

Priority queue: items are kept in heap order and retrieved smallest-first.

Source code in src/asimpy/simqueue.py
68
69
70
71
72
73
74
75
class PriorityQueue(Queue):
    """
    Queue that releases its smallest item first.

    Items live in the inherited `_items` list, which is maintained as a
    binary heap with `heapq`, so items must be comparable with `<`.
    """

    # NOTE(review): the cancel handler in Queue.get puts an item back with
    # list.insert(0, ...), which bypasses heapq -- confirm that heap order
    # still holds on that path.

    def _put_item(self, item):
        """Push `item` onto the heap."""
        heap = self._items
        heapq.heappush(heap, item)

    def _get_item(self):
        """Pop and return the smallest item on the heap."""
        heap = self._items
        return heapq.heappop(heap)

Queue

FIFO queue.

Source code in src/asimpy/simqueue.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
class Queue:
    """FIFO queue."""

    def __init__(self, env: "Environment", max_capacity: int | None = None):
        """
        Construct queue.

        Args:
            env: simulation environment.
            max_capacity: maximum queue capacity (None for unlimited).
        """
        if max_capacity is not None:
            assert max_capacity > 0, "max_capacity must be a positive integer"
        self._env = env
        self._max_capacity = max_capacity
        self._items = []
        self._getters = []

    async def get(self):
        """Get one item from the queue."""
        if self._items:
            item = self._get_item()
            evt = Event(self._env)
            evt._on_cancel = lambda: self._items.insert(0, item)
            self._env.immediate(lambda: evt.succeed(item))
            return await evt
        else:
            evt = Event(self._env)
            self._getters.append(evt)
            return await evt

    def is_full(self):
        """Has the queue reached capacity?"""
        return self._max_capacity is not None and len(self._items) >= self._max_capacity

    async def put(self, item: Any):
        """
        Add one item to the queue.

        Args:
            item: to add to the queue.
        """
        if self._getters:
            evt = self._getters.pop(0)
            evt.succeed(item)
        else:
            if self.is_full():
                return
            self._put_item(item)

    def _get_item(self):
        return self._items.pop(0)

    def _put_item(self, item):
        self._items.append(item)

__init__(env, max_capacity=None)

Construct queue.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| env | Environment | simulation environment. | required |
| max_capacity | int \| None | maximum queue capacity (None for unlimited). | None |
Source code in src/asimpy/simqueue.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
def __init__(self, env: "Environment", max_capacity: int | None = None):
    """
    Construct queue.

    Args:
        env: simulation environment.
        max_capacity: maximum queue capacity (None for unlimited).
    """
    if max_capacity is not None:
        assert max_capacity > 0, "max_capacity must be a positive integer"
    self._env = env
    self._max_capacity = max_capacity
    self._items = []
    self._getters = []

get() async

Get one item from the queue.

Source code in src/asimpy/simqueue.py
29
30
31
32
33
34
35
36
37
38
39
40
async def get(self):
    """
    Take one item from the queue, waiting if none is available.

    Returns:
        Whatever awaiting the event yields -- expected to be the item
        passed to `succeed`.
    """
    if not self._items:
        # Nothing queued: park an event for put() to resolve later.
        waiter = Event(self._env)
        self._getters.append(waiter)
        return await waiter

    item = self._get_item()
    ready = Event(self._env)

    def restore():
        # Return the item to the front of the list so it is not lost.
        return self._items.insert(0, item)

    def deliver():
        return ready.succeed(item)

    # Presumably Event invokes this hook if the wait is cancelled.
    ready._on_cancel = restore
    self._env.immediate(deliver)
    return await ready

is_full()

Has the queue reached capacity?

Source code in src/asimpy/simqueue.py
42
43
44
def is_full(self):
    """Report whether the stored items have reached the capacity limit."""
    limit = self._max_capacity
    if limit is None:
        # No limit configured, so the queue can never be full.
        return False
    return len(self._items) >= limit

put(item) async

Add one item to the queue.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| item | Any | to add to the queue. | required |
Source code in src/asimpy/simqueue.py
46
47
48
49
50
51
52
53
54
55
56
57
58
59
async def put(self, item: Any):
    """
    Offer one item to the queue.

    A waiting getter, if any, receives the item directly and nothing is
    stored.  Otherwise the item is stored, unless the queue is full, in
    which case it is silently discarded.

    Args:
        item: to add to the queue.
    """
    waiting = self._getters
    if waiting:
        # Hand the item straight to the longest-waiting getter.
        waiting.pop(0).succeed(item)
        return
    if not self.is_full():
        self._put_item(item)