Making 100 million requests with Python aiohttp

Series: asyncio basics, large numbers in parallel, parallel HTTP requests, adding to stdlib

Update: slides of a talk I gave at the London Python Meetup on this: Talk slides: Making 100 million HTTP requests with Python aiohttp.

Update: see how Cristian Garcia improved on this code here: Making an Unlimited Number of Requests with Python aiohttp + pypeln.

I’ve been working on how to make a very large number of HTTP requests using Python’s asyncio and aiohttp.

Paweł Miech’s post Making 1 million requests with python-aiohttp taught me how to think about this, and got us a long way, with 1 million requests running in a reasonable time, but I need to go further.

Paweł’s approach limits the number of requests that are in progress, but it uses an unbounded amount of memory to hold the futures that it wants to execute.

See also: 2 excellent related posts by Quentin Pradet: How do you rate limit calls with asyncio?, How do you limit memory usage with asyncio?.

We can avoid using unbounded memory by using the limited_as_completed function I outined in my previous post.

Setup

Server

We have a server program “server”:

(Note it differs from Paweł’s version because I am using an older version of aiohttp which has fewer convenient features.)

#!/usr/bin/env python3.5

from aiohttp import web
import asyncio
import random

async def handle(request):
    await asyncio.sleep(random.randint(0, 3))
    return web.Response(text="Hello, World!")

async def init():
    app = web.Application()
    app.router.add_route('GET', '/{name}', handle)
    return await loop.create_server(
        app.make_handler(), '127.0.0.1', 8080)

loop = asyncio.get_event_loop()
loop.run_until_complete(init())
loop.run_forever()

This just responds “Hello, World!” to every request it receives, but after an artificial delay of 0-3 seconds.

Synchronous client

As a baseline, we have a synchronous client “client-sync”:

#!/usr/bin/env python3.5

import requests
import sys

url = "http://localhost:8080/{}"
for i in range(int(sys.argv[1])):
    requests.get(url.format(i)).text

This waits for each request to complete before making the next one. Like the other clients below, it takes the number of requests to make as a command-line argument.

Async client using semaphores

Copied mostly verbatim from Making 1 million requests with python-aiohttp we have an async client “client-async-sem” that uses a semaphore to restrict the number of requests that are in progress at any time to 1000:

#!/usr/bin/env python3.5

from aiohttp import ClientSession
import asyncio
import sys

limit = 1000

async def fetch(url, session):
    async with session.get(url) as response:
        return await response.read()

async def bound_fetch(sem, url, session):
    # Getter function with semaphore.
    async with sem:
        await fetch(url, session)

async def run(session, r):
    url = "http://localhost:8080/{}"
    tasks = []
    # create instance of Semaphore
    sem = asyncio.Semaphore(limit)
    for i in range(r):
        # pass Semaphore and session to every GET request
        task = asyncio.ensure_future(bound_fetch(sem, url.format(i), session))
        tasks.append(task)
    responses = asyncio.gather(*tasks)
    await responses

loop = asyncio.get_event_loop()
with ClientSession() as session:
    loop.run_until_complete(asyncio.ensure_future(run(session, int(sys.argv[1]))))

Async client using limited_as_completed

The new client I am presenting here uses limited_as_completed from the previous post. This means it can make a generator that provides the futures to wait for as they are needed, instead of making them all at the beginning.

It is called “client-async-as-completed”:

#!/usr/bin/env python3.5

from aiohttp import ClientSession
import asyncio
from itertools import islice
import sys

def limited_as_completed(coros, limit):
    futures = [
        asyncio.ensure_future(c)
        for c in islice(coros, 0, limit)
    ]
    async def first_to_finish():
        while True:
            await asyncio.sleep(0)
            for f in futures:
                if f.done():
                    futures.remove(f)
                    try:
                        newf = next(coros)
                        futures.append(
                            asyncio.ensure_future(newf))
                    except StopIteration as e:
                        pass
                    return f.result()
    while len(futures) > 0:
        yield first_to_finish()

async def fetch(url, session):
    async with session.get(url) as response:
        return await response.read()

limit = 1000

async def print_when_done(tasks):
    for res in limited_as_completed(tasks, limit):
        await res

r = int(sys.argv[1])
url = "http://localhost:8080/{}"
loop = asyncio.get_event_loop()
with ClientSession() as session:
    coros = (fetch(url.format(i), session) for i in range(r))
    loop.run_until_complete(print_when_done(coros))
loop.close()

Again, this limits the number of requests to 1000.

Test setup

Finally, we have a test runner script called “timed”:

#!/usr/bin/env bash

./server &
sleep 1 # Wait for server to start

/usr/bin/time --format "Memory usage: %MKB\tTime: %e seconds" "$@"

