ZetCode

Python ThreadPoolExecutor

last modified July 29, 2022

In this article, we show how to do concurrent programming in Python using ThreadPoolExecutor.

Concurrent programming's main goal is to make our code more efficient. Concurrency may be achieved with threading, parallelism, or asynchrony. In this tutorial, we deal with threading using the ThreadPoolExecutor.

A thread is an independent flow of execution. Threads are suited for IO-bound tasks, such as downloading a file or executing a database command. Since Python enforces the GIL, true parallelism is (mostly) not possible with threads. For parallel programming, we should consider the multiprocessing module or the ProcessPoolExecutor.

A global interpreter lock (GIL) is a mechanism used in the CPython interpreter to synchronize the execution of threads so that only one native thread can execute Python bytecode at a time, even on a multi-core processor. This prevents concurrency bugs in the interpreter's internals.

The threading module provides a basic interface for concurrency using threads.
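As a quick contrast, here is a minimal sketch using the threading module directly; note the manual start/join bookkeeping that the ThreadPoolExecutor removes:

```python
import threading

results = []

def work(n):
    # each thread appends its square to the shared list
    # (list.append is thread-safe in CPython)
    results.append(n * n)

threads = [threading.Thread(target=work, args=(i,)) for i in range(3)]

for t in threads:
    t.start()

for t in threads:
    t.join()

print(sorted(results))
```

With a handful of threads this is manageable, but creating, starting, and joining threads by hand does not scale well; the executor handles all of it for us.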

ThreadPoolExecutor

The concurrent.futures module provides a high-level interface for concurrently executing callables. The ThreadPoolExecutor is part of this module. It simplifies the creation and management of threads: it manages a pool of workers, which makes the creation, management, and closing of threads more efficient.

There is significant overhead in creating a new thread, so the worker threads are reused once their tasks are completed.
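We can observe this reuse directly: in the following sketch, ten tasks are served by a pool limited to two workers, so at most two distinct thread identifiers ever appear.

```python
from concurrent.futures import ThreadPoolExecutor
import threading

def which_thread(_):
    # return the identifier of the worker running this task
    return threading.get_ident()

with ThreadPoolExecutor(max_workers=2) as executor:
    idents = set(executor.map(which_thread, range(10)))

# ten tasks, but no more than two worker threads served them
print(len(idents))
```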

Future

A Future is an object that represents the eventual result of an asynchronous operation. A future ends in a result or an exception. Its result function returns the result of a concurrent operation.
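Both outcomes can be inspected on the future itself; a small sketch:

```python
from concurrent.futures import ThreadPoolExecutor

def ok():
    return 42

def fail():
    raise ValueError('boom')

with ThreadPoolExecutor() as executor:

    f1 = executor.submit(ok)
    f2 = executor.submit(fail)

    # result returns the computed value
    print(f1.result())

    # exception returns the raised exception object;
    # calling f2.result() would re-raise it instead
    print(f2.exception())
```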

Python ThreadPoolExecutor submit

The submit function schedules the callable to be executed and returns a future object representing the execution of the callable.

submitfun.py
#!/usr/bin/python

from time import sleep
from concurrent.futures import ThreadPoolExecutor
import threading

def task(id, n):

    print(f"thread {id} started")
    print(f"thread {id} : {threading.get_ident()}")
    sleep(n)
    print(f"thread {id} completed")


with ThreadPoolExecutor() as executor:

    executor.submit(task, 1, 4)
    executor.submit(task, 2, 3)
    executor.submit(task, 3, 2)

In the example, we submit three tasks to be executed.

def task(id, n):

    print(f"thread {id} started")
    print(f"thread {id} : {threading.get_ident()}")
    sleep(n)
    print(f"thread {id} completed")

The task is a function which prints the id of the thread and some basic messages. It also sleeps for a given number of seconds. The time.sleep function is often used to simulate a long-running task.

with ThreadPoolExecutor() as executor:

A new executor is created. It is used as a context manager so that it is shut down at the end.
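Without the with statement, the executor must be shut down explicitly; the context manager form is roughly equivalent to calling shutdown(wait=True), as this sketch shows:

```python
from concurrent.futures import ThreadPoolExecutor

def square(n):
    return n * n

executor = ThreadPoolExecutor()
future = executor.submit(square, 5)

# wait for pending tasks to finish and release the worker threads
executor.shutdown(wait=True)

print(future.result())
```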

executor.submit(task, 1, 4)
executor.submit(task, 2, 3)
executor.submit(task, 3, 2)

We submit three tasks with the submit function.

$ ./submitfun.py
thread 1 started
thread 1 : 140563097032256
thread 2 started
thread 2 : 140563088639552
thread 3 started
thread 3 : 140563005306432
thread 3 completed
thread 2 completed
thread 1 completed

Python ThreadPoolExecutor map

The map function applies the given function to each element of an iterable. It may be passed multiple iterables; their elements are supplied to the function as positional arguments.

mapfun.py
#!/usr/bin/python

from time import sleep
from concurrent.futures import ThreadPoolExecutor
import threading

def task(id, n):

    print(f"thread {id} started")
    print(f"thread {id} : {threading.get_ident()}")
    sleep(n)
    print(f"thread {id} completed")

with ThreadPoolExecutor() as executor:

    executor.map(task, [1, 2, 3], [4, 3, 2])

We rewrite the previous example using map. It accepts two iterables: the ids and the durations.
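Unlike submit, map returns the results themselves (in the order of the input iterables) rather than Future objects; a small sketch:

```python
from concurrent.futures import ThreadPoolExecutor

def power(base, exp):
    return base ** exp

with ThreadPoolExecutor() as executor:

    # two iterables: the bases and the exponents
    results = list(executor.map(power, [2, 3, 4], [3, 2, 1]))

print(results)
```

Even though the tasks run concurrently, the returned results preserve the order of the inputs.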

Python ThreadPoolExecutor Future.result

A Future represents an eventual result of a concurrent operation. The future's result function returns the value of the callable; it blocks until the task associated with the future is done.

resultfun.py
#!/usr/bin/python

from time import sleep, perf_counter
import random
from concurrent.futures import ThreadPoolExecutor

def task(tid):

    r = random.randint(1, 5)
    print(f'task {tid} started, sleeping {r} secs')
    sleep(r)

    return f'finished task {tid}, slept {r}'

start = perf_counter()

with ThreadPoolExecutor() as executor:

    t1 = executor.submit(task, 1)
    t2 = executor.submit(task, 2)
    t3 = executor.submit(task, 3)

    print(t1.result())
    print(t2.result())
    print(t3.result())

finish = perf_counter()

print(f"It took {finish-start} second(s) to finish.")

Our task sleeps for a random number of seconds. We get a message from each of the tasks. In addition, we use the time module to calculate the elapsed time.

return f'finished task {tid}, slept {r}'

The returned value will be available via the result function call.

start = perf_counter()

With perf_counter, we calculate the elapsed time.

t1 = executor.submit(task, 1)
t2 = executor.submit(task, 2)
t3 = executor.submit(task, 3)

print(t1.result())
print(t2.result())
print(t3.result())

We submit three tasks and retrieve their results. Note that the result function is blocking; therefore, we get the task results in the original scheduling order.

$ ./resultfun.py
task 1 started, sleeping 3 secs
task 2 started, sleeping 4 secs
task 3 started, sleeping 1 secs
finished task 1, slept 3
finished task 2, slept 4
finished task 3, slept 1
It took 4.005295900977217 second(s) to finish.

The whole process lasts as long as its longest task (plus some overhead). The tasks finish in the order they were scheduled, because the result function is blocking. In the next example, we fix this.
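Since result blocks, it also accepts an optional timeout in seconds; if the task has not finished in time, concurrent.futures.TimeoutError is raised. A sketch:

```python
from time import sleep
from concurrent.futures import ThreadPoolExecutor, TimeoutError

def slow():
    sleep(2)
    return 'done'

executor = ThreadPoolExecutor()
future = executor.submit(slow)

try:
    # give up waiting after half a second
    future.result(timeout=0.5)
    timed_out = False
except TimeoutError:
    timed_out = True

print(timed_out)

# the task keeps running; a later call still retrieves the value
print(future.result())
executor.shutdown()
```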

Python ThreadPoolExecutor as_completed

The as_completed function returns an iterator that yields the given futures as they complete.

Unfortunately, it is not possible to use map with as_completed, because map yields results directly, not Future objects.

as_completed.py
#!/usr/bin/python

from time import sleep, perf_counter
import random
from concurrent.futures import ThreadPoolExecutor, as_completed

def task(tid):

    r = random.randint(1, 5)
    print(f'task {tid} started, sleeping {r} secs')
    sleep(r)

    return f'finished task {tid}, slept {r}'

start = perf_counter()

with ThreadPoolExecutor() as executor:

    tids = [1, 2, 3]
    futures = []

    for tid in tids:
        futures.append(executor.submit(task, tid))

    for res in as_completed(futures):
        print(res.result())

finish = perf_counter()

print(f"It took {finish-start} second(s) to finish.")

In this example, we get the results of the tasks in the order they finish.

$ ./as_completed.py
task 1 started, sleeping 3 secs
task 2 started, sleeping 4 secs
task 3 started, sleeping 2 secs
finished task 3, slept 2
finished task 1, slept 3
finished task 2, slept 4
It took 4.00534593896009 second(s) to finish.
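With as_completed, the results arrive out of order, so we no longer know which task produced which value unless the task reports it. A common pattern is to build a dictionary mapping each future back to its task id; a sketch:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def task(tid):
    return tid * 10

with ThreadPoolExecutor() as executor:

    # map each future back to the task id that produced it
    futures = {executor.submit(task, tid): tid for tid in [1, 2, 3]}

    collected = {}

    for future in as_completed(futures):
        tid = futures[future]
        collected[tid] = future.result()

print(collected)
```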

Multiple concurrent HTTP requests

In the next example, we use ThreadPoolExecutor to issue multiple HTTP requests concurrently. The requests library is used to make the HTTP calls.

web_requests.py
#!/usr/bin/python

import requests
import concurrent.futures
import time

def get_status(url):

    resp = requests.get(url=url)
    return resp.status_code

urls = ['http://webcode.me', 'https://httpbin.org/get',
    'https://google.com', 'https://stackoverflow.com',
    'https://github.com', 'https://clojure.org',
    'https://fsharp.org']

tm1 = time.perf_counter()

with concurrent.futures.ThreadPoolExecutor() as executor:

    futures = []

    for url in urls:
        futures.append(executor.submit(get_status, url=url))

    for future in concurrent.futures.as_completed(futures):
        print(future.result())

tm2 = time.perf_counter()
print(f'elapsed {tm2-tm1:0.2f} seconds')

The example checks the HTTP status code of several web sites concurrently.

$ ./web_requests.py 
200
200
200
200
200
200
200
elapsed 0.81 seconds

Concurrent pinging

In the next example, we ping the given web sites with an external program.

pinging.py
#!/usr/bin/python

from time import perf_counter
from concurrent.futures import ThreadPoolExecutor, as_completed
import subprocess


def task(url):

    ok, _ = subprocess.getstatusoutput(
        f'ping -c 3 -w 10 {url}')
    
    return ok == 0, url


urls = ['webcode.me', 'clojure.org', 'fsharp.org', 
    'www.perl.org', 'python.org', 'go.dev', 'raku.org']

start = perf_counter()

with ThreadPoolExecutor() as executor:

    futures = []

    for url in urls:
        futures.append(executor.submit(task, url))

    for future in as_completed(futures):

        r, u = future.result()
        
        if r:
            print(f'OK -> {u}')
        else:
            print(f'failed -> {u}')

finish = perf_counter()

print(f"elapsed {finish-start} second(s)")

The example uses the subprocess module to execute an external program.

ok, _ = subprocess.getstatusoutput(
    f'ping -c 3 -w 10 {url}')

The getstatusoutput function returns the (exitcode, output) pair of the executed command. The command is passed as a single string, because getstatusoutput runs it through the shell. ping is a standard Unix program which sends ICMP ECHO_REQUEST packets to network hosts. The -c option sets the number of packets to send. The -w option sets the overall deadline in seconds.
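The (exitcode, output) pair is easy to see with a harmless command; a sketch using echo:

```python
import subprocess

# run a trivial shell command and capture its exit code and output
code, output = subprocess.getstatusoutput('echo hello')

print(code, output)
```

A zero exit code signals success; the trailing newline of the command's output is stripped.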

$ ./pinging.py 
OK -> go.dev
OK -> fsharp.org
OK -> www.perl.org
OK -> python.org
OK -> raku.org
OK -> clojure.org
OK -> webcode.me
elapsed 2.384801392967347 second(s)

In this article, we have worked with the ThreadPoolExecutor.
