API¶
Executor Objects¶
-
class
asyncpool.ProcessAsyncPoolExecutor(*, pool_size: Optional[int] = 4, max_works_per_worker: Optional[int] = 300, load_balancer: Optional[asyncpool.load_balancers.LoadBalancer] = <class 'asyncpool.load_balancers.RoundRobin'>, awaitable: Optional[bool] = False, future_loop: asyncio.events.AbstractEventLoop = None, worker_loop_factory: Optional[asyncio.events.AbstractEventLoop] = None)[source]¶ Setups executor and adds self to executor track set.
Parameters: - pool_size – Number of workers, i.e., number of threads or processes.
- max_works_per_worker – The max number of works a worker can run at the same time. This does not limit the number of asyncio tasks of a worker.
- load_balancer – A subclass of asyncpool.LoadBalancer for submitted
item load balancing that has implemented abstract method
get_proper_worker. - awaitable – If it’s set to True, futures returned from
submitmethod will be awaitable, andmapwill return async generator(async iterator if python3.5). - future_loop –
Loop instance set in awaitable futures returned from
submitmethod.If specified,
awaitablemust be set to true.This loop can also be set in
set_future_loopmethod. - worker_loop_factory – A factory to generate loop instance for workers to run their job.
Raises: ValueError–future_loopis specified whileawaitableis False.-
map(work: Callable, *iterables, timeout: Optional[float] = None, chunksize: int = 1, load_balancing_meta: Optional[Any] = None) → Union[AsyncGenerator[T_co, T_contra], Generator[T_co, T_contra, V_co]]¶ map your work like the way in concurrent.futures.
The work submitted will be sent to the specific worker that the load balancer choose.
Note
The work you submit should be a callable, And a
coroutineis not a callable. You should submit acoroutine functionand specify its args and kwargs here instead.Parameters: - work – The callable that will be run in a worker.
- *iterables – Position arguments for work. All of them are iterable and have same length.
- timeout – The time limit for waiting results.
- chunksize – Works are gathered, partitioned as chunks in this size, and then sent to workers.
- load_balancing_meta – This will be passed to load balancer for the choice of proper worker.
Returns: A async generator yielding the map results if
awaitableis set to True, otherwise a generator. In python3.5, async iterator is used to replace async generator.If a exception is raised in a work, it will be re-raised in the generator, and the remaining works will be cancelled.
Raises: ValueError– If chunksize is less than 1.TypeError– If work is not a callable.
-
set_future_loop(loop: asyncio.events.AbstractEventLoop)¶ Sets loop for awaitable futures to await results.
This loop can also be set in initialization.
Parameters: loop – The Loop needed for awaitable futures.
Raises: RuntimeError– If executor has been shut down, or executor is set to be unawaitable.AsyncpoolWorkerError– If some workers are broken or raise BaseException.
-
shutdown(wait: bool = True)¶ Shuts down the executor and frees the resource.
Parameters: wait – Whether to block until shutdown is finished.
-
submit(work: Callable, *args, load_balancing_meta: Optional[Any] = None, **kwargs) → Union[asyncpool.futures.AsyncioFuture, asyncpool.futures.ConcurrentFuture]¶ submits your work like the way in concurrent.futures.
The work submitted will be sent to the specific worker that the load balancer choose.
Note
The work you submit should be a callable, And a
coroutineis not a callable. You should submit acoroutine functionand specify its args and kwargs here instead.Parameters: - work – The callable that will be run in a worker.
- *args – Position arguments for work.
- load_balancing_meta – This will be passed to load balancer for the choice of proper worker.
- **kwargs – Keyword arguments for work.
Returns: A future.
The future will be awaitable like that in asyncio if
awaitableis set to True in executor construction, otherwise, unawaitable like that in concurrent.futures.Raises: RuntimeError– If executor has been shut down.AsyncpoolWorkerError– If some workers are broken or raise BaseException.TypeError– If work is not a callable.
-
class
asyncpool.ThreadAsyncPoolExecutor(*, pool_size: Optional[int] = 4, max_works_per_worker: Optional[int] = 300, load_balancer: Optional[asyncpool.load_balancers.LoadBalancer] = <class 'asyncpool.load_balancers.RoundRobin'>, awaitable: Optional[bool] = False, future_loop: asyncio.events.AbstractEventLoop = None, worker_loop_factory: Optional[asyncio.events.AbstractEventLoop] = None)[source]¶ Setups executor and adds self to executor track set.
Parameters: - pool_size – Number of workers, i.e., number of threads or processes.
- max_works_per_worker – The max number of works a worker can run at the same time. This does not limit the number of asyncio tasks of a worker.
- load_balancer – A subclass of asyncpool.LoadBalancer for submitted
item load balancing that has implemented abstract method
get_proper_worker. - awaitable – If it’s set to True, futures returned from
submitmethod will be awaitable, andmapwill return async generator(async iterator if python3.5). - future_loop –
Loop instance set in awaitable futures returned from
submitmethod.If specified,
awaitablemust be set to true.This loop can also be set in
set_future_loopmethod. - worker_loop_factory – A factory to generate loop instance for workers to run their job.
Raises: ValueError–future_loopis specified whileawaitableis False.-
map(work: Callable, *iterables, timeout: Optional[float] = None, chunksize: int = 1, load_balancing_meta: Optional[Any] = None) → Union[AsyncGenerator[T_co, T_contra], Generator[T_co, T_contra, V_co]]¶ map your work like the way in concurrent.futures.
The work submitted will be sent to the specific worker that the load balancer choose.
Note
The work you submit should be a callable, And a
coroutineis not a callable. You should submit acoroutine functionand specify its args and kwargs here instead.Parameters: - work – The callable that will be run in a worker.
- *iterables – Position arguments for work. All of them are iterable and have same length.
- timeout – The time limit for waiting results.
- chunksize – Works are gathered, partitioned as chunks in this size, and then sent to workers.
- load_balancing_meta – This will be passed to load balancer for the choice of proper worker.
Returns: A async generator yielding the map results if
awaitableis set to True, otherwise a generator. In python3.5, async iterator is used to replace async generator.If a exception is raised in a work, it will be re-raised in the generator, and the remaining works will be cancelled.
Raises: ValueError– If chunksize is less than 1.TypeError– If work is not a callable.
-
set_future_loop(loop: asyncio.events.AbstractEventLoop)¶ Sets loop for awaitable futures to await results.
This loop can also be set in initialization.
Parameters: loop – The Loop needed for awaitable futures.
Raises: RuntimeError– If executor has been shut down, or executor is set to be unawaitable.AsyncpoolWorkerError– If some workers are broken or raise BaseException.
-
shutdown(wait: bool = True)¶ Shuts down the executor and frees the resource.
Parameters: wait – Whether to block until shutdown is finished.
-
submit(work: Callable, *args, load_balancing_meta: Optional[Any] = None, **kwargs) → Union[asyncpool.futures.AsyncioFuture, asyncpool.futures.ConcurrentFuture]¶ submits your work like the way in concurrent.futures.
The work submitted will be sent to the specific worker that the load balancer choose.
Note
The work you submit should be a callable, And a
coroutineis not a callable. You should submit acoroutine functionand specify its args and kwargs here instead.Parameters: - work – The callable that will be run in a worker.
- *args – Position arguments for work.
- load_balancing_meta – This will be passed to load balancer for the choice of proper worker.
- **kwargs – Keyword arguments for work.
Returns: A future.
The future will be awaitable like that in asyncio if
awaitableis set to True in executor construction, otherwise, unawaitable like that in concurrent.futures.Raises: RuntimeError– If executor has been shut down.AsyncpoolWorkerError– If some workers are broken or raise BaseException.TypeError– If work is not a callable.
Future Objects¶
-
class
asyncpool.futures.ConcurrentFuture(cancel_interface)[source]¶ A concurrent.futures.Future subclass that cancels like asyncio.Task.
Load Balancer Objects¶
-
class
asyncpool.load_balancers.RoundRobin(*args, **kwargs)[source]¶ A load balancer based on round-robin algorithm.
-
get_available_workers() → Iterator[Worker]¶ Returns the workers that does not reach the
max_works_per_workerlimit.Returns: A iterator of the available workers.
-
get_proper_worker(load_balancing_meta: Optional[Any]) → Worker[source]¶ Returns the next available worker.
Parameters: load_balancing_meta – An optional argument specified in submitandmapmethods that users may need for choosing a proper worker.Returns: A worker that is available for work assignment.
-
is_available(worker: Worker) → bool¶ Returns if the given worker reaches the
max_works_per_workerlimit.Parameters: worker – A worker object. Returns: True if available, else False.
-
max_works_per_worker¶ Returns tha max number of works a worker can run at the same time.
-
workers¶ Returns worker list.
-
workloads¶ Returns worker workload mapping.
-
-
class
asyncpool.load_balancers.Random(workers: List[Worker], workloads: Dict[Worker, int], max_works_per_worker: int)[source]¶ A load balancer that chooses proper worker randomly.
Initialization.
Note: Must call
super().__init__(*args, **kwargs)in the beginning of the__init__block if you are trying to overwrite this.Parameters: - workers – A argument for
workersproperty. - workloads – A argument for
workloadsproperty. - max_works_per_worker – A argument for
max_works_per_workerproperty.
-
get_available_workers() → Iterator[Worker]¶ Returns the workers that does not reach the
max_works_per_workerlimit.Returns: A iterator of the available workers.
-
get_proper_worker(load_balancing_meta: Optional[Any]) → Worker[source]¶ Randomly picks an avaiable worker.
Parameters: load_balancing_meta – An optional argument specified in submitandmapmethods that users may need for choosing a proper worker.Returns: A worker that is available for work assignment.
-
is_available(worker: Worker) → bool¶ Returns if the given worker reaches the
max_works_per_workerlimit.Parameters: worker – A worker object. Returns: True if available, else False.
-
max_works_per_worker¶ Returns tha max number of works a worker can run at the same time.
-
workers¶ Returns worker list.
-
workloads¶ Returns worker workload mapping.
- workers – A argument for
-
class
asyncpool.load_balancers.Average(workers: List[Worker], workloads: Dict[Worker, int], max_works_per_worker: int)[source]¶ A load balancer that tries to equalize the workloads of all the workers.
To put it otherwise, it assign work to the worker having minimun workload.
Initialization.
Note: Must call
super().__init__(*args, **kwargs)in the beginning of the__init__block if you are trying to overwrite this.Parameters: - workers – A argument for
workersproperty. - workloads – A argument for
workloadsproperty. - max_works_per_worker – A argument for
max_works_per_workerproperty.
-
get_available_workers() → Iterator[Worker]¶ Returns the workers that does not reach the
max_works_per_workerlimit.Returns: A iterator of the available workers.
-
get_proper_worker(load_balancing_meta: Optional[Any]) → Worker[source]¶ Returns the worker with minimum workload.
Parameters: load_balancing_meta – An optional argument specified in submitandmapmethods that users may need for choosing a proper worker.Returns: A worker that is available for work assignment.
-
is_available(worker: Worker) → bool¶ Returns if the given worker reaches the
max_works_per_workerlimit.Parameters: worker – A worker object. Returns: True if available, else False.
-
max_works_per_worker¶ Returns tha max number of works a worker can run at the same time.
-
workers¶ Returns worker list.
-
workloads¶ Returns worker workload mapping.
- workers – A argument for
-
class
asyncpool.load_balancers.LoadBalancer(workers: List[Worker], workloads: Dict[Worker, int], max_works_per_worker: int)[source]¶ The base class of all load balancers.
Users can inherit this to write their own load balancers.
Initialization.
Note: Must call
super().__init__(*args, **kwargs)in the beginning of the__init__block if you are trying to overwrite this.Parameters: - workers – A argument for
workersproperty. - workloads – A argument for
workloadsproperty. - max_works_per_worker – A argument for
max_works_per_workerproperty.
-
get_available_workers() → Iterator[Worker][source]¶ Returns the workers that does not reach the
max_works_per_workerlimit.Returns: A iterator of the available workers.
-
get_proper_worker(load_balancing_meta: Optional[Any]) → Worker[source]¶ The method to be implemented by users. Returns an available worker.
Note
There is always at least an available worker when this method is called.
Parameters: load_balancing_meta – An optional argument specified in submitandmapmethods that users may need for choosing a proper worker.Returns: A worker that is available for work assignment.
-
is_available(worker: Worker) → bool[source]¶ Returns if the given worker reaches the
max_works_per_workerlimit.Parameters: worker – A worker object. Returns: True if available, else False.
-
max_works_per_worker¶ Returns tha max number of works a worker can run at the same time.
-
workers¶ Returns worker list.
-
workloads¶ Returns worker workload mapping.
- workers – A argument for
Module contents¶
Run your coroutines and functions in child processes or threads like the way using concurrnet.futures.
Users can prosess works concurrently and in parallel.
Example
Below is a example sending http request using asyncpool with the helps of aiohttp and uvloop:
import aiohttp
import uvloop
from asyncpool import ProcessAsyncPoolExecutor
async def demo(url):
async with aiohttp.request('GET', url) as response:
return response.status
if __name__ == '__main__':
pool = ProcessAsyncPoolExecutor(pool_size=8,
worker_loop_factory=uvloop.Loop)
future = pool.submit(demo, 'http://httpbin.org')
print('Status: %d.' % future.result())
The result will be:
Status: 200
Note: If you are running python on windows,
if __name__ == '__main__': is necessary. That’s the design of
multiprocessing.