Python BoundedBarrier
last modified February 15, 2025
In this article we show how to synchronize Python threads using a custom
BoundedBarrier.
A BoundedBarrier is a synchronization primitive that allows a fixed
number of threads to wait for each other to reach a common barrier point. Unlike
the built-in threading.Barrier, a BoundedBarrier can
be implemented with additional constraints, such as limiting the maximum number
of threads that can wait at the barrier.
This tutorial demonstrates how to create a custom BoundedBarrier
using Python's threading.Condition and threading.Lock.
BoundedBarrier Implementation
The following example demonstrates how to implement a custom BoundedBarrier.
import threading
class BoundedBarrier:
def __init__(self, max_threads):
self.max_threads = max_threads
self.count = 0
self.condition = threading.Condition()
def wait(self):
with self.condition:
self.count += 1
if self.count == self.max_threads:
self.condition.notify_all() # Notify all waiting threads
self.count = 0 # Reset the counter for reuse
else:
self.condition.wait() # Wait for other threads
def worker(barrier, thread_name):
print(f"{thread_name} is starting")
barrier.wait() # Wait at the barrier
print(f"{thread_name} has passed the barrier")
def main():
max_threads = 3
barrier = BoundedBarrier(max_threads)
threads = []
for i in range(max_threads): # Create 3 threads
thread = threading.Thread(target=worker, args=(barrier, f"Thread-{i+1}"))
threads.append(thread)
thread.start()
for thread in threads:
thread.join() # Wait for all threads to complete
print("All threads have passed the barrier")
if __name__ == "__main__":
main()
In this program, a custom BoundedBarrier is implemented using
threading.Condition. The barrier allows a fixed number of threads
to wait for each other before proceeding.
self.condition = threading.Condition()
The BoundedBarrier uses a Condition object to manage thread synchronization.
self.count += 1
if self.count == self.max_threads:
self.condition.notify_all() # Notify all waiting threads
self.count = 0 # Reset the counter for reuse
else:
self.condition.wait() # Wait for other threads
Each thread increments the counter when it reaches the barrier. If the counter reaches the maximum number of threads, all waiting threads are notified, and the counter is reset. Otherwise, the thread waits for other threads to arrive.
barrier = BoundedBarrier(max_threads)
The BoundedBarrier is initialized with the maximum number of
threads that can wait at the barrier.
$ python main.py Thread-1 is starting Thread-2 is starting Thread-3 is starting Thread-1 has passed the barrier Thread-2 has passed the barrier Thread-3 has passed the barrier All threads have passed the barrier
BoundedBarrier with Reuse
The following example demonstrates how to reuse the BoundedBarrier
for multiple synchronization points.
import threading
import time
class BoundedBarrier:
def __init__(self, max_threads):
self.max_threads = max_threads
self.count = 0
self.condition = threading.Condition()
def wait(self):
with self.condition:
self.count += 1
if self.count == self.max_threads:
self.condition.notify_all() # Notify all waiting threads
self.count = 0 # Reset the counter for reuse
else:
self.condition.wait() # Wait for other threads
def worker(barrier, thread_name, num_phases):
for phase in range(num_phases):
print(f"{thread_name} is working on phase {phase + 1}")
time.sleep(1) # Simulate work for the phase
print(f"{thread_name} has completed phase {phase + 1}")
barrier.wait() # Wait at the barrier
print(f"{thread_name} is moving to the next phase")
def main():
max_threads = 3
num_phases = 2 # Number of phases in the task
barrier = BoundedBarrier(max_threads)
threads = []
for i in range(max_threads): # Create 3 threads
thread = threading.Thread(target=worker, args=(barrier, f"Thread-{i+1}", num_phases))
threads.append(thread)
thread.start()
for thread in threads:
thread.join() # Wait for all threads to complete
print("All phases completed by all threads")
if __name__ == "__main__":
main()
In this program, the BoundedBarrier is reused for multiple
synchronization points. Each thread works on two phases, and the barrier ensures
that all threads complete one phase before moving to the next.
barrier.wait() # Wait at the barrier
Each thread calls the wait method on the barrier after completing a
phase. This ensures that all threads finish the current phase before moving to
the next one.
$ python main.py Thread-1 is working on phase 1 Thread-2 is working on phase 1 Thread-3 is working on phase 1 Thread-1 has completed phase 1 Thread-2 has completed phase 1 Thread-3 has completed phase 1 Thread-1 is moving to the next phase Thread-2 is moving to the next phase Thread-3 is moving to the next phase Thread-1 is working on phase 2 Thread-2 is working on phase 2 Thread-3 is working on phase 2 Thread-1 has completed phase 2 Thread-2 has completed phase 2 Thread-3 has completed phase 2 Thread-1 is moving to the next phase Thread-2 is moving to the next phase Thread-3 is moving to the next phase All phases completed by all threads
Multi-Phase Task with Timeout Example
The following example demonstrates how to use a custom
BoundedBarrier with a timeout to synchronize threads across
multiple phases of execution. If a thread does not reach the barrier within the
specified timeout, it will proceed without waiting for the other threads.
import threading
import time
class BoundedBarrier:
def __init__(self, max_threads):
self.max_threads = max_threads
self.count = 0
self.condition = threading.Condition()
def wait(self, timeout=None):
with self.condition:
self.count += 1
if self.count == self.max_threads:
self.condition.notify_all() # Notify all waiting threads
self.count = 0 # Reset the counter for reuse
return True # Barrier tripped
else:
if timeout is None:
self.condition.wait() # Wait indefinitely
else:
if not self.condition.wait(timeout): # Wait with timeout
self.count -= 1 # Decrement count if timeout occurs
return False # Barrier not tripped
return True # Barrier tripped
def worker(barrier, thread_name, num_phases):
for phase in range(num_phases):
print(f"{thread_name} is working on phase {phase + 1}")
time.sleep(1) # Simulate work for the phase
print(f"{thread_name} has completed phase {phase + 1}")
if not barrier.wait(timeout=2): # Wait at the barrier with a timeout
print(f"{thread_name} timed out waiting for the barrier in phase {phase + 1}")
continue
print(f"{thread_name} is moving to the next phase")
def main():
max_threads = 3
num_phases = 2 # Number of phases in the task
barrier = BoundedBarrier(max_threads)
threads = []
for i in range(max_threads): # Create 3 threads
thread = threading.Thread(target=worker, args=(barrier, f"Thread-{i+1}", num_phases))
threads.append(thread)
thread.start()
for thread in threads:
thread.join() # Wait for all threads to complete
print("All phases completed by all threads")
if __name__ == "__main__":
main()
In this program, the BoundedBarrier is used with a timeout to
synchronize threads across multiple phases. If a thread does not reach the
barrier within 2 seconds, it will proceed without waiting for the other threads.
def wait(self, timeout=None):
with self.condition:
self.count += 1
if self.count == self.max_threads:
self.condition.notify_all() # Notify all waiting threads
self.count = 0 # Reset the counter for reuse
return True # Barrier tripped
else:
if timeout is None:
self.condition.wait() # Wait indefinitely
else:
if not self.condition.wait(timeout): # Wait with timeout
self.count -= 1 # Decrement count if timeout occurs
return False # Barrier not tripped
return True # Barrier tripped
The wait method is updated to support a timeout. If the timeout
occurs, the thread decrements the counter and proceeds without waiting for the
other threads.
if not barrier.wait(timeout=2): # Wait at the barrier with a timeout
print(f"{thread_name} timed out waiting for the barrier in phase {phase + 1}")
continue
Each thread calls the wait method with a timeout of 2 seconds. If
the barrier is not tripped within this time, the thread proceeds without
waiting.
$ python main.py Thread-1 is working on phase 1 Thread-2 is working on phase 1 Thread-3 is working on phase 1 Thread-1 has completed phase 1 Thread-2 has completed phase 1 Thread-3 has completed phase 1 Thread-1 is moving to the next phase Thread-2 is moving to the next phase Thread-3 is moving to the next phase Thread-1 is working on phase 2 Thread-2 is working on phase 2 Thread-3 is working on phase 2 Thread-1 has completed phase 2 Thread-2 has completed phase 2 Thread-3 has completed phase 2 Thread-1 is moving to the next phase Thread-2 is moving to the next phase Thread-3 is moving to the next phase All phases completed by all threads
Source
Python threading - documentation
In this article we have shown how to synchronize Python threads using a custom BoundedBarrier.
Author
List all Python tutorials.