CodePython

Understanding Concurrency & Parallelism In Python

This is the 9th post in a series of learning the Python programming language.

Concurrency and parallelism are two related but distinct concepts in computer science. Concurrency refers to the ability of a program to handle multiple tasks at the same time, while parallelism refers to the ability of a program to execute multiple tasks simultaneously, usually on separate cores or processors.

Concurrency

In Python, concurrency can be achieved using the threading module, which provides a way to create and manage threads. A thread is a separate execution flow within a program and can be used to run tasks concurrently with the main program.

Here’s an example of using threads to download multiple files concurrently:

import threading
import urllib.request
import os

def download_file(url, semaphore):
"""Download a file from the given url"""
try:
with semaphore:
filename = url.split('/')[-1]
if os.path.exists(filename):
print(f"{filename} already exists. Skipping...")
return
print(f"Downloading {filename}...")
opener = urllib.request.build_opener()
opener.addheaders = [('User-agent', 'Mozilla/5.0')]
urllib.request.install_opener(opener)
urllib.request.urlretrieve(url, filename)
print(f"{filename} downloaded.")
except Exception as e:
print(f"Failed to download {filename}: {e}")

# list of files to download
urls = [
"https://filesamples.com/samples/document/txt/sample1.txt",
"https://filesamples.com/samples/document/txt/sample2.txt",
"https://filesamples.com/samples/document/txt/sample3.txt"
]

# Semaphore to limit the number of parallel downloads
semaphore = threading.Semaphore(2)

# create a list of threads
threads = []
for url in urls:
t = threading.Thread(target=download_file, args=(url, semaphore))
threads.append(t)
t.start()

# wait for all threads to finish
for t in threads:
t.join()

This code downloads multiple files from the URLs in the urls list using multiple threads, with some optimizations to handle corner cases and synchronization issues.

  1. download_file function: The function downloads a file from the given URL. It starts by checking if the file already exists and skips the download if it does. It then builds an opener object and sets the User-agent header to simulate a browser request. The file is then downloaded using urllib.request.urlretrieve and saved with a name based on the last part of the URL (i.e., the file name). The function is surrounded by a try-except block to handle any exceptions that may occur during the download process.
  2. Thread pooling: The code uses a semaphore to limit the number of parallel downloads to 2. The semaphore is acquired before starting each download and released after the download is complete. This helps prevent the program from overloading the system with too many threads.

Parallelism

multiprocessing is a module in Python that provides tools to write concurrent, parallel programs. The module allows you to write parallel programs that take advantage of multiple CPU cores in your system. This makes it possible to write fast, efficient programs that can perform multiple tasks at the same time.

The main component of the multiprocessing module is the Process class. The Process class is used to create new processes in your Python program. Each process created with the Process class runs as a separate entity and has its own memory space, making it possible to run multiple processes in parallel.

Here’s a simple example that demonstrates the use of the multiprocessing module to run multiple processes to calculate the sum of a list of numbers. The list of numbers is split into chunks and assigned to each process. The result of each process is stored in a shared Value object, which is managed by a Manager. The shared result is protected by a lock to avoid race conditions. After all the processes have finished executing, the final result is printed.

from multiprocessing import Process, Manager, Lock

def calculate_sum(numbers, result, lock):
"""Calculate the sum of a list of numbers"""
local_sum = sum(numbers)
with lock:
result.value += local_sum

def main():
# list of numbers to sum
numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

# create a manager to share the result between processes
manager = Manager()
result = manager.Value('i', 0)
lock = Lock()

# create a list of processes
processes = []
chunk = len(numbers) // 2
for i in range(0, len(numbers), chunk):
p = Process(target=calculate_sum, args=(numbers[i:i+chunk], result, lock))
processes.append(p)
p.start()

# wait for all processes to finish
for p in processes:
p.join()

# print the result
print(result.value)

if __name__ == "__main__":
main()
  1. def calculate_sum(numbers, result, lock): This is a function that calculates the sum of a list of numbers. It takes three arguments: numbers, a list of numbers to sum, result, a shared Value object that holds the result of the sum, and lock, a Lock object to synchronize access to the shared result.
  2. numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]: This is a list of numbers to sum.
  3. manager = Manager(): This creates a Manager object, which is used to create shared objects that can be accessed and manipulated by multiple processes.
  4. result = manager.Value('i', 0): This creates a shared Value object with an initial value of 0 and data type ‘i’ (integer). The result of the sum will be stored in this shared Value object.
  5. lock = Lock(): This creates a Lock object to synchronize access to the shared result.
  6. processes = []: This creates an empty list to hold the processes.
  7. chunk = len(numbers) // 2: This calculates the size of each chunk of numbers to be assigned to each process.
  8. for i in range(0, len(numbers), chunk):: This for loop creates a process for each chunk of numbers. It starts from 0 and increments by chunk until it covers the entire list of numbers.
  9. p = Process(target=calculate_sum, args=(numbers[i:i+chunk], result, lock)): This creates a new process with the calculate_sum function as its target and the current chunk of numbers, the shared result, and the lock as its arguments.
  10. processes.append(p): This appends the process to the processes list.
  11. p.start(): This starts the process.
  12. for p in processes:: This for loop waits for all processes to finish.
  13. p.join(): This waits for the current process to finish.
  14. print(result.value): This prints the final result, which is stored in the shared Value object.

