Concurrency
One of the benefits of asynchronous programming is that it allows concurrency. In Python, multiple tasks can start, run and complete in overlapping time periods.
Pyot provides a production rated queue manager to achieve high magnitude of concurrency in a stable manner.
Queue
Module: pyot.core.queue
Worker queue for scheduling coroutines concurrently.
This object is only accessible as a context manager with the async with
syntax for safeguarding session closing and workers joining.
class Queue
Queue
Definitions:
__init__
->None
workers
:int = 25
Maximum number of workers to spawn for the queue. Increasing the number of workers may increase or decrease performance. Defaults to 25.
maxsize
:int = None
Max size of the queue. Defaults to
workers * 2
.log_level
:int = 0
Log level for the que (does not affect pipeline logs). Defaults to 0 (NOLOG level).
exception_handler
:Callable[[Exception], Any] = LOGGER.warning
Handler for raised exceptions in workers, defaults to logging a message with level 30 (WARNING).
Attributes:
queue
:asyncio.Queue
workers_num
:int
maxsize
:int
responses
:Dict
counter
:int
workers
:List
exception_handler
:Callable[[Exception], Any]
Methods:
async
put
->None
coro
:Coroutine
Coroutine to put on the queue.
delay
:float
Amount of delay in seconds before putting the coroutine into the queue. Defaults to 0 (No delay).
Put a coroutine object to the queue. If the queue is full, wait for availability. A delay may be provided if desired for load balancing.
async
join
->List[T]
class_of_t
:Optional[Type[T]]
Optional, Generic type for typing the return content of this method (e.g
await queue.join(int)
will return a list typed asList[int]
).
Block until all items in the queue have been received and executed. Clears the previously collected items if exists. NoneType and Exceptions are not collected, order of items is maintained but not guaranteed.
You can use the same queue to join()
as many time as you want, it will clear previous collected responses, this creates a nice way to do everything in a single Queue. Method join()
will be automatically called before exiting the async with
block, it is not needed to call explicitly unless the content of it is needed.
Example
Try not to return anything or return small objects in coroutines passed to the queue, because Queue
will save those return values for the join()
, meaning that memory can start to increase over time. Design async functions to consume data instead.
Objects instantiated outside of async functions holds reference in an upper scope, these objects can become a source of memory leaks if the data it holds (or will hold after mutations executed by coroutines) is big enough. To counter this:
Instantiate objects inside async functions (e.g. Pass the id and region of a match and instantiate inside the function instead of passing a
lol.Match
object), so it can be garbage collected when it goes out of scope and nothing else holds reference to it.If the objects are inside an iterable and is planned to be mutated and filled with more data (e.g. calling
.get()
on PyotCore objects), freeze the iterable with an utility containerpyot.utils.itertools.FrozenGenerator
, it creates exact copies of the objects when iterated, therefore dropping the outer scope reference, the original object will be left intact.
Assuming the need to collect 30k matches, this will lead to a high use of memory:
Instead, consume the matches directly instead of collecting them. The list is frozen to prevent memory usage since the child function calls .get()
on the match.
Last updated