Skip to content

Simqueue

FIFO and priority queues.

Queue

FIFO or priority queue.

Source code in src/asimpy/simqueue.py
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
class Queue:
    """FIFO or priority queue."""

    def __init__(
        self,
        env: "Environment",
        max_capacity: int | None = None,
        priority: bool = False,
    ):
        """
        Construct queue.

        Args:
            env: simulation environment.
            max_capacity: maximum queue capacity (None for unlimited).
            priority: if `True`, maintain items in sorted order.

        Raises:
            ValueError: for invalid `max_capacity`.
        """
        if max_capacity is not None and max_capacity <= 0:
            raise ValueError(
                f"queue max_capacity must be a positive integer, got {max_capacity}"
            )
        self._env = env
        self._priority = priority
        self._max_capacity = max_capacity
        self._items = []
        self._getters = []
        self._putters = []

    def _add(self, item):
        """Add item to internal list (sorted if priority, appended otherwise)."""
        if self._priority:
            bisect.insort(self._items, item)
        else:
            self._items.append(item)

    async def get(self):
        """Get one item from the queue."""
        if self._items:
            item = self._items.pop(0)
            if self._putters:
                putter_evt, putter_item = self._putters.pop(0)
                self._add(putter_item)
                self._env.immediate(lambda evt=putter_evt: evt.succeed(True))
            evt = Event(self._env)
            evt._on_cancel = lambda: self._items.insert(0, item)
            self._env.immediate(lambda: evt.succeed(item))
            try:
                return await evt
            except Interrupt:
                # Runner was cancelled before it received the item; restore it.
                self._items.insert(0, item)
                raise
        else:
            evt = Event(self._env)
            self._getters.append(evt)
            try:
                return await evt
            except Interrupt:
                # Runner was cancelled while waiting; remove orphan getter.
                if evt in self._getters:
                    self._getters.remove(evt)
                raise

    def is_empty(self):
        """Is the queue empty?"""
        return len(self._items) == 0

    def is_full(self):
        """Has the queue reached capacity?"""
        return self._max_capacity is not None and len(self._items) >= self._max_capacity

    async def put(self, item: Any) -> bool:
        """
        Add one item to the queue.

        If a getter is waiting, the item is delivered directly.
        Otherwise, if the queue is not full, the item is added.
        If the queue is full, the operation blocks until space
        is available.

        Args:
            item: to add to the queue.

        Returns:
            `True` when the item has been added.
        """
        if self._getters:
            evt = self._getters.pop(0)
            evt.succeed(item)
            return True

        if not self.is_full():
            self._add(item)
            return True

        evt = Event(self._env)
        entry = (evt, item)
        self._putters.append(entry)

        def cancel():
            if entry in self._putters:
                self._putters.remove(entry)

        evt._on_cancel = cancel
        return await evt

__init__(env, max_capacity=None, priority=False)

Construct queue.

Parameters:

Name Type Description Default
env Environment

simulation environment.

required
max_capacity int | None

maximum queue capacity (None for unlimited).

None
priority bool

if True, maintain items in sorted order.

False

Raises:

Type Description
ValueError

for invalid max_capacity.

Source code in src/asimpy/simqueue.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
def __init__(
    self,
    env: "Environment",
    max_capacity: int | None = None,
    priority: bool = False,
):
    """
    Construct queue.

    Args:
        env: simulation environment.
        max_capacity: maximum queue capacity (None for unlimited).
        priority: if `True`, maintain items in sorted order.

    Raises:
        ValueError: for invalid `max_capacity`.
    """
    if max_capacity is not None and max_capacity <= 0:
        raise ValueError(
            f"queue max_capacity must be a positive integer, got {max_capacity}"
        )
    self._env = env
    self._priority = priority
    self._max_capacity = max_capacity
    self._items = []
    self._getters = []
    self._putters = []

get() async

Get one item from the queue.

Source code in src/asimpy/simqueue.py
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
async def get(self):
    """Get one item from the queue."""
    if self._items:
        item = self._items.pop(0)
        if self._putters:
            putter_evt, putter_item = self._putters.pop(0)
            self._add(putter_item)
            self._env.immediate(lambda evt=putter_evt: evt.succeed(True))
        evt = Event(self._env)
        evt._on_cancel = lambda: self._items.insert(0, item)
        self._env.immediate(lambda: evt.succeed(item))
        try:
            return await evt
        except Interrupt:
            # Runner was cancelled before it received the item; restore it.
            self._items.insert(0, item)
            raise
    else:
        evt = Event(self._env)
        self._getters.append(evt)
        try:
            return await evt
        except Interrupt:
            # Runner was cancelled while waiting; remove orphan getter.
            if evt in self._getters:
                self._getters.remove(evt)
            raise

is_empty()

Is the queue empty?

Source code in src/asimpy/simqueue.py
78
79
80
def is_empty(self):
    """Is the queue empty?"""
    return len(self._items) == 0

is_full()

Has the queue reached capacity?

Source code in src/asimpy/simqueue.py
82
83
84
def is_full(self):
    """Has the queue reached capacity?"""
    return self._max_capacity is not None and len(self._items) >= self._max_capacity

put(item) async

Add one item to the queue.

If a getter is waiting, the item is delivered directly. Otherwise, if the queue is not full, the item is added. If the queue is full, the operation blocks until space is available.

Parameters:

Name Type Description Default
item Any

to add to the queue.

required

Returns:

Type Description
bool

True when the item has been added.

Source code in src/asimpy/simqueue.py
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
async def put(self, item: Any) -> bool:
    """
    Add one item to the queue.

    If a getter is waiting, the item is delivered directly.
    Otherwise, if the queue is not full, the item is added.
    If the queue is full, the operation blocks until space
    is available.

    Args:
        item: to add to the queue.

    Returns:
        `True` when the item has been added.
    """
    if self._getters:
        evt = self._getters.pop(0)
        evt.succeed(item)
        return True

    if not self.is_full():
        self._add(item)
        return True

    evt = Event(self._env)
    entry = (evt, item)
    self._putters.append(entry)

    def cancel():
        if entry in self._putters:
            self._putters.remove(entry)

    evt._on_cancel = cancel
    return await evt