Performance of Java 2D drawing operations (part 2: images)

Series: operations, images, opacity

In my previous post I examined the performance of various drawing operations in Java 2D rendering. Here I look at some specifics around rendering images, with an eye to finding optimisations I can apply to my game Rabbit Escape.

You can find the code here: gitlab.com/andybalaam/java-2d-performance.

Results

  • Drawing images with transparent sections is very slow
  • Drawing one large image is slower than drawing many small images covering the same area(!)
  • Drawing images outside the screen is slower than not drawing them at all (but faster than drawing them onto a visible area)

Advice

  • Avoid transparent images where possible
  • Don’t bother pre-rendering your background tiles onto a single image
  • Don’t draw images that are off-screen

Images with transparency

All the images I used were PNG files with a transparency layer, but in most of my experiments there were no transparent pixels. When I used images with transparent pixels the frame rate was much slower, dropping from 78 to 46 FPS. So using images with transparent pixels causes a significant performance hit.

I’d be grateful if someone who knows more about it can recommend how to improve my program to reduce this impact – I suspect there may be tricks I can do around setComposite or setRenderingHint or enabling/encouraging hardware acceleration.

Composite images

I assumed that drawing a single image would be much faster than covering the same area of the screen by drawing lots of small images. In fact, the result was the opposite: drawing lots of small images was much faster than drawing a single image covering the same area.

The code for a single image is:

g2d.drawImage(
    singleLargeImage,
    10,
    10,
    null
)

and for the small images it is:

for (y in 0 until 40)
{
    for (x in 0 until 60)
    {
        g2d.drawImage(
            compositeImages[(y*20 + x) % compositeImages.size],
            10 + (20 * x),
            10 + (20 * y),
            null
        )
    }
}

The single large image was rendered at 74 FPS, whereas covering the same area using repeated copies of 100 images was rendered at 80 FPS. I ran this test several times because I found the result surprising, and it was consistent every time.

I have to assume some caching (possibly via accelerated graphics) of the small images is the explanation.

Drawing images off the side of the screen

Drawing images off the side of the screen was faster than drawing them in a visible area, but slower than not drawing them at all. I tested this by adding 10,000 to the x and y positions of the images being drawn (I also tested subtracting 10,000 with similar results). Not drawing any images ran at 93 FPS, drawing images on-screen ran at 80 FPS, and drawing them off-screen ran at 83 FPS, so images drawn off the side of the screen still take significant time.

Advice: check whether images are on-screen, and avoid drawing them if not.
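
The check itself is a rectangle-overlap test. Here is a minimal sketch of the logic, written in Python for brevity (the benchmark code is Kotlin, and the names is_on_screen, screen_w and screen_h are made up for this example); it should port directly:

```python
def is_on_screen(x, y, w, h, screen_w, screen_h):
    """Return True if a w-by-h image drawn at (x, y) overlaps the screen.

    The screen is assumed to cover (0, 0) to (screen_w, screen_h).
    """
    return x < screen_w and y < screen_h and x + w > 0 and y + h > 0


# A 400x400 image pushed 10,000 pixels away in either direction is skipped.
print(is_on_screen(10010, 10010, 400, 400, 1200, 900))    # False
print(is_on_screen(-10000, -10000, 400, 400, 1200, 900))  # False
# An image that is only partly visible still needs drawing.
print(is_on_screen(-100, 50, 400, 400, 1200, 900))        # True
```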

Numbers

Transparency

FPS  Test
95   large nothing
78   large images20 largeimages
46   large images20 largeimages transparentimages

Composite images

(Lots of small images covering an area, or a single larger image.)

FPS  Test
87   large nothing
74   large largesingleimage
80   large compositeimage

Offscreen images

FPS  Test
93   large nothing
80   large images20 largeimages
83   large images20 largeimages offscreenimages

Feedback please

Please do get back to me with tips about how to improve the performance of my experimental code.

Feel free to log issues, make merge requests or add comments to the blog post.

Performance of Java 2D drawing operations (part 1: types of operation)

I want to remodel the desktop UI of my game Rabbit Escape to be more convenient and nicer looking, so I took a new look at game-loop-style graphics rendering onto a canvas in a Java 2D (Swing) UI. Specifically, how fast can it be, and what pitfalls should I avoid when I’m doing it?

For more on images, see the next post.

Results

  • Larger windows are (much) slower
  • Resizing images on-the-fly is very slow, even if they are the same size every time
  • Drawing small images is fast, but drawing large images is slow
  • Drawing rectangles is fast
  • Drawing text is fast
  • Drawing Swing widgets in front of a canvas is fast
  • Creating fonts on-the-fly is a tiny bit slow

Code

You can find the full code (written in Kotlin) at gitlab.com/andybalaam/java-2d-performance.

Basically, we make a JFrame and a Canvas and tell them not to listen to repaints (i.e. we control their drawing).

val app = JFrame()
app.ignoreRepaint = true
val canvas = Canvas()
canvas.ignoreRepaint = true

Then we add any buttons to the JFrame, and the canvas last (so it displays behind):

app.add(button)
app.add(canvas)

Now we make the canvas double-buffered and get hold of a buffer image for it:

app.isVisible = true
canvas.createBufferStrategy(2)
val bufferStrategy = canvas.bufferStrategy
val bufferedImage = GraphicsEnvironment
    .getLocalGraphicsEnvironment()
    .defaultScreenDevice
    .defaultConfiguration
    .createCompatibleImage(config.width, config.height)

Then inside a tight loop we draw onto the buffer image:

val g2d = bufferedImage.createGraphics()
try
{
    g2d.color = backgroundColor
    g2d.fillRect(0, 0, config.width, config.height)

    ... the different drawing operations go here ...

and then swap the buffers:

    val graphics = bufferStrategy.drawGraphics
    try {
        graphics.drawImage(bufferedImage, 0, 0, null)
        if (!bufferStrategy.contentsLost()) {
            bufferStrategy.show()
        }
    } finally {
        graphics.dispose()
    }
} finally {
    g2d.dispose()
}

Results

Baseline: some rectangles

I decided to compare everything against drawing 20 rectangles at random points on the screen, since that seems like a minimal requirement for a game.

My test machine is an Intel Core 2 Duo E6550 2.33GHz with 6GB RAM and a GeForce GT 740 graphics card (I have no idea whether it is being used here – I assume not). I am running Ubuntu 18.04.1 Linux, OpenJDK Java 1.8.0_191, and Kotlin 1.3.20-release-116. (I expect the results would be identical if I were using Java rather than Kotlin.)

I ran all the tests in two window sizes: 1200×900 and 640×480. 640×480 was embarrassingly fast for all tests, but 1200×900 struggled with some of the tasks.

Drawing rectangles looks like this:

g2d.color = Color(
    rand.nextInt(256),
    rand.nextInt(256),
    rand.nextInt(256)
)
g2d.fillRect(
    rand.nextInt(config.width / 2),
    rand.nextInt(config.height / 2),
    rand.nextInt(config.width / 2),
    rand.nextInt(config.height / 2)
)

In the small window, the baseline (20 rectangles) ran at 553 FPS. In the large window it ran at 87 FPS.

I didn’t do any statistics on these numbers because I am too lazy. Feel free to do it properly and let me know the results – I will happily update the article.

Fewer rectangles

When I reduced the number of rectangles to do less drawing work, I saw small improvements in performance. In the small window, drawing 2 rectangles instead of 20 increased the frame rate from 553 to 639, but there is a lot of noise in those results, and other runs were much closer. In the large window, the same reduction improved the frame rate from 87 to 92. This is not a huge speed-up, showing that drawing rectangles is pretty fast.

Adding fixed-size images

Drawing pre-scaled images looks like this:

g2d.drawImage(
    image,
    rand.nextInt(config.width),
    rand.nextInt(config.height),
    null
)

When I added 20 small images (40×40 pixels) to be drawn in each frame, the performance was almost unchanged. In the small window, the run showing 20 images per frame (as well as the rectangles) actually ran faster than the one without (561 FPS versus 553), suggesting the difference is negligible and I should do some statistics. In the large window, the 20 images version ran at exactly the same speed (87 FPS).

So, it looks like drawing small images costs almost nothing.

When I moved to large images (400×400 pixels), the small window slowed down from 553 to 446 FPS, and the large window slowed from 87 to 73 FPS, so larger images clearly have an impact, and we will need to limit the number and size of images to keep the frame rate acceptable.

Scaling images on the fly

You can scale an image on the fly as you draw onto a Canvas. (Spoiler: don’t do this!)

My code looks like:

val s = config.imageSize
val x1 = rand.nextInt(config.width)
val y1 = rand.nextInt(config.height)
val x2 = x1 + s
val y2 = y1 + s
g2d.drawImage(
    unscaledImage,
    x1, y1, x2, y2,
    0, 0, unscaledImageWidth, unscaledImageHeight,
    null
)

Note the 10-argument form of drawImage is being used. You can be sure you have avoided this situation if you use the 4-argument form from the previous section.

Note: the resulting image is the same size every time, and the Java documentation implies that scaled images may be cached by the system, but I saw a huge slow-down when using the 10-argument form of drawImage above.

On-the-fly scaled images slowed the small window from 446 to 67 FPS(!), and the large window from 73 to 31 FPS, meaning the exact same rendering took over twice as long in the large window and more than six times as long in the small one.
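
Converting frame rates into time per frame makes the cost easier to compare. A small sketch of the arithmetic, using only the FPS figures quoted above (frame_ms is just a helper name for this example):

```python
def frame_ms(fps):
    """Average time spent on one frame, in milliseconds."""
    return 1000.0 / fps


# (label, FPS with pre-scaled images, FPS with on-the-fly scaling)
for label, before, after in [
    ("small window", 446, 67),
    ("large window", 73, 31),
]:
    ratio = frame_ms(after) / frame_ms(before)
    print("%s: %.1f ms -> %.1f ms per frame (%.1fx as long)"
          % (label, frame_ms(before), frame_ms(after), ratio))

# small window: 2.2 ms -> 14.9 ms per frame (6.7x as long)
# large window: 13.7 ms -> 32.3 ms per frame (2.4x as long)
```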

Advice: check you are not using one of the drawImage overloads that scales images! Pre-scale them yourself (e.g. with getScaledInstance as I did here).

Displaying text

Drawing text on the canvas like this:

g2d.font = Font("Courier New", Font.PLAIN, 12)
g2d.color = Color.GREEN
g2d.drawString("FPS: $fpsLastSecond", 20, 20 + i * 14)

had a similar impact to drawing small images, i.e. no measurable slowdown. The small window went from 553 to 581 FPS, and the large window from 87 to 88, which is within the noise of these measurements.

Creating the font every time (as shown above) slowed the process a little more, so it is worth moving the font creation out of the game loop and only doing it once. The slowdown just for creating the font was 581 to 572 FPS in the small window, and 88 to 86 FPS in the large.

Swing widgets

By adding Button widgets to the JFrame before the Canvas, I was able to display them in front. Their rendering and focus worked as expected, and they had no impact at all on performance.

The same was true when I tried adding these widgets in front of images rendered on the canvas (instead of rectangles).

Turning everything up to 11

When I added everything I had tested all at the same time: rectangles, text with a new font every time, large unscaled images, and large window, the frame rate reduced to 30 FPS. This is a little slow for a game already, and if we had more images to draw it could get even worse. However, when I pre-scaled the images the frame rate went up to 72 FPS, showing that Java is capable of running a game at an acceptable frame rate on my machine, so long as we are careful how we use it.

Numbers

Small window (640×480)

FPS  Test
661  nothing
639  rectangles2
553  rectangles20
538  rectangles20 images2
561  rectangles20 images20
446  rectangles20 images20 largeimages
343  rectangles20 images20 unscaledimages
67   rectangles20 images20 largeimages unscaledimages
582  rectangles20 text2
581  rectangles20 text20
572  rectangles20 text20 newfont
598  rectangles20 buttons2
612  rectangles20 buttons20

Large window (1200×900)

FPS  Test
93   large nothing
92   large rectangles2
87   large rectangles20
87   large rectangles20 images2
87   large rectangles20 images20
73   large rectangles20 images20 largeimages
82   large rectangles20 images20 unscaledimages
31   large rectangles20 images20 largeimages unscaledimages
89   large rectangles20 text2
88   large rectangles20 text20
86   large rectangles20 text20 newfont
88   large rectangles20 buttons2
87   large rectangles20 buttons20
74   large images20 buttons20 largeimages
72   large rectangles20 images20 text20 buttons20 largeimages newfont
30   large rectangles20 images20 text20 buttons20 largeimages unscaledimages newfont

Adding a concurrency limit to Python’s asyncio.as_completed

Series: asyncio basics, large numbers in parallel, parallel HTTP requests, adding to stdlib

In the previous post I demonstrated how the limited_as_completed function lets us run a very large number of tasks concurrently while capping how many run at any one time, so that we don’t exhaust resources like memory or operating system file handles.

I think this could be a useful addition to the Python standard library, so I have been working on a modification to the current asyncio.as_completed method. My work so far is here: limited-as_completed.

I ran similar tests to the ones I ran for the last blog post with this code to validate that the modified standard library version achieves the same goals as before.

I used an identical copy of timed from the previous post and updated versions of the other files because I was using a much newer version of aiohttp along with the custom-built python I was running.

server looked like:

#!/usr/bin/env python3

from aiohttp import web
import asyncio
import random

async def handle(request):
    await asyncio.sleep(random.randint(0, 3))
    return web.Response(text="Hello, World!")

app = web.Application()
app.router.add_get('/{name}', handle)

web.run_app(app)

client-async-sem needed me to add a custom TCPConnector to avoid a new limit on the number of concurrent connections that was added to aiohttp in version 2.0. I also needed to move the ClientSession usage inside a coroutine to avoid a warning:

#!/usr/bin/env python3

from aiohttp import ClientSession, TCPConnector
import asyncio
import sys

limit = 1000

async def fetch(url, session):
    async with session.get(url) as response:
        return await response.read()

async def bound_fetch(sem, url, session):
    # Getter function with semaphore.
    async with sem:
        await fetch(url, session)

async def run(r):
    async with ClientSession(connector=TCPConnector(limit=limit)) as session:
        url = "http://localhost:8080/{}"
        tasks = []
        # create instance of Semaphore
        sem = asyncio.Semaphore(limit)
        for i in range(r):
            # pass Semaphore and session to every GET request
            task = asyncio.ensure_future(
                bound_fetch(sem, url.format(i), session))
            tasks.append(task)
        responses = asyncio.gather(*tasks)
        await responses

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.ensure_future(run(int(sys.argv[1]))))

My new code that uses my proposed extension to as_completed looked like:

#!/usr/bin/env python3

from aiohttp import ClientSession, TCPConnector
import asyncio
import sys

async def fetch(url, session):
    async with session.get(url) as response:
        return await response.read()

limit = 1000

async def print_when_done():
    async with ClientSession(connector=TCPConnector(limit=limit)) as session:
        tasks = (fetch(url.format(i), session) for i in range(r))
        # limit= is the proposed extension; the released
        # asyncio.as_completed does not accept it.
        for res in asyncio.as_completed(tasks, limit=limit):
            await res

r = int(sys.argv[1])
url = "http://localhost:8080/{}"
loop = asyncio.get_event_loop()
loop.run_until_complete(print_when_done())
loop.close()

and with these, we get similar behaviour to the previous post:

$ ./timed ./client-async-sem 10000
Memory usage: 73640KB	Time: 19.18 seconds
$ ./timed ./client-async-stdlib 10000
Memory usage: 49332KB	Time: 18.97 seconds

So the implementation I plan to submit to the Python standard library appears to work well. In fact, I think it is better than the one I presented in the previous post, because it uses on_complete callbacks to notice when futures have completed, which reduces the busy-looping we were doing to check for and yield finished tasks.
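
To illustrate the idea (this is a simplified sketch, not the code in the pull request), a limited as_completed can use Future.add_done_callback to push each finished future onto an asyncio.Queue, so the consumer sleeps until something completes instead of polling. The names limited_as_completed_cb and job are made up for this example, and job sleeps rather than making an HTTP request:

```python
import asyncio


async def limited_as_completed_cb(coros, limit):
    """Yield results as tasks finish, running at most `limit` at once.

    `coros` must be an iterator (e.g. a generator), so coroutines are
    only created when there is room to run them.
    """
    done = asyncio.Queue()
    running = 0

    def start_next():
        nonlocal running
        try:
            coro = next(coros)
        except StopIteration:
            return
        fut = asyncio.ensure_future(coro)
        # The callback fires when the task finishes: no busy-looping.
        fut.add_done_callback(done.put_nowait)
        running += 1

    for _ in range(limit):
        start_next()
    while running > 0:
        fut = await done.get()   # sleeps until some task completes
        running -= 1
        start_next()             # top the pool back up
        yield fut.result()       # re-raises if the task failed


async def job(i):
    # Hypothetical stand-in for fetch().
    await asyncio.sleep(0.01 * (i % 3))
    return i


async def main():
    coros = (job(i) for i in range(20))
    return [r async for r in limited_as_completed_cb(coros, limit=5)]


results = asyncio.run(main())
print(sorted(results) == list(range(20)))  # True
```

Unlike the patch, this version is an async generator, so it is consumed with async for rather than a plain for loop.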

The Python issue is bpo-30782 and the pull request is #2424.

Note: at first glance, it looks like the aiohttp.ClientSession’s limit on the number of connections (introduced in version 1.0 and then updated in version 2.0) gives us what we want without any of this extra code, but in fact it only limits the number of connections, not the number of futures we are creating, so it has the same problem of unbounded memory use as the semaphore-based implementation.

Making 100 million requests with Python aiohttp

Update: slides of a talk I gave at the London Python Meetup on this: Talk slides: Making 100 million HTTP requests with Python aiohttp.

Update: see how Cristian Garcia improved on this code here: Making an Unlimited Number of Requests with Python aiohttp + pypeln.

I’ve been working on how to make a very large number of HTTP requests using Python’s asyncio and aiohttp.

Paweł Miech’s post Making 1 million requests with python-aiohttp taught me how to think about this, and got us a long way, with 1 million requests running in a reasonable time, but I need to go further.

Paweł’s approach limits the number of requests that are in progress, but it uses an unbounded amount of memory to hold the futures that it wants to execute.

See also: 2 excellent related posts by Quentin Pradet: How do you rate limit calls with asyncio?, How do you limit memory usage with asyncio?.

We can avoid using unbounded memory by using the limited_as_completed function I outlined in my previous post.

Setup

Server

We have a server program “server”:

(Note it differs from Paweł’s version because I am using an older version of aiohttp which has fewer convenient features.)

#!/usr/bin/env python3.5

from aiohttp import web
import asyncio
import random

async def handle(request):
    await asyncio.sleep(random.randint(0, 3))
    return web.Response(text="Hello, World!")

async def init():
    app = web.Application()
    app.router.add_route('GET', '/{name}', handle)
    return await loop.create_server(
        app.make_handler(), '127.0.0.1', 8080)

loop = asyncio.get_event_loop()
loop.run_until_complete(init())
loop.run_forever()

This just responds “Hello, World!” to every request it receives, but after an artificial delay of 0-3 seconds.

Synchronous client

As a baseline, we have a synchronous client “client-sync”:

#!/usr/bin/env python3.5

import requests
import sys

url = "http://localhost:8080/{}"
for i in range(int(sys.argv[1])):
    requests.get(url.format(i)).text

This waits for each request to complete before making the next one. Like the other clients below, it takes the number of requests to make as a command-line argument.

Async client using semaphores

Copied mostly verbatim from Making 1 million requests with python-aiohttp we have an async client “client-async-sem” that uses a semaphore to restrict the number of requests that are in progress at any time to 1000:

#!/usr/bin/env python3.5

from aiohttp import ClientSession
import asyncio
import sys

limit = 1000

async def fetch(url, session):
    async with session.get(url) as response:
        return await response.read()

async def bound_fetch(sem, url, session):
    # Getter function with semaphore.
    async with sem:
        await fetch(url, session)

async def run(session, r):
    url = "http://localhost:8080/{}"
    tasks = []
    # create instance of Semaphore
    sem = asyncio.Semaphore(limit)
    for i in range(r):
        # pass Semaphore and session to every GET request
        task = asyncio.ensure_future(bound_fetch(sem, url.format(i), session))
        tasks.append(task)
    responses = asyncio.gather(*tasks)
    await responses

loop = asyncio.get_event_loop()
with ClientSession() as session:
    loop.run_until_complete(asyncio.ensure_future(run(session, int(sys.argv[1]))))

Async client using limited_as_completed

The new client I am presenting here uses limited_as_completed from the previous post. This means it can make a generator that provides the futures to wait for as they are needed, instead of making them all at the beginning.

It is called “client-async-as-completed”:

#!/usr/bin/env python3.5

from aiohttp import ClientSession
import asyncio
from itertools import islice
import sys

def limited_as_completed(coros, limit):
    futures = [
        asyncio.ensure_future(c)
        for c in islice(coros, 0, limit)
    ]
    async def first_to_finish():
        while True:
            await asyncio.sleep(0)
            for f in futures:
                if f.done():
                    futures.remove(f)
                    try:
                        newf = next(coros)
                        futures.append(
                            asyncio.ensure_future(newf))
                    except StopIteration as e:
                        pass
                    return f.result()
    while len(futures) > 0:
        yield first_to_finish()

async def fetch(url, session):
    async with session.get(url) as response:
        return await response.read()

limit = 1000

async def print_when_done(tasks):
    for res in limited_as_completed(tasks, limit):
        await res

r = int(sys.argv[1])
url = "http://localhost:8080/{}"
loop = asyncio.get_event_loop()
with ClientSession() as session:
    coros = (fetch(url.format(i), session) for i in range(r))
    loop.run_until_complete(print_when_done(coros))
loop.close()

Again, this limits the number of requests to 1000.
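
One way to check this without a server is to replace the HTTP call with a sleep and count what happens. In this sketch, fake_fetch, make_coros and the stats dictionary are stand-ins invented for the test; limited_as_completed is the function from the client above:

```python
import asyncio
from itertools import islice


def limited_as_completed(coros, limit):
    # Same logic as in the client above.
    futures = [asyncio.ensure_future(c) for c in islice(coros, 0, limit)]

    async def first_to_finish():
        while True:
            await asyncio.sleep(0)
            for f in futures:
                if f.done():
                    futures.remove(f)
                    try:
                        futures.append(asyncio.ensure_future(next(coros)))
                    except StopIteration:
                        pass
                    return f.result()

    while len(futures) > 0:
        yield first_to_finish()


stats = {"created": 0, "in_flight": 0, "max_in_flight": 0}


async def fake_fetch(i):
    # Stand-in for fetch(): sleeps instead of making an HTTP request.
    stats["in_flight"] += 1
    stats["max_in_flight"] = max(stats["max_in_flight"], stats["in_flight"])
    await asyncio.sleep(0.001 * (i % 5))
    stats["in_flight"] -= 1
    return i


def make_coros(n):
    for i in range(n):
        stats["created"] += 1   # counts coroutine objects as they are made
        yield fake_fetch(i)


async def main(n, limit):
    results = []
    for res in limited_as_completed(make_coros(n), limit):
        results.append(await res)
        # Never more than `limit` unfinished coroutines exist at once.
        assert stats["created"] - len(results) <= limit
    return results


results = asyncio.run(main(200, 10))
print(len(results), stats["max_in_flight"])
```

Because make_coros is a generator, coroutine objects are only created as slots free up, which is what keeps the memory use bounded.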

Test setup

Finally, we have a test runner script called “timed”:

#!/usr/bin/env bash

./server &
sleep 1 # Wait for server to start

/usr/bin/time --format "Memory usage: %MKB\tTime: %e seconds" "$@"

# %e Elapsed real (wall clock) time used by the process, in seconds.
# %M Maximum resident set size of the process in Kilobytes.

kill %1

This runs each process, ensuring the server is restarted each time it runs, and prints out how long it took to run, and how much memory it used.

Results

When making only 10 requests, the async clients worked faster because they launched all the requests simultaneously and only had to wait for the longest one (3 seconds). The memory usage of all three clients was fine:

$ ./timed ./client-sync 10
Memory usage: 20548KB	Time: 15.16 seconds
$ ./timed ./client-async-sem 10
Memory usage: 24996KB	Time: 3.13 seconds
$ ./timed ./client-async-as-completed 10
Memory usage: 23176KB	Time: 3.13 seconds

When making 100 requests, the synchronous client was very slow, but all three clients worked eventually:

$ ./timed ./client-sync 100
Memory usage: 20528KB	Time: 156.63 seconds
$ ./timed ./client-async-sem 100
Memory usage: 24980KB	Time: 3.21 seconds
$ ./timed ./client-async-as-completed 100
Memory usage: 24904KB	Time: 3.21 seconds

At this point let’s agree that life is too short to wait for the synchronous client.

When making 10000 requests, both async clients worked quite quickly, and both had increased memory usage, but the semaphore-based one used roughly 1.7 times as much memory as the limited_as_completed version:

$ ./timed ./client-async-sem 10000
Memory usage: 77912KB	Time: 18.10 seconds
$ ./timed ./client-async-as-completed 10000
Memory usage: 46780KB	Time: 17.86 seconds

For 1 million requests, the semaphore-based client took 25 minutes on my (32GB RAM) machine. It only used about 10% of my CPU, and it used a lot of memory (over 3GB):

$ ./timed ./client-async-sem 1000000
Memory usage: 3815076KB	Time: 1544.04 seconds

Note: Paweł’s version only took 9 minutes on his laptop and used all his CPU, so I wonder whether I have made a mistake somewhere, or whether my version of Python (3.5.2) is not as good as a later one.

The limited_as_completed version ran in a similar amount of time but used 100% of my CPU, and used a much smaller amount of memory (162MB):

$ ./timed ./client-async-as-completed 1000000
Memory usage: 162168KB	Time: 1505.75 seconds

Now let’s try 100 million requests. The semaphore-based version lasted 10 hours before it was killed by Linux’s OOM Killer, but it didn’t manage to make any requests in this time, because it creates all its futures before it starts making requests:

$ ./timed ./client-async-sem 100000000
Command terminated by signal 9

I left the limited_as_completed version over the weekend and it managed to succeed eventually:

$ ./timed ./client-async-as-completed 100000000
Memory usage: 294304KB	Time: 150213.15 seconds

So its memory usage was still very bounded, and it managed to do about 665 requests/second over an extended period, which is almost identical to the throughput of the previous cases.
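
The throughput figures come straight from the timings above; a short sketch of the arithmetic (throughput is just a helper name for this example):

```python
def throughput(requests, seconds):
    """Average requests completed per second of wall-clock time."""
    return requests / seconds


# (requests, elapsed seconds) taken from the timed runs above.
runs = [
    ("semaphore, 1 million", 1000000, 1544.04),
    ("limited_as_completed, 1 million", 1000000, 1505.75),
    ("limited_as_completed, 100 million", 100000000, 150213.15),
]

for name, requests, seconds in runs:
    print("%s: %.1f requests/second" % (name, throughput(requests, seconds)))
```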

Conclusion

Making a million requests is usually enough, but when we really need to do a lot of work while keeping our memory usage bounded, it looks like an approach like limited_as_completed is a good way to go. I also think it’s slightly easier to understand.

In the next post I describe my attempt to get something like this added to the Python standard library.

Python 3 – large numbers of tasks with limited concurrency

I am interested in running large numbers of tasks in parallel, so I need something like asyncio.as_completed, but taking an iterable instead of a list, and with a limited number of tasks running concurrently. First, let’s try to build something pretty much equivalent to asyncio.as_completed. Here is my attempt, but I’d welcome feedback from readers who know better:

# Note this is not a coroutine - it returns
# an iterator - but it crucially depends on
# work being done inside the coroutines it
# yields - those coroutines empty out the
# list of futures it holds, and it will not
# end until that list is empty.
def my_as_completed(coros):

    # Start all the tasks
    futures = [asyncio.ensure_future(c) for c in coros]

    # A coroutine that waits for one of the
    # futures to finish and then returns
    # its result.
    async def first_to_finish():

        # Wait forever - we could add a
        # timeout here instead.
        while True:

            # Give up control to the scheduler
            # - otherwise we will spin here
            # forever!
            await asyncio.sleep(0)

            # Return anything that has finished
            for f in futures:
                if f.done():
                    futures.remove(f)
                    return f.result()

    # Keep yielding a waiting coroutine
    # until all the futures have finished.
    while len(futures) > 0:
        yield first_to_finish()

The above can be substituted for asyncio.as_completed in the code that uses it in the first article, and it seems to work. It also makes a reasonable amount of sense to me, so it may be correct, but I’d welcome comments and corrections.

my_as_completed above accepts an iterable and returns a generator producing results, but inside it starts all tasks concurrently, and stores all the futures in a list. To handle bigger lists we will need to do better, by limiting the number of running tasks to a sensible number.

Let’s start with a test program:

import asyncio
async def mycoro(number):
    print("Starting %d" % number)
    await asyncio.sleep(1.0 / number)
    print("Finishing %d" % number)
    return str(number)

async def print_when_done(tasks):
    for res in asyncio.as_completed(tasks):
        print("Result %s" % await res)

coros = [mycoro(i) for i in range(1, 101)]

loop = asyncio.get_event_loop()
loop.run_until_complete(print_when_done(coros))
loop.close()

This uses asyncio.as_completed to run 100 tasks and, because I adjusted the asyncio.sleep command to wait longer for earlier tasks, it prints something like this:

$ time python3 python-async.py
Starting 47
Starting 93
Starting 48
...
Finishing 93
Finishing 94
Finishing 95
...
Result 93
Result 94
Result 95
...
Finishing 46
Finishing 45
Finishing 42
...
Finishing 2
Result 2
Finishing 1
Result 1

real    0m1.590s
user    0m0.600s
sys     0m0.072s

So all 100 tasks were completed in 1.5 seconds, indicating that they really were run in parallel, but all 100 were allowed to run at the same time, with no limit.
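
For comparison, here is a quick calculation of how long the sleeps alone would take if the tasks ran one after another, versus all at once (this only adds up the sleep times from mycoro and ignores scheduling overhead):

```python
# Task n sleeps for 1/n seconds, for n = 1..100.
sleeps = [1.0 / n for n in range(1, 101)]

print("one after another: %.2f seconds" % sum(sleeps))  # 5.19
print("all at once:       %.2f seconds" % max(sleeps))  # 1.00
```

So a sequential run would need over 5 seconds, and the 1.5 seconds measured is much closer to the single longest sleep.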

We can adjust the test program to run using our customised my_as_completed function, and pass in an iterable of coroutines instead of a list by changing the last part of the program to look like this:

async def print_when_done(tasks):
    for res in my_as_completed(tasks):
        print("Result %s" % await res)
coros = (mycoro(i) for i in range(1, 101))
loop = asyncio.get_event_loop()
loop.run_until_complete(print_when_done(coros))
loop.close()

But we get similar output to last time, with all tasks running concurrently.

To limit the number of concurrent tasks, we limit the size of the futures list, and add more as needed:

from itertools import islice
def limited_as_completed(coros, limit):
    futures = [
        asyncio.ensure_future(c)
        for c in islice(coros, 0, limit)
    ]
    async def first_to_finish():
        while True:
            await asyncio.sleep(0)
            for f in futures:
                if f.done():
                    futures.remove(f)
                    try:
                        newf = next(coros)
                        futures.append(
                            asyncio.ensure_future(newf))
                    except StopIteration as e:
                        pass
                    return f.result()
    while len(futures) > 0:
        yield first_to_finish()

We start limit tasks at first, and whenever one ends, we ask for the next coroutine in coros and set it running. This keeps the number of running tasks at or below limit until we start running out of input coroutines (when next raises StopIteration and we don’t add anything to futures), then futures starts emptying until we eventually stop yielding coroutine objects. Note that coros must be an iterator such as a generator, not a list, because we call next on it directly.

I thought this function might be useful to others, so I started a little repo over here and added it: asyncioplus/limited_as_completed.py. Please provide merge requests and log issues to improve it – maybe it should be part of standard Python?

When we run the same example program, but call limited_as_completed instead of the other versions:

async def print_when_done(tasks):
    for res in limited_as_completed(tasks, 10):
        print("Result %s" % await res)
coros = (mycoro(i) for i in range(1, 101))
loop = asyncio.get_event_loop()
loop.run_until_complete(print_when_done(coros))
loop.close()

We see output like this:

$ time python3 python-async.py
Starting 1
Starting 2
...
Starting 9
Starting 10
Finishing 10
Result 10
Starting 11
...
Finishing 100
Result 100
Finishing 1
Result 1

real	0m1.535s
user	0m1.436s
sys	0m0.084s

So we can see that the tasks are still running concurrently, but this time the number of concurrent tasks is limited to 10.

See also

To achieve a similar result using semaphores, see Python asyncio.semaphore in async-await function and Making 1 million requests with python-aiohttp.

It feels like limited_as_completed is more re-usable as an approach but I’d love to hear others’ thoughts on this. E.g. could/should I use a semaphore to implement limited_as_completed instead of manually holding a queue?