I am interested in running large numbers of tasks in parallel, so I need something like asyncio.as_completed, but taking an iterable instead of a list, and with a limited number of tasks running concurrently. First, let’s try to build something pretty much equivalent to asyncio.as_completed. Here is my attempt, but I’d welcome feedback from readers who know better:
# Note this is not a coroutine - it returns
# an iterator - but it crucially depends on
# work being done inside the coroutines it
# yields - those coroutines empty out the
# list of futures it holds, and it will not
# end until that list is empty.
def my_as_completed(coros):

    # Start all the tasks
    futures = [asyncio.ensure_future(c) for c in coros]

    # A coroutine that waits for one of the
    # futures to finish and then returns
    # its result.
    async def first_to_finish():

        # Wait forever - we could add a
        # timeout here instead.
        while True:

            # Give up control to the scheduler
            # - otherwise we will spin here
            # forever!
            await asyncio.sleep(0)

            # Return anything that has finished
            for f in futures:
                if f.done():
                    futures.remove(f)
                    return f.result()

    # Keep yielding a waiting coroutine
    # until all the futures have finished.
    while len(futures) > 0:
        yield first_to_finish()
The above can be substituted for asyncio.as_completed in the code that uses it in the first article, and it seems to work. It also makes a reasonable amount of sense to me, so it may be correct, but I’d welcome comments and corrections.
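One possible refinement, offered as a sketch rather than a correction: the polling loop around asyncio.sleep(0) keeps the event loop busy while nothing has finished, and asyncio.wait with return_when=asyncio.FIRST_COMPLETED can suspend until a task is actually done. The names results_as_completed and demo are made up for this illustration, and the interface differs from my_as_completed: it is an async generator (Python 3.6 or later) consumed with async for, yielding results directly rather than coroutines to await.

```python
import asyncio


async def results_as_completed(coros):
    """Yield results in completion order without busy-polling.

    Hypothetical alternative to my_as_completed, not a drop-in
    replacement: callers use "async for" instead of "for" + "await".
    """
    # Start all the tasks, just as my_as_completed does.
    pending = {asyncio.ensure_future(c) for c in coros}
    while pending:
        # Suspend here until at least one task has finished.
        done, pending = await asyncio.wait(
            pending, return_when=asyncio.FIRST_COMPLETED
        )
        for task in done:
            yield task.result()


async def demo():
    async def delayed(value, delay):
        await asyncio.sleep(delay)
        return value

    # Shorter delays finish first, so results arrive out of input order.
    coros = (delayed(n, 0.05 * n) for n in (3, 1, 2))
    return [res async for res in results_as_completed(coros)]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Like my_as_completed, this still starts every task up front, so it does nothing to limit concurrency.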
my_as_completed above accepts an iterable and returns a generator producing results, but inside it starts all tasks concurrently, and stores all the futures in a list. To handle bigger lists we will need to do better, by limiting the number of running tasks to a sensible number.
Let’s start with a test program:
import asyncio

async def mycoro(number):
    print("Starting %d" % number)
    await asyncio.sleep(1.0 / number)
    print("Finishing %d" % number)
    return str(number)

async def print_when_done(tasks):
    for res in asyncio.as_completed(tasks):
        print("Result %s" % await res)

coros = [mycoro(i) for i in range(1, 101)]

loop = asyncio.get_event_loop()
loop.run_until_complete(print_when_done(coros))
loop.close()
This uses asyncio.as_completed to run 100 tasks and, because I adjusted the asyncio.sleep command to wait longer for earlier tasks, it prints something like this:
$ time python3 python-async.py
Starting 47
Starting 93
Starting 48
...
Finishing 93
Finishing 94
Finishing 95
...
Result 93
Result 94
Result 95
...
Finishing 46
Finishing 45
Finishing 42
...
Finishing 2
Result 2
Finishing 1
Result 1

real    0m1.590s
user    0m0.600s
sys     0m0.072s
So all 100 tasks completed in about 1.6 seconds, even though the slowest one alone sleeps for a full second. That indicates they really were run concurrently, but all 100 were allowed to run at the same time, with no limit.
We can adjust the test program to run using our customised my_as_completed function, and pass in an iterable of coroutines instead of a list by changing the last part of the program to look like this:
async def print_when_done(tasks):
    for res in my_as_completed(tasks):
        print("Result %s" % await res)

coros = (mycoro(i) for i in range(1, 101))

loop = asyncio.get_event_loop()
loop.run_until_complete(print_when_done(coros))
loop.close()
But we get similar output to last time, with all tasks running concurrently.
To limit the number of concurrent tasks, we limit the size of the futures list, and add more as needed:
from itertools import islice

def limited_as_completed(coros, limit):
    futures = [
        asyncio.ensure_future(c)
        for c in islice(coros, 0, limit)
    ]
    async def first_to_finish():
        while True:
            await asyncio.sleep(0)
            for f in futures:
                if f.done():
                    futures.remove(f)
                    try:
                        newf = next(coros)
                        futures.append(
                            asyncio.ensure_future(newf))
                    except StopIteration as e:
                        pass
                    return f.result()
    while len(futures) > 0:
        yield first_to_finish()
We start limit tasks at first, and whenever one ends, we ask for the next coroutine in coros and set it running. This keeps the number of running tasks at or below limit until we start running out of input coroutines (when next raises StopIteration and we don’t add anything to futures). After that, futures starts emptying until we eventually stop yielding coroutine objects.
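One subtlety worth spelling out: the refill step only works because islice and next consume the same iterator, so next picks up where islice stopped. A generator expression behaves this way, but a plain list does not, since next on a list raises TypeError. The helper below is a hypothetical illustration (split_start is not part of the code above) showing how wrapping the input in iter() gives the same behaviour for both kinds of input.

```python
from itertools import islice


def split_start(items, limit):
    """Return the first `limit` items plus an iterator over the rest.

    Hypothetical helper: it mirrors how limited_as_completed takes
    its first batch with islice and later calls next() for refills.
    """
    # iter() returns an iterator unchanged and wraps a list in one,
    # so both kinds of input behave the same from here on.
    it = iter(items)
    first = list(islice(it, 0, limit))
    return first, it


# Works for a list...
first, rest = split_start([1, 2, 3, 4, 5], 2)
print(first, next(rest))

# ...and for a generator, as used in the test program.
first_gen, rest_gen = split_start((n * n for n in range(1, 6)), 2)
print(first_gen, next(rest_gen))
```

Under this assumption, adding coros = iter(coros) as the first line of limited_as_completed would let it accept lists as well as generators.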
I thought this function might be useful to others, so I started a little repo over here and added it: asyncioplus/limited_as_completed.py. Please provide merge requests and log issues to improve it – maybe it should be part of standard Python?
When we run the same example program, but call limited_as_completed instead of the other versions:
async def print_when_done(tasks):
    for res in limited_as_completed(tasks, 10):
        print("Result %s" % await res)

coros = (mycoro(i) for i in range(1, 101))

loop = asyncio.get_event_loop()
loop.run_until_complete(print_when_done(coros))
loop.close()
We see output like this:
$ time python3 python-async.py
Starting 1
Starting 2
...
Starting 9
Starting 10
Finishing 10
Result 10
Starting 11
...
Finishing 100
Result 100
Finishing 1
Result 1

real    0m1.535s
user    0m1.436s
sys     0m0.084s
So we can see that the tasks are still running concurrently, but this time the number of concurrent tasks is limited to 10.
It feels like limited_as_completed is more re-usable as an approach but I’d love to hear others’ thoughts on this. E.g. could/should I use a semaphore to implement limited_as_completed instead of manually holding a queue?
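To make the semaphore question concrete, here is a minimal sketch of what that could look like. It is only one way to do it, and the names run_with_semaphore, guarded and demo are invented for this example. Each coroutine is wrapped so that it must acquire an asyncio.Semaphore before its body runs, and the wrappers are handed to the standard asyncio.as_completed.

```python
import asyncio


async def run_with_semaphore(coros, limit):
    """Run coros with at most `limit` bodies executing at once.

    Hypothetical sketch: returns results in completion order.
    """
    sem = asyncio.Semaphore(limit)

    async def guarded(coro):
        # Only `limit` wrappers can be inside this block at a time.
        async with sem:
            return await coro

    results = []
    # Note: this builds one wrapper per input up front.
    for fut in asyncio.as_completed([guarded(c) for c in coros]):
        results.append(await fut)
    return results


async def demo():
    running = 0
    peak = 0

    async def job(n):
        # Track how many jobs are inside their body at once.
        nonlocal running, peak
        running += 1
        peak = max(peak, running)
        await asyncio.sleep(0.01)
        running -= 1
        return n

    results = await run_with_semaphore((job(n) for n in range(20)), 5)
    return sorted(results), peak


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The trade-off is that the semaphore limits how many coroutine bodies run at once, but a task is still created for every input immediately, so the whole iterable is consumed and held in memory. limited_as_completed only pulls a new coroutine from the input when a slot frees up, which matters more for very large or unbounded inputs.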