mode.utils.queues¶
Queue utilities - variations of asyncio.Queue.
- class mode.utils.queues.FlowControlEvent(*, initially_suspended: bool = True, loop: Optional[AbstractEventLoop] = None)¶
Manage flow control for FlowControlQueue instances.
The FlowControlEvent manages flow in one or many queue instances at the same time.
To flow control queues, first create the shared event:
>>> flow_control = FlowControlEvent()
Then pass that shared event to the queues that should be managed by it:
>>> q1 = FlowControlQueue(maxsize=1, flow_control=flow_control)
>>> q2 = FlowControlQueue(flow_control=flow_control)
If you want the contents of the queue to be cleared when flow is resumed, specify that with the clear_on_resume flag:
>>> q3 = FlowControlQueue(clear_on_resume=True,
...                       flow_control=flow_control)
To suspend production into queues, use flow_control.suspend:
>>> flow_control.suspend()
While the queues are suspended, any producer attempting to send something to a queue will hang until flow is resumed.
To resume production into queues, use flow_control.resume:
>>> flow_control.resume()
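The suspend/resume behaviour described above can be illustrated without the library. The following is only a sketch: `SketchFlowControl` and `SketchQueue` are hypothetical stand-ins built on `asyncio.Event`, not mode's implementation, and they model just the "producers wait until resumed" part.

```python
import asyncio


class SketchFlowControl:
    """Hypothetical stand-in for FlowControlEvent (not mode's code)."""

    def __init__(self, *, initially_suspended: bool = True) -> None:
        self._resumed = asyncio.Event()
        if not initially_suspended:
            self._resumed.set()

    def suspend(self) -> None:
        self._resumed.clear()

    def resume(self) -> None:
        self._resumed.set()

    async def acquire(self) -> None:
        # Block the caller until resume() has been called.
        await self._resumed.wait()


class SketchQueue(asyncio.Queue):
    """Hypothetical queue whose put() waits on the shared event first."""

    def __init__(self, maxsize: int = 0, *,
                 flow_control: SketchFlowControl) -> None:
        super().__init__(maxsize)
        self._flow_control = flow_control

    async def put(self, value) -> None:
        await self._flow_control.acquire()
        await super().put(value)


async def demo() -> list:
    flow_control = SketchFlowControl()  # starts suspended, like the default
    queue = SketchQueue(flow_control=flow_control)
    events = []

    async def producer() -> None:
        await queue.put("item")  # hangs while flow is suspended
        events.append("produced")

    task = asyncio.create_task(producer())
    await asyncio.sleep(0.01)  # give the producer time to block
    events.append("resuming")
    flow_control.resume()
    await task
    events.append(await queue.get())
    return events
```

Running `asyncio.run(demo())` shows the producer finishing only after `resume()` is called. Sharing one event object between several queues is what lets a single call pause or release all of them.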
Notes
In Faust, queues are managed by the app.flow_control event.
- async acquire() None¶
Wait until flow control is resumed.
- clear() None¶
- is_active() bool¶
- manage_queue(queue: FlowControlQueue) None¶
Add FlowControlQueue to be cleared on resume.
- resume() None¶
Resume production into queues managed by this event.
- suspend() None¶
Suspend production into queues managed by this event.
- class mode.utils.queues.FlowControlQueue(maxsize: int = 0, *, flow_control: FlowControlEvent, clear_on_resume: bool = False, **kwargs: Any)¶
asyncio.Queue managed by FlowControlEvent.
See also
- clear() None¶
- force_put_nowait(item: _T) None¶
- async get() _T¶
Remove and return an item from the queue.
If queue is empty, wait until an item is available.
- get_nowait() _T¶
Remove and return an item from the queue.
Return an item if one is immediately available, else raise QueueEmpty.
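The difference between the waiting `get()` and the non-blocking `get_nowait()` can be shown with a plain `asyncio.Queue`, which these methods are documented as following. This is a small sketch; the `drain` helper is a hypothetical name introduced for illustration.

```python
import asyncio


def drain(queue: asyncio.Queue) -> list:
    """Collect everything currently in the queue without waiting."""
    items = []
    while True:
        try:
            items.append(queue.get_nowait())
        except asyncio.QueueEmpty:
            # Nothing is immediately available, so stop instead of blocking.
            return items


async def demo() -> list:
    queue: asyncio.Queue = asyncio.Queue()
    for n in (1, 2, 3):
        queue.put_nowait(n)
    return drain(queue)
```

Use `get_nowait()` when an empty queue is an expected condition to handle, and `await queue.get()` when the consumer should simply wait for the next item.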
- in_pressure_high_state(callback: Callable) bool¶
- maybe_endorse_pressure_drop() None¶
- on_pressure_drop() None¶
- on_pressure_high() None¶
- pressure_drop_ratio = 0.4¶
- property pressure_drop_size: int¶
- pressure_high_ratio = 1.25¶
- property pressure_high_size: int¶
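The reference does not say how the two size properties relate to the ratios. One plausible reading, stated here purely as an assumption, is that each size is `maxsize` multiplied by its ratio and rounded down. The helper `pressure_sizes` below is hypothetical and only illustrates that arithmetic.

```python
import math

# Ratios as listed in the class attributes above.
PRESSURE_HIGH_RATIO = 1.25
PRESSURE_DROP_RATIO = 0.4


def pressure_sizes(maxsize: int) -> tuple:
    """Return (high, drop) thresholds, ASSUMING size = floor(maxsize * ratio)."""
    high = math.floor(maxsize * PRESSURE_HIGH_RATIO)
    drop = math.floor(maxsize * PRESSURE_DROP_RATIO)
    return high, drop
```

Under this assumption, a queue with `maxsize=100` would enter the high-pressure state at 125 items and leave it once it falls to 40. Because the high ratio is above 1.0, the high threshold lies above `maxsize`.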
- async put(value: _T) None¶
Put an item into the queue.
If the queue is full, wait until a free slot is available before adding the item.
- put_nowait_enhanced(value: _T, *, on_pressure_high: Callable, on_pressure_drop: Callable) bool¶
- class mode.utils.queues.ThrowableQueue(*args: Any, **kwargs: Any)¶
Queue that can be notified of errors.
- clear() None¶
- empty() bool¶
Return True if the queue is empty, False otherwise.
- async get() _T¶
Remove and return an item from the queue.
If queue is empty, wait until an item is available.
- get_nowait() _T¶
Remove and return an item from the queue.
Return an item if one is immediately available, else raise QueueEmpty.
- async throw(exc: BaseException) None¶
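To make "a queue that can be notified of errors" concrete, here is a simplified sketch of the idea using only `asyncio`. `SketchThrowableQueue` and `_Raised` are hypothetical names, and this is not how mode implements `ThrowableQueue`. In particular, this version delivers errors in FIFO order alongside normal items, whereas the real class may order them differently.

```python
import asyncio


class _Raised:
    """Hypothetical wrapper marking a queued exception."""

    def __init__(self, exc: BaseException) -> None:
        self.exc = exc


class SketchThrowableQueue(asyncio.Queue):
    """Sketch: exceptions thrown into the queue are re-raised in get()."""

    async def throw(self, exc: BaseException) -> None:
        # Enqueue the exception like a normal item, wrapped in a marker.
        await super().put(_Raised(exc))

    async def get(self):
        item = await super().get()
        if isinstance(item, _Raised):
            raise item.exc
        return item


async def demo() -> list:
    queue = SketchThrowableQueue()
    await queue.put("ok")
    await queue.throw(ValueError("upstream failed"))
    seen = [await queue.get()]
    try:
        await queue.get()
    except ValueError as exc:
        seen.append(f"error: {exc}")
    return seen
```

The consumer sees the producer's failure as an exception from `get()`, so it does not need a separate error channel.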