Python 3 – large numbers of tasks with limited concurrency

Series: asyncio basics, large numbers in parallel, parallel HTTP requests, adding to stdlib

I am interested in running large numbers of tasks in parallel, so I need something like asyncio.as_completed, but taking an iterable instead of a list, and with a limited number of tasks running concurrently. First, let’s try to build something pretty much equivalent to asyncio.as_completed. Here is my attempt, but I’d welcome feedback from readers who know better:

import asyncio

# Note this is not a coroutine - it returns
# an iterator - but it crucially depends on
# work being done inside the coroutines it
# yields - those coroutines empty out the
# list of futures it holds, and it will not
# end until that list is empty.
def my_as_completed(coros):

    # Start all the tasks
    futures = [asyncio.ensure_future(c) for c in coros]

    # A coroutine that waits for one of the
    # futures to finish and then returns
    # its result.
    async def first_to_finish():

        # Wait forever - we could add a
        # timeout here instead.
        while True:

            # Give up control to the scheduler
            # - otherwise we will spin here
            # forever!
            await asyncio.sleep(0)

            # Return anything that has finished
            for f in futures:
                if f.done():
                    futures.remove(f)
                    return f.result()

    # Keep yielding a waiting coroutine
    # until all the futures have finished.
    while len(futures) > 0:
        yield first_to_finish()

The above can be substituted for asyncio.as_completed in the code that uses it in the first article, and it seems to work. It also makes a reasonable amount of sense to me, so it may be correct, but I’d welcome comments and corrections.

my_as_completed above accepts an iterable and returns a generator producing results, but inside it starts all tasks concurrently, and stores all the futures in a list. To handle bigger lists we will need to do better, by limiting the number of running tasks to a sensible number.

Let’s start with a test program:

import asyncio

async def mycoro(number):
    print("Starting %d" % number)
    await asyncio.sleep(1.0 / number)
    print("Finishing %d" % number)
    return str(number)

async def print_when_done(tasks):
    for res in asyncio.as_completed(tasks):
        print("Result %s" % await res)

coros = [mycoro(i) for i in range(1, 101)]

# asyncio.run (Python 3.7+) creates the event
# loop, runs the coroutine and closes the loop.
asyncio.run(print_when_done(coros))

This uses asyncio.as_completed to run 100 tasks and, because I adjusted the asyncio.sleep command to wait longer for earlier tasks, it prints something like this:

$ time python3 python-async.py
Starting 47
Starting 93
Starting 48
...
Finishing 93
Finishing 94
Finishing 95
...
Result 93
Result 94
Result 95
...
Finishing 46
Finishing 45
Finishing 42
...
Finishing 2
Result 2
Finishing 1
Result 1

real    0m1.590s
user    0m0.600s
sys     0m0.072s

So all 100 tasks completed in about 1.6 seconds, showing that they really did run concurrently, but all 100 were allowed to run at the same time, with no limit.

We can adjust the test program to run using our customised my_as_completed function, and pass in an iterable of coroutines instead of a list by changing the last part of the program to look like this:

async def print_when_done(tasks):
    for res in my_as_completed(tasks):
        print("Result %s" % await res)
coros = (mycoro(i) for i in range(1, 101))
# asyncio.run (Python 3.7+) creates the event
# loop, runs the coroutine and closes the loop.
asyncio.run(print_when_done(coros))

But we get similar output to last time, with all tasks running concurrently.

To limit the number of concurrent tasks, we limit the size of the futures list, and add more as needed:

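from itertools import islice

def limited_as_completed(coros, limit):

    # Make sure we have an iterator, so that
    # islice and next below consume the same
    # stream even if we were passed a list.
    coros = iter(coros)

    # Start the first `limit` tasks
    futures = [
        asyncio.ensure_future(c)
        for c in islice(coros, 0, limit)
    ]

    async def first_to_finish():
        while True:
            await asyncio.sleep(0)
            for f in futures:
                if f.done():
                    futures.remove(f)
                    # Replace the finished task
                    # with the next one, if any
                    try:
                        newf = next(coros)
                        futures.append(
                            asyncio.ensure_future(newf))
                    except StopIteration:
                        pass
                    return f.result()

    while len(futures) > 0:
        yield first_to_finish()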

We start limit tasks at first, and whenever one ends, we ask for the next coroutine in coros and set it running. This keeps the number of running tasks at or below limit until we start running out of input coroutines (when next raises StopIteration and we don’t add anything to futures). After that, futures starts emptying until we eventually stop yielding coroutine objects.
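One way to convince ourselves that the limit holds is to count how many coroutines are in flight at once. The sketch below is only an illustration: tracked, running and peak are names I made up for the check, and the copy of limited_as_completed it contains calls iter() on its input (an assumption on my part, so that it also accepts a list).

```python
import asyncio
from itertools import islice


def limited_as_completed(coros, limit):
    # iter() lets islice and next share one stream of coroutines
    coros = iter(coros)
    futures = [asyncio.ensure_future(c) for c in islice(coros, 0, limit)]

    async def first_to_finish():
        while True:
            await asyncio.sleep(0)
            for f in futures:
                if f.done():
                    futures.remove(f)
                    try:
                        futures.append(asyncio.ensure_future(next(coros)))
                    except StopIteration:
                        pass
                    return f.result()

    while len(futures) > 0:
        yield first_to_finish()


# Hypothetical instrumentation: count coroutines currently in flight
running = 0
peak = 0


async def tracked(number):
    global running, peak
    running += 1
    peak = max(peak, running)
    await asyncio.sleep(0.01)
    running -= 1
    return number


async def main():
    results = []
    for res in limited_as_completed((tracked(i) for i in range(20)), 5):
        results.append(await res)
    return results


results = asyncio.run(main())
print("peak concurrency:", peak)
print("all results seen:", sorted(results) == list(range(20)))
```

If the function behaves as described, the peak should never exceed the limit of 5, and every one of the 20 results should still arrive.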

I thought this function might be useful to others, so I started a little repo over here and added it: asyncioplus/limited_as_completed.py. Please provide merge requests and log issues to improve it – maybe it should be part of standard Python?

When we run the same example program, but call limited_as_completed instead of the other versions:

async def print_when_done(tasks):
    for res in limited_as_completed(tasks, 10):
        print("Result %s" % await res)
coros = (mycoro(i) for i in range(1, 101))
# asyncio.run (Python 3.7+) creates the event
# loop, runs the coroutine and closes the loop.
asyncio.run(print_when_done(coros))

We see output like this:

$ time python3 python-async.py
Starting 1
Starting 2
...
Starting 9
Starting 10
Finishing 10
Result 10
Starting 11
...
Finishing 100
Result 100
Finishing 1
Result 1

real	0m1.535s
user	0m1.436s
sys	0m0.084s

So we can see that the tasks are still running concurrently, but this time the number of concurrent tasks is limited to 10.
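Notice that the user time is now close to the real time. That is probably because first_to_finish polls with asyncio.sleep(0) rather than sleeping until something finishes. As a rough sketch of an alternative (not what the repo does), we could let asyncio.wait block until the first task completes. This version is an async generator, so it needs Python 3.6+ and is consumed with async for. The names limited_results and work are mine, purely for illustration.

```python
import asyncio
from itertools import islice


async def limited_results(coros, limit):
    """Yield results as tasks finish, keeping at most `limit` running."""
    coros = iter(coros)
    pending = {asyncio.ensure_future(c) for c in islice(coros, limit)}
    while pending:
        # Sleep until at least one task is done, with no polling
        done, pending = await asyncio.wait(
            pending, return_when=asyncio.FIRST_COMPLETED)
        for task in done:
            # Start one replacement per finished task, if any remain
            for c in islice(coros, 1):
                pending.add(asyncio.ensure_future(c))
            yield task.result()


async def work(n):
    await asyncio.sleep(0.01 * n)
    return n


async def main():
    jobs = (work(i) for i in range(1, 7))
    return [r async for r in limited_results(jobs, 3)]


results = asyncio.run(main())
print(sorted(results))
```

The trade-off is that callers must be able to use async for, whereas limited_as_completed can be dropped in wherever asyncio.as_completed was used.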

See also

To achieve a similar result using semaphores, see Python asyncio.semaphore in async-await function and Making 1 million requests with python-aiohttp.

It feels like limited_as_completed is more re-usable as an approach but I’d love to hear others’ thoughts on this. E.g. could/should I use a semaphore to implement limited_as_completed instead of manually holding a queue?
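For comparison, here is a minimal sketch of what a semaphore-based version might look like, assuming we are happy to collect all the results at the end. run_limited, guarded and work are hypothetical names of mine, not taken from the linked posts.

```python
import asyncio


async def run_limited(coros, limit):
    # Create the semaphore inside the running event loop
    sem = asyncio.Semaphore(limit)

    async def guarded(coro):
        # Only `limit` coroutines can be inside this block at once
        async with sem:
            return await coro

    # gather returns the results in input order
    return await asyncio.gather(*(guarded(c) for c in coros))


async def work(n):
    await asyncio.sleep(0.01)
    return n * 2


results = asyncio.run(run_limited((work(i) for i in range(10)), 3))
print(results)
```

Note that the semaphore limits how many coroutines make progress at once, but gather still consumes the whole iterable and creates a task for every item up front. For a very large input that is exactly what limited_as_completed avoids.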

Iterating over the lines of a file in Java

If you’re trying to write a standard command-line tool in Java you are going to experience pain.

In Python, if we want to do something with each line of input to our program, we can do something like:

import sys
for ln in sys.stdin:
    print(ln)  # (We have an extra newline but who's counting?)
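If we were counting, the extra newline is there because each ln already ends with one. A small sketch of a version that avoids it follows. The echo function and the StringIO demo are only there to make the example easy to try without piping anything in.

```python
import io
import sys


def echo(stream, out=sys.stdout):
    # Each line already ends in "\n", so tell print not to add another
    for ln in stream:
        print(ln, end="", file=out)


# In a real script we would call: echo(sys.stdin)
demo = io.StringIO()
echo(io.StringIO("one\ntwo\n"), demo)
```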

In Java, after some fighting, the two closest alternatives I can find are:

import java.util.Scanner;
class IterateInputScanner {
    public static void main(String[] args) {
        for(String ln : (Iterable<String>)(() -> new Scanner(System.in).useDelimiter("\n"))) {
            System.out.println(ln);
        }
    }
}

or, less fashionably, and even more horribly:

import java.io.BufferedReader;
import java.io.InputStreamReader;
class IterateInputReader {
    public static void main(String[] args) throws java.io.IOException {
        BufferedReader reader =
            new BufferedReader(new InputStreamReader(System.in));
        String ln;
        while((ln = reader.readLine()) != null) {
            System.out.println(ln);
        }
    }
}

Note that if you are reading from a stream other than stdin you will probably want to wrap this in a try-with-resources to close the stream when you’ve finished.

Take your pick, and I hope it saves you some fighting.

Raspberry Pi Jam “Chaos Car!”

Raspberry Pi 1
+ battery pack
+ Bluetooth USB dongle
+ i-racer bluetooth car
+ Raspberry Pi camera
+ some Python code
+ loads of sellotape
=

Chaos car!

Here’s the code:

#!/usr/bin/env python2

import os
import random
import bluetooth
import sys
import time

car_name = "DaguCar"

def find_car_mac( car_name ):
    ret = None

    print "Scanning for a device called %s..." % car_name

    devices = bluetooth.discover_devices()

    for addr in devices:
        dev_name = bluetooth.lookup_name( addr )
        if dev_name == car_name:
            print "Car found!  (MAC=%s)" % addr
            time.sleep( 1 )
            return addr
        else:
            print "Skipping device named '%s'." % dev_name

    sys.stderr.write( "Couldn't find a device called %s!\n" % car_name )
    sys.exit( 1 )


#car_mac = find_car_mac( car_name )

print "Using hard-coded MAC address"
car_mac = "20:13:04:23:05:71"

print "Connecting to the car at %s..." % car_mac

sock = bluetooth.BluetoothSocket( bluetooth.RFCOMM )
sock.connect( ( car_mac, 1 ) )

class Car:
    def turn_right(self):
        print("looking right")
        sock.send( '\x6A' )
    def turn_left(self):
        print("looking left")
        sock.send( '\x5A' )
    def roll_forward(self):
        print("watch out in front coming forward")
        sock.send( '\x1A' )
    def roll_backward(self):
        print("beep beep going backward")
        sock.send( '\x2A' )
    def wait(self):
        print("waiting")
        sock.send( '\x00' )

car = Car()

instruction = ["turn right", "turn left", "roll forward", "roll backward", "wait"]

while True:
    ci = random.choice(instruction)
    if ci == "turn right":
        car.turn_right()
    elif ci == "turn left":
        car.turn_left()
    elif ci == "roll forward":
        car.roll_forward()
    elif ci == "roll backward":
        car.roll_backward()
    elif ci == "wait":
        car.wait()
    time.sleep(0.5)

Improving the code to avoid bumping into walls is left as an exercise for the reader.

Resources for year 6 teachers on coding and programming

I have been introducing some year 6 (UK) teachers to coding by showing them how to lay out a simple web page by writing HTML. I promised I would find some links to resources for them, so here they are:

HTML and JavaScript

My examples of how to write HTML are at github.com/andybalaam/html-examples

There are several web sites that allow you to experiment with writing HTML and JavaScript and seeing the results immediately:

I also made some videos about how to make a snowflake animation in both JavaScript and Scratch here: Snowflake Christmas card.

Raspberry Pi

The Raspberry Pi is a cheap education-focussed computer that looks like a piece of circuit board the size of a credit card.

Their educational resources are at: raspberrypi.org/resources.

There are lots of great videos about how to do different things with the Raspberry Pi, including the ones by The Raspberry Pi Guy.

There are also my (boring, but comprehensive) videos teaching you to write a simple game in Python, from a starting point of no programming experience at all: My First Raspberry Pi Game.

Graphical programming

There are several tools and sites for learning programming by dragging and dropping blocks instead of typing code:

  • Scratch – creative, unguided, a bit old-fashioned looking but tried-and-trusted
  • code.org – fun, attractive guided tasks featuring Disney characters, Minecraft etc.
  • blockly – guided tasks with good progression
  • codeforlife.education – I’ve not used this, but it looks like it could have potential

Other languages

Update: there is a huge list of similar resources here: Beginner’s Resources to Learn Programming Languages. Thanks to Erica and Sarah for emailing me the link!

Update 2: some more useful links: Learn to code from home and Computer Language for beginners: HTML. Thanks to Sarah for the suggestions, and Stacey for sending them to me!

Update 3: a comprehensive page of links to resources for learning how to code Virtual Reality. Thanks to Katie for the suggestion, and Yazmin for sending it. Keep on being a computer nerd!