Python ThreadPoolExecutor
last modified January 29, 2024
In this article we show how to do concurrent programming in Python using ThreadPoolExecutor.
Concurrent programming's main goal is to make our code more efficient.
Concurrency may be achieved with threading, parallelism, or asynchrony. In this
tutorial, we deal with threading using the ThreadPoolExecutor.
A thread is an independent execution of code. Threads are used for IO-bound tasks, such as downloading a file or executing a database command. Since Python enforces the GIL, true parallelism is (mostly) not possible with threads. For parallel programming, we should consider the multiprocessing module or the ProcessPoolExecutor.
A global interpreter lock (GIL) is a mechanism used in the Python interpreter to synchronize the execution of threads so that only one native thread can execute at a time, even when run on a multi-core processor. This is to prevent concurrency bugs.
The threading
module provides a basic interface for concurrency
using threads.
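For comparison, a minimal sketch of this lower-level interface, creating and joining the threads by hand:

```python
import threading

results = []

def worker(n):
    # each thread appends its doubled input; list.append is thread-safe
    results.append(n * 2)

# create three threads manually; join waits for each to finish
threads = [threading.Thread(target=worker, args=(i,)) for i in range(3)]

for t in threads:
    t.start()

for t in threads:
    t.join()

print(sorted(results))  # [0, 2, 4]
```

With ThreadPoolExecutor, the starting and joining is handled for us.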
ThreadPoolExecutor
The concurrent.futures
module provides a high-level interface for
concurrently executing callables. The ThreadPoolExecutor
is part
of the module. The ThreadPoolExecutor simplifies the creation and
management of threads. It manages a pool of workers, which makes the whole
process of creating, managing, and closing threads more efficient.
There is some significant overhead when creating a new thread. The worker threads are designed to be reused once the tasks are completed.
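Worker reuse can be observed by limiting the pool size; in this sketch, six tasks share at most two threads:

```python
from concurrent.futures import ThreadPoolExecutor
import threading

def which_thread(_):
    # report which worker thread ran this task
    return threading.get_ident()

# with only two workers, the same threads are reused for all six tasks
with ThreadPoolExecutor(max_workers=2) as executor:
    idents = set(executor.map(which_thread, range(6)))

# at most two distinct thread ids appear
print(len(idents))
```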
Future
A Future is an object that represents the eventual result of an
asynchronous operation. A future ends in a result or an exception. Its
result
function returns the result of a concurrent operation.
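A small sketch of this life cycle, using a hypothetical slow_add helper:

```python
from concurrent.futures import ThreadPoolExecutor
from time import sleep

def slow_add(a, b):
    # simulate a slow IO-bound operation
    sleep(0.5)
    return a + b

with ThreadPoolExecutor() as executor:
    future = executor.submit(slow_add, 2, 3)
    print(future.done())    # False: the task is still pending or running
    print(future.result())  # 5: blocks until the task is done
    print(future.done())    # True
```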
Python ThreadPoolExecutor submit
The submit
function schedules the callable to be executed and
returns a future object representing the execution of the callable.
#!/usr/bin/python

from time import sleep
from concurrent.futures import ThreadPoolExecutor
import threading

def task(id, n):
    print(f"thread {id} started")
    print(f"thread {id} : {threading.get_ident()}")
    sleep(n)
    print(f"thread {id} completed")

with ThreadPoolExecutor() as executor:
    executor.submit(task, 1, 4)
    executor.submit(task, 2, 3)
    executor.submit(task, 3, 2)
In the example, we submit three tasks to be executed.
def task(id, n):
    print(f"thread {id} started")
    print(f"thread {id} : {threading.get_ident()}")
    sleep(n)
    print(f"thread {id} completed")
The task is a function which prints the id of the thread and some basic
messages. It also sleeps for a given number of seconds. The
time.sleep
function is often used to simulate a long-running
task.
with ThreadPoolExecutor() as executor:
A new executor is created. It is used as a context manager so that it is shut down at the end.
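The with block is shorthand for calling the executor's shutdown function; a small sketch of the manual equivalent:

```python
from concurrent.futures import ThreadPoolExecutor

# without a with block, shutdown must be called explicitly
executor = ThreadPoolExecutor(max_workers=2)

future = executor.submit(pow, 2, 10)
print(future.result())  # 1024

# this is what the with block does on exit; wait=True blocks
# until all pending tasks have finished
executor.shutdown(wait=True)
```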
executor.submit(task, 1, 4)
executor.submit(task, 2, 3)
executor.submit(task, 3, 2)
We submit three tasks with the submit
function.
$ ./submitfun.py
thread 1 started
thread 1 : 140563097032256
thread 2 started
thread 2 : 140563088639552
thread 3 started
thread 3 : 140563005306432
thread 3 completed
thread 2 completed
thread 1 completed
Python ThreadPoolExecutor map
The map
function applies the given function to each element in an
iterable. The function may accept multiple iterables.
#!/usr/bin/python

from time import sleep
from concurrent.futures import ThreadPoolExecutor
import threading

def task(id, n):
    print(f"thread {id} started")
    print(f"thread {id} : {threading.get_ident()}")
    sleep(n)
    print(f"thread {id} completed")

with ThreadPoolExecutor() as executor:
    executor.map(task, [1, 2, 3], [4, 3, 2])
We rewrite the previous example using map
. It accepts two
iterables: the ids and the durations.
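Unlike submit, map returns an iterator that yields results in the order of the inputs, even when a later input finishes first; a small sketch:

```python
from concurrent.futures import ThreadPoolExecutor
from time import sleep

def task(n):
    # larger inputs take longer
    sleep(n / 10)
    return n * n

with ThreadPoolExecutor() as executor:
    # results come back in input order, even though task 3 finishes last
    results = list(executor.map(task, [3, 1, 2]))

print(results)  # [9, 1, 4]
```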
Python ThreadPoolExecutor Future.result
A Future represents an eventual result of a concurrent operation. The future's
result
function returns the value of the callable; it blocks until
the task associated with the future is done.
#!/usr/bin/python

from time import sleep, perf_counter
import random
from concurrent.futures import ThreadPoolExecutor

def task(tid):
    r = random.randint(1, 5)
    print(f'task {tid} started, sleeping {r} secs')
    sleep(r)
    return f'finished task {tid}, slept {r}'

start = perf_counter()

with ThreadPoolExecutor() as executor:
    t1 = executor.submit(task, 1)
    t2 = executor.submit(task, 2)
    t3 = executor.submit(task, 3)

    print(t1.result())
    print(t2.result())
    print(t3.result())

finish = perf_counter()
print(f"It took {finish-start} second(s) to finish.")
Our task sleeps for a random number of seconds. We get a message from each of
the tasks. In addition, we use the time
module to calculate the
elapsed time.
return f'finished task {tid}, slept {r}'
The returned value will be available via the result
function call.
start = perf_counter()
With perf_counter
, we calculate the elapsed time.
t1 = executor.submit(task, 1)
t2 = executor.submit(task, 2)
t3 = executor.submit(task, 3)

print(t1.result())
print(t2.result())
print(t3.result())
We submit three tasks and retrieve their results. Note that the
result
function is blocking; therefore, we get the task results in
the original scheduling order.
$ ./resultfun.py
task 1 started, sleeping 3 secs
task 2 started, sleeping 4 secs
task 3 started, sleeping 1 secs
finished task 1, slept 3
finished task 2, slept 4
finished task 3, slept 1
It took 4.005295900977217 second(s) to finish.
The whole process lasts as long as its longest task (plus some overhead). The
tasks finish in the order they were scheduled, because the result
function is blocking. In the next example, we fix this.
Python ThreadPoolExecutor as_completed
The as_completed
function returns an iterator over the futures that
yields futures as they complete.
Unfortunately, it is not possible to use map
with
as_completed
.
#!/usr/bin/python

from time import sleep, perf_counter
import random
from concurrent.futures import ThreadPoolExecutor, as_completed

def task(tid):
    r = random.randint(1, 5)
    print(f'task {tid} started, sleeping {r} secs')
    sleep(r)
    return f'finished task {tid}, slept {r}'

start = perf_counter()

with ThreadPoolExecutor() as executor:
    tids = [1, 2, 3]
    futures = []

    for tid in tids:
        futures.append(executor.submit(task, tid))

    for res in as_completed(futures):
        print(res.result())

finish = perf_counter()
print(f"It took {finish-start} second(s) to finish.")
In this example, we get the results of the tasks in the order they finish.
$ ./as_completed.py
task 1 started, sleeping 3 secs
task 2 started, sleeping 4 secs
task 3 started, sleeping 2 secs
finished task 3, slept 2
finished task 1, slept 3
finished task 2, slept 4
It took 4.00534593896009 second(s) to finish.
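If a submitted callable raises, the exception is stored in the future: the result function re-raises it, while the exception function retrieves it without raising. A minimal sketch:

```python
from concurrent.futures import ThreadPoolExecutor

def failing_task():
    raise ValueError('something went wrong')

with ThreadPoolExecutor() as executor:
    future = executor.submit(failing_task)

# exception returns the stored exception object (or None on success)
print(type(future.exception()).__name__)  # ValueError

# result re-raises the stored exception
try:
    future.result()
except ValueError as e:
    print(f'caught: {e}')
```

This matters for the next examples: a failed HTTP request or ping surfaces only when we call result.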
Multiple concurrent HTTP requests
In the next example, we use ThreadPoolExecutor
to generate multiple
HTTP requests. The requests
library is used to generate an HTTP
request.
#!/usr/bin/python

import requests
import concurrent.futures
import time

def get_status(url):
    resp = requests.get(url=url)
    return resp.status_code

urls = ['http://webcode.me', 'https://httpbin.org/get',
        'https://google.com', 'https://stackoverflow.com',
        'https://github.com', 'https://clojure.org',
        'https://fsharp.org']

tm1 = time.perf_counter()

with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = []

    for url in urls:
        futures.append(executor.submit(get_status, url=url))

    for future in concurrent.futures.as_completed(futures):
        print(future.result())

tm2 = time.perf_counter()
print(f'elapsed {tm2-tm1:0.2f} seconds')
The example checks the HTTP status code of several web sites concurrently.
$ ./web_requests.py
200
200
200
200
200
200
200
elapsed 0.81 seconds
Concurrent pinging
In the next example, we ping the given web sites with an external program.
#!/usr/bin/python

from time import perf_counter
from concurrent.futures import ThreadPoolExecutor, as_completed
import subprocess

def task(url):
    # getstatusoutput expects a command string, which it runs in a shell
    ok, _ = subprocess.getstatusoutput(f'ping -c 3 -w 10 {url}')
    return ok == 0, url

urls = ['webcode.me', 'clojure.org', 'fsharp.org', 'www.perl.org',
        'python.org', 'go.dev', 'raku.org']

start = perf_counter()

with ThreadPoolExecutor() as executor:
    futures = []

    for url in urls:
        futures.append(executor.submit(task, url))

    for future in as_completed(futures):
        r, u = future.result()
        if r:
            print(f'OK -> {u}')
        else:
            print(f'failed -> {u}')

finish = perf_counter()
print(f"elapsed {finish-start} second(s)")
The example uses the subprocess
module to execute an external
program.
ok, _ = subprocess.getstatusoutput(f'ping -c 3 -w 10 {url}')
The getstatusoutput
function returns the (exitcode, output) of the
executed command. The ping
is a standard Unix program which sends
ICMP ECHO_REQUESTs to network hosts. The -c
option determines the
number of packets being sent. The -w
option sets the deadline in
seconds.
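The (exitcode, output) pair returned by getstatusoutput can be illustrated with a harmless command:

```python
import subprocess

# the command string is run through the shell; a zero exit code
# means success, and output holds the captured stdout/stderr
code, output = subprocess.getstatusoutput('echo hello')
print(code, output)  # 0 hello
```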
$ ./pinging.py
OK -> go.dev
OK -> fsharp.org
OK -> www.perl.org
OK -> python.org
OK -> raku.org
OK -> clojure.org
OK -> webcode.me
elapsed 2.384801392967347 second(s)
Source
Python ThreadPoolExecutor - language reference
In this article we have worked with the ThreadPoolExecutor.