Locks & Semaphores

Locks

In Python, locks and semaphores are used to synchronize access to shared resources when working with threads or processes.

A lock is a synchronization primitive that can be in one of two states: locked or unlocked. When a lock is locked, any other thread or process trying to acquire the lock will be blocked until it is released. Locks are typically used to protect shared resources that can be accessed by multiple threads or processes, such as a shared data structure or a file.

Here’s an example of using a lock to protect access to a shared list:

import threading

# shared list
numbers = []

# lock to protect access to the shared list
lock = threading.Lock()

def add_number(n):
"""Add a number to the shared list"""
# acquire the lock
lock.acquire()
numbers.append(n)
# release the lock
lock.release()

# create two threads
t1 = threading.Thread(target=add_number, args=(1,))
t2 = threading.Thread(target=add_number, args=(2,))

# start the threads
t1.start()
t2.start()

# wait for the threads to finish
t1.join()
t2.join()

# print the final state of the shared list
print(numbers)

Semaphores

A semaphore is similar to a lock, but it allows for multiple threads or processes to acquire the semaphore at the same time, up to a certain limit. Semaphores are typically used to limit the number of threads or processes that can access a shared resource at the same time.

Here’s an example of using a semaphore to limit the number of threads that can access a shared list:

import threading

# shared list
numbers = []

# semaphore to limit access to the shared list
sem = threading.Semaphore(value=2)

def add_number(n):
"""Add a number to the shared list"""
# acquire the semaphore
sem.acquire()
numbers.append(n)
# release the semaphore
sem.release()

# create four threads
t1 = threading.Thread(target=add_number, args=(1,))
t2 = threading.Thread(target=add_number, args=(2,))
t3 = threading.Thread(target=add_number, args=(3,))
t4 = threading.Thread(target=add_number, args=(4,))

# start the threads
t1.start()
t2.start()
t3.start()
t4.start()

# wait for the threads to finish
t1.join()
t2.join()
t3.join()
t4.join()

# print the final state of the shared list
print(numbers)

It’s worth noting that in python Lock and Semaphore can be used for both process and threading, but to share the resources between different processes, it’s necessary to use shared memory objects such as ValueArray from the multiprocessing module.

Value (multiprocessing module)

Value is used to create a shared variable that can be used to store a single value. The value can be of any type that can be pickled, such as an integer, float, or string.

Here’s an example of how to use Value to share a single integer value between two processes:

from multiprocessing import Process, Value

def increment(n):
for i in range(1000):
n.value += 1

if __name__ == '__main__':
num = Value('i', 0) # create a shared integer with initial value of 0
p1 = Process(target=increment, args=(num,))
p2 = Process(target=increment, args=(num,))
p1.start()
p2.start()
p1.join()
p2.join()
print(num.value)

Since the num value is shared among two processes if you run the code the num.value will be different every time (not 2000) because we have not synchronized it using the lock.

This is the updated version to handle synchronization with lock:

from multiprocessing import Process, Value, Lock

def increment(n, lock):
for i in range(1000):
with lock:
n.value += 1

if __name__ == '__main__':
num = Value('i', 0)
lock = Lock()
p1 = Process(target=increment, args=(num, lock))
p2 = Process(target=increment, args=(num, lock))
p1.start()
p2.start()
p1.join()
p2.join()
print(num.value)

Array (multiprocessing module)

Array is used to create a shared array that can be used to store multiple values. The array can be of any type that can be used with the array module, such as an integer, float, or string.

Here’s an example of how to use Array to share an array of 10 integers between two processes:

from multiprocessing import Process, Array

def increment(n):
for i in range(1000):
n[i % 10] += 1

if __name__ == '__main__':
num = Array('i', [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]) # create a shared array of 10 integers
p1 = Process(target=increment, args=(num,))
p2 = Process(target=increment, args=(num,))
p1.start()
p2.start()
p1.join()
p2.join()
print(num[:])

It’s worth noting that Value and Array are only safe to use when the processes that use them are created using the multiprocessing module. Sharing memory between processes created using the multiprocessing and threading module is not safe and can lead to errors or data corruption.

Additionally, when using shared memory objects, it is important to use appropriate synchronization mechanisms such as locks, and semaphores to avoid race conditions and other synchronization issues.

If you like the post, don’t forget to clap. If you’d like to connect, you can find me on LinkedIn.

References:

https://www.python.org/doc/

Leave a Reply

Your email address will not be published.