Series: asyncio basics, large numbers in parallel, parallel HTTP requests, adding to stdlib
I am interested in running large numbers of tasks in parallel, so I need something like asyncio.as_completed, but taking an iterable instead of a list, and with a limited number of tasks running concurrently. First, let’s try to build something pretty much equivalent to asyncio.as_completed. Here is my attempt, but I’d welcome feedback from readers who know better:
# Note this is not a coroutine - it returns # an iterator - but it crucially depends on # work being done inside the coroutines it # yields - those coroutines empty out the # list of futures it holds, and it will not # end until that list is empty. def my_as_completed(coros): # Start all the tasks futures = [asyncio.ensure_future(c) for c in coros] # A coroutine that waits for one of the # futures to finish and then returns # its result. async def first_to_finish(): # Wait forever - we could add a # timeout here instead. while True: # Give up control to the scheduler # - otherwise we will spin here # forever! await asyncio.sleep(0) # Return anything that has finished for f in futures: if f.done(): futures.remove(f) return f.result() # Keep yielding a waiting coroutine # until all the futures have finished. while len(futures) > 0: yield first_to_finish()
The above can be substituted for asyncio.as_completed in the code that uses it in the first article, and it seems to work. It also makes a reasonable amount of sense to me, so it may be correct, but I’d welcome comments and corrections.
my_as_completed above accepts an iterable and returns a generator producing results, but inside it starts all tasks concurrently, and stores all the futures in a list. To handle bigger lists we will need to do better, by limiting the number of running tasks to a sensible number.
Let’s start with a test program:
import asyncio async def mycoro(number): print("Starting %d" % number) await asyncio.sleep(1.0 / number) print("Finishing %d" % number) return str(number) async def print_when_done(tasks): for res in asyncio.as_completed(tasks): print("Result %s" % await res) coros = [mycoro(i) for i in range(1, 101)] loop = asyncio.get_event_loop() loop.run_until_complete(print_when_done(coros)) loop.close()
This uses asyncio.as_completed to run 100 tasks and, because I adjusted the asyncio.sleep command to wait longer for earlier tasks, it prints something like this:
$ time python3 python-async.py Starting 47 Starting 93 Starting 48 ... Finishing 93 Finishing 94 Finishing 95 ... Result 93 Result 94 Result 95 ... Finishing 46 Finishing 45 Finishing 42 ... Finishing 2 Result 2 Finishing 1 Result 1 real 0m1.590s user 0m0.600s sys 0m0.072s
So all 100 tasks were completed in 1.5 seconds, indicating that they really were run in parallel, but all 100 were allowed to run at the same time, with no limit.
We can adjust the test program to run using our customised my_as_completed function, and pass in an iterable of coroutines instead of a list by changing the last part of the program to look like this:
async def print_when_done(tasks): for res in my_as_completed(tasks): print("Result %s" % await res) coros = (mycoro(i) for i in range(1, 101)) loop = asyncio.get_event_loop() loop.run_until_complete(print_when_done(coros)) loop.close()
But we get similar output to last time, with all tasks running concurrently.
To limit the number of concurrent tasks, we limit the size of the futures list, and add more as needed:
from itertools import islice def limited_as_completed(coros, limit): futures = [ asyncio.ensure_future(c) for c in islice(coros, 0, limit) ] async def first_to_finish(): while True: await asyncio.sleep(0) for f in futures: if f.done(): futures.remove(f) try: newf = next(coros) futures.append( asyncio.ensure_future(newf)) except StopIteration as e: pass return f.result() while len(futures) > 0: yield first_to_finish()
We start limit tasks at first, and whenever one ends, we ask for the next coroutine in coros and set it running. This keeps the number of running tasks at or below limit until we start running out of input coroutines (when next throws and we don’t add anything to futures), then futures starts emptying until we eventually stop yielding coroutine objects.
I thought this function might be useful to others, so I started a little repo over here and added it: asyncioplus/limited_as_completed.py. Please provide merge requests and log issues to improve it – maybe it should be part of standard Python?
When we run the same example program, but call limited_as_completed instead of the other versions:
async def print_when_done(tasks):
for res in limited_as_completed(tasks, 10):
print("Result %s" % await res)
coros = (mycoro(i) for i in range(1, 101))
loop = asyncio.get_event_loop()
loop.run_until_complete(print_when_done(coros))
loop.close()
We see output like this:
$ time python3 python-async.py
Starting 1
Starting 2
...
Starting 9
Starting 10
Finishing 10
Result 10
Starting 11
...
Finishing 100
Result 100
Finishing 1
Result 1
real 0m1.535s
user 0m1.436s
sys 0m0.084s
So we can see that the tasks are still running concurrently, but this time the number of concurrent tasks is limited to 10.
See also
To achieve a similar result using semaphores, see Python asyncio.semaphore in async-await function and Making 1 million requests with python-aiohttp.
It feels like limited_as_completed is more re-usable as an approach but I’d love to hear others’ thoughts on this. E.g. could/should I use a semaphore to implement limited_as_completed instead of manually holding a queue?
This is golden, thanks for publishing it. Would love to see the `limit` kwarg added to `as_completed` in asyncio.
Thanks David – I guess it might be worth commenting on https://bugs.python.org/issue30782 ?
I’ve build some kind of object realization of it. Simply to inject everywhere in my projects, much tnx.
If you’re interested, I’ll create a pull asap.
I`m developing a web crawler framework with asyncio and I met memory high usage problem in test,so thx for you blog.
I notice you used loop to polling futures completion, why not use callback function as same as official “as_completed” method?
Ok, I’ve kinda lost my way and reimplement this for futures:
https://gist.github.com/trahibidadido/3790adf525fae08dee54555978b3c956
Hi Trahibidadido, I’m not sure I understand. As far as I can see a function is the nicest way to provide this – maybe you can explain a bit more?
Hi Bruce, my understanding of the official as_completed is that it is intended to be called in a loop – am I mistaken?
Hi Trahibidadido, I think the version from my blog post will already work with futures, won’t it?
Yes. Your solution uses only one process, and executes coros with threads (threadPoolExecutor). If you try some cpu heavy tasks (in my case many regex text transforms) – it will be slow as hell.
I’ve tried to implement ProcessPoolExecutor and divide coros not by tasking %limit% coroutines and dealing with its futures, but tasking all coroutines at once and deal with all of the futures in processPool. This gives me some extra performance.
But my way, sadly, it doesnt work in oop ): https://stackoverflow.com/questions/47662040/cant-pickle-coroutine-objects-when-processpoolexecutor-is-used-in-class
Hi Andy, maybe my reply is not clear, sorry for confusing you。
Let`s look into your blog code:
“`
…
async def first_to_finish():
while True: # this is the loop I want to say
await asyncio.sleep(0)
for f in futures:
if f.done():
futures.remove(f)
…
“`
The code use a loop to capture futures`s completion, and I see the official “as_completed†method use futures`s callback function.(https://github.com/python/cpython/blob/master/Lib/asyncio/tasks.py#L462)
Hi Bruce, I see that you mean _inside_ the as_completed function, sorry! Yes, the official version does it differently, so when I proposed to make this change in the standard library I did it that way: https://www.artificialworlds.net/blog/2017/06/27/adding-a-concurrency-limit-to-pythons-asyncio-as_completed/ – see the MR here: https://github.com/python/cpython/pull/2424/files . Personally, I prefer a for loop to callbacks, but that is a matter of taste I think.
This is fantastic. Is there any reason you wouldn’t use BoundedSemaphore for rate limiting?
https://docs.python.org/3/library/asyncio-sync.html#asyncio.BoundedSemaphore
I’ve used it to rate limit aiohttp requests and it’s super clean.
Thanks Ethan, yeah that looks really useful.