# %e Elapsed real (wall clock) time used by the process, in seconds.
# %M Maximum resident set size of the process in Kilobytes.

kill %1

This runs each process, ensuring the server is restarted each time it runs, and prints out how long it took to run, and how much memory it used.

Results

When making only 10 requests, the async clients worked faster because they launched all the requests simultaneously and only had to wait for the longest one (3 seconds). The memory usage of all three clients was fine:

$ ./timed ./client-sync 10
Memory usage: 20548KB	Time: 15.16 seconds
$ ./timed ./client-async-sem 10
Memory usage: 24996KB	Time: 3.13 seconds
$ ./timed ./client-async-as-completed 10
Memory usage: 23176KB	Time: 3.13 seconds

When making 100 requests, the synchronous client was very slow, but all three clients worked eventually:

$ ./timed ./client-sync 100
Memory usage: 20528KB	Time: 156.63 seconds
$ ./timed ./client-async-sem 100
Memory usage: 24980KB	Time: 3.21 seconds
$ ./timed ./client-async-as-completed 100
Memory usage: 24904KB	Time: 3.21 seconds

At this point let’s agree that life is too short to wait for the synchronous client.

When making 10000 requests, both async clients worked quite quickly, and both had increased memory usage, but the semaphore-based one used almost twice as much memory as the limited_as_completed version:

$ ./timed ./client-async-sem 10000
Memory usage: 77912KB	Time: 18.10 seconds
$ ./timed ./client-async-as-completed 10000
Memory usage: 46780KB	Time: 17.86 seconds

For 1 million requests, the semaphore-based client took 25 minutes on my (32GB RAM) machine. It only used about 10% of my CPU, and it used a lot of memory (over 3GB):

$ ./timed ./client-async-sem 1000000
Memory usage: 3815076KB	Time: 1544.04 seconds

Note: Paweł’s version only took 9 minutes on his laptop and used all his CPU, so I wonder whether I have made a mistake somewhere, or whether my version of Python (3.5.2) is not as good as a later one.

The limited_as_completed version ran in a similar amount of time but used 100% of my CPU, and used a much smaller amount of memory (162MB):

$ ./timed ./client-async-as-completed 1000000
Memory usage: 162168KB	Time: 1505.75 seconds

Now let’s try 100 million requests. The semaphore-based version lasted 10 hours before it was killed by Linux’s OOM Killer, but it didn’t manage to make any requests in this time, because it creates all its futures before it starts making requests:

$ ./timed ./client-async-sem 100000000
Command terminated by signal 9

I left the limited_as_completed version over the weekend and it managed to succeed eventually:

$ ./timed ./client-async-as-completed 100000000
Memory usage: 294304KB	Time: 150213.15 seconds

So its memory usage was still very bounded, and it managed to do about 665 requests/second over an extended period, which is almost identical to the throughput of the previous cases.

Conclusion

Making a million requests is usually enough, but when we really need to do a lot of work while keeping our memory usage bounded, it looks like an approach like limited_as_completed is a good way to go. I also think it’s slightly easier to understand.

In the next post I describe my attempt to get something like this added to the Python standard library.

28 thoughts on “Making 100 million requests with Python aiohttp”

  1. I am using this to scrape data from a website however the website will place a temp block on IP addresses if it detects excessive use. Is there a way I can place a delay between requests? I am assuming I can use the time.sleep(seconds) function but unsure of the best place to insert within script.

  2. Jack, I think you could put a sleep inside the “async with” block in fetch(), but I’d seriously suggest you avoid spamming a web site against its terms and conditions! Maybe contact the site about getting hold of their data another way?

  3. Hi Nam, since many of the requests are happening in parallel, sharing cookies may not work the way you need, but apart from that caveat, you should be able to provide cookies the way you normally do it using the requests library.

  4. That is a nice experiment. Andy, the fact that your client-async-as-completed uses 100% CPU is not a good thing. It indicates a problem with using busy-waiting. Your “while True” loop continuously checks for task completion, and that causes 100% CPU utilization. A better approach is to use

    while futures:
    done, futures = await asyncio.wait(futures, return_when=asyncio.FIRST_COMPLETED)
    # then re-fill the set of futures from the coros iterable

    asyncio.wait does not do busy-waiting

  5. This is a great post! I learned a lot from it.
    I’m also thinking about how to use aiohttp for a similar task.
    Inspired by this post, I’m using only the semaphore to control the concurrent requests without exhausting CPU or memory. The only thing I do differently is to acquire the semaphore before creating the future task and release the semaphore after fetching the result.
    Code:
    https://github.com/flyakite/100-million-requests-aiohttp

  6. Hi!
    I created this library called `pypeln` – https://github.com/cgarciae/pypeln for creating many kinds of concurrent data pipelines. It currently supports Processes, Threads and asyncio Tasks.

    Your post was an inspiration when implementing the io module!

    With pypeln you can easily solve the problem you showed like this:

    “`
    from aiohttp import ClientSession
    from pypeln import io
    import sys

    r = int(sys.argv[1])
    url = “http://localhost:8080/{}”

    with ClientSession() as session:
    data = range(r)
    io.each(lambda i: fetch(url.format(i), session), data, workers=1000)
    “`

    Thanks for sharing you knowledge!

  7. I was planning to write a follow-up using pypeln, but I got this:

     cat client-pypeln 
    #!/usr/bin/env python3.6
    
    from aiohttp import ClientSession
    from pypeln import io
    import sys
    
    r = int(sys.argv[1])
    url = "http://localhost:8080/{}"
    
    with ClientSession() as session:
        data = range(r)
        io.each(lambda i: fetch(url.format(i), session), data, workers=1000)
    
    $ ./client-pypeln 10
    ./client-pypeln:10: UserWarning: Creating a client session outside of coroutine is a very dangerous idea
      with ClientSession() as session:
    Creating a client session outside of coroutine
    client_session: 
    Traceback (most recent call last):
      File "./client-pypeln", line 10, in 
        with ClientSession() as session:
      File "/usr/lib/python3/dist-packages/aiohttp/client.py", line 742, in __enter__
        raise TypeError("Use async with instead")
    TypeError: Use async with instead
    Unclosed client session
    client_session: 
    

    Can you give a more complete example?

  8. Hey Andy,

    Glad you liked it!

    The error is due to changes in the iohttp library, it seems that ClientSession now has to be run with “async with” instead of “with”. Your original code should no longer work.

    Here is a full working example:

    from aiohttp import ClientSession
    from pypeln import io
    import asyncio
    import sys

    async def fetch(url, session):
    async with session.get(url) as response:
    return await response.read()

    async def main():
    r = 10
    url = “http://google.com”

    # r = int(sys.argv[1])
    # url = “http://localhost:8080/{}”

    async with ClientSession() as session:
    data = range(r)
    await io.each(lambda i: fetch(url, session), data, workers=1000, run = False)

    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

    print(“Finish”)

  9. Hmm, something is not working the way we expect – it took longer for this code to do 1000 requests than the client-async-as-completed took to do 10000:

    $ ./timed ./client-pypeln 1000
    Memory usage: 38672KB	Time: 21.66 seconds
    
  10. Thank you very much for the feedback!
    If possible, can we continue the discussion in this issue: https://github.com/cgarciae/pypeln/issues/9

    I looked at the code and made some optimizations, please update to the development code like this before you try:

    pip install git+https://github.com/cgarciae/pypeln@develop

  11. Hi ,

    This is great analysis on timecomplexity.
    Looking for similar requirement with Python based Unit test framework:
    1. Fire 100000 unique URL requests in parallel ( Not sequential and not repetitive requests) — Thread1
    2. Verify HTTP response status code ( eg: 200, 403, 404 etc) of each requests in parallel , may be handled via different thread — Thread2
    3. Verify Response headers of each URL requests –> may be handled via different thread – Thread3
    4. Login to remote machine ( to which requests is fired) and verify logs –> Can this be done using RpyC — Thread4

    any unit test framework like tornado or different framework using asyncio/aiohttp suitable with above requirements in optimized time fashion.

  12. Hi,

    Nice post – I would suggest looking into Queues to manage more easily the memory consumption:
    – you can insert 100M jobs into an asyncio queue: `queue = asyncio.Queue(); [queue.put_nowait(i) for i in range(r)]`. This will only consume a few 100s of MB of RAM.
    – then you can create a worker task that gets a job from the queue, run it, and moves on the the next one:
    “`
    async def worker(queue):
    while True:
    job = await queue.get()
    await actual_work(job) # you might want to wrap this in a try/except to handle crappy jobs
    job.task_done()
    “`

    – You can launch as many concurrent tasks as necessary with asyncio.create_task
    `workers = [asyncio.create_task(worker(queue)) for _ in range(1000)]`
    – And wait for everything to be finished before cancelling the workers:
    “`
    await queue.join()
    for task in workers:
    task.cancel()
    with contextlib.suppress(asyncio.CancelledError):
    await task
    “`

  13. Hi,

    thanks for a great article! Aiohttp got my intrest, as I’m trying to gather some basic info (not the whole response, but things like status, redirect history, etc.), and async approach seems to be the best. I had some memory issues as I’m trying to run my script on over a milion urls, and found your solution extremely helpful.

    I’m wondering however, have you ever tested it outside of the setup described in the article (i.e. with real urls, some of which might be broken or temporarily unavailable)? I guess I’m sort of reproducing your approach and adjusting it to my needs, however I’ve noticed that performance of my script dminishes greatly over time. For example, I was able to check 20K urls in roughly 20 mins, but then while running the script on a longer list, I got to 125K after 9 hours. Moreover, adjusting the task limit doesn’t seem to influence much – when I run it on 20K urls it was 20 mins regardless of whether I set the limit to 100 or 1000.

    Would you be able to help me with that? I’m not an expert on general programming (I use python primarily for analytical stuff), so I could have missed something. Do you know what this performance degradation might be caused by?

    Please see the code I’m using below (I’ve also tried to mix the ‘as completed’ with semaphores, but eventually, I think it doesn’t make too much sense).

    import pandas as pd
    import asyncio, aiohttp
    import time
    from itertools import islice

    def task_limiter(tasks, task_limit):

    futures = [asyncio.ensure_future(c) for c in islice(tasks, 0, task_limit)]

    async def task_list_manager():
    while True:
    await asyncio.sleep(0)
    for f in futures:
    if f.done():
    futures.remove(f)
    try:
    newf = next(tasks)
    futures.append(asyncio.ensure_future(newf))
    except StopIteration:
    pass
    return f.result()

    while len(futures) > 0:
    yield task_list_manager()

    async def collect_url_info(session, url, n):

    try:
    async with await session.get(url) as response:

    redirect_history = []

    for i in range(len(response.history)):
    if str(url) == str(response.history[i].url):
    continue
    else:
    redirect_history.append(str(response.history[i].url))

    print(f’Task {n} done’)
    return response.status, response.reason, redirect_history

    except Exception as e:

    msg = str(type(e)) + “|||” + str(e)
    print(f’Task {n} done with status -1 {msg}’)
    return (-1, msg, list())

    async def bound_collect(sem, session, url, n):
    async with sem:
    return await collect_url_info(session, url, n)

    async def run_url_check(urls, task_limit, conn_limit, timeout):

    sem = asyncio.Semaphore(conn_limit)
    connector = aiohttp.TCPConnector(limit=None)

    headers = {
    “Accept”: “text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9”,
    “Accept-Encoding”: “gzip, deflate”,
    “Accept-Language”: “en-GB,en-US;q=0.9,en;q=0.8”,
    “Dnt”: “1”,
    “Upgrade-Insecure-Requests”: “1”,
    “User-Agent”: “Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.169 Safari/537.36”,
    “Referer”: “google.com”
    }

    async with aiohttp.ClientSession(connector=connector, timeout=timeout, headers=headers) as session:
    tasks = (collect_url_info(session, urls[i], i) for i in range(len(urls)))

    result = [None for i in range(len(urls))]
    for i, res in enumerate(task_limiter(tasks, task_limit)):
    result[i] = await res

    return result

    def prep_output_df(df, url_col, url_info):

    df[‘status_code’] = [x[0] for x in url_info]
    df[‘status_reason’] = [x[1] for x in url_info]
    df[‘redirect_history’] = [str(x[2]) for x in url_info]
    df[‘final_url’] = [x[2][-1] if len(x[2]) != 0 else ” for x in url_info]
    df[‘redirect_flag’] = df[‘final_url’].apply(lambda x: 1 if x != ” else 0, 1)

    return df

    ###SCRIPT
    start = time.time()

    task_limit = 10000
    #conn_limit = 1000
    timeout = aiohttp.ClientTimeout(total=None, sock_connect=60, sock_read=60)

    input_path = ‘input.csv’
    data = pd.read_csv(input_path, nrows = 20000)
    url_col = ‘WEBSITE’
    urls = data[url_col]

    loop = asyncio.get_event_loop()
    url_info = loop.run_until_complete(run_url_check(urls, task_limit, conn_limit, timeout))

    data = prep_output_df(data, url_col, url_info)

    total_requests = len(data)
    error_requests = (data[‘status_code’] == -1).sum()
    print(‘NUMBER OF REQUESTS:’)
    print(total_requests)
    print(‘NUMBER OF ERRORS:’)
    print(error_requests)
    print(‘ERROR RATE:’)
    print((error_requests/total_requests))

    output_path = ‘out.csv’
    data.to_csv(output_path, index=False)

    end = time.time()

    print(f’Time taken: {end-start}s’)

  14. Hi Lukasz, my first guess would be that the site(s) you are connecting to may slow down or throttle you if you hit them this hard. It might be worth logging the time between making a request and receiving a response. Another guess: if you’re storing the responses in memory, you may be starting to use up your computer’s memory space. If memory gets tight, the machine will slow right down. Does it seem unresponsive? The solution in that case would be to write responses to a file as you go, instead of doing it at the end.

Leave a Reply

Your email address will not be published. Required fields are marked *

This site uses Akismet to reduce spam. Learn how your comment data is processed.