Hello everyone! In this tutorial, we will learn about the Producer-Consumer problem, a classic concurrency problem, and how to solve it using Python threads. So let’s get started.
What is the Producer-Consumer Problem?
The Producer-Consumer problem involves 3 components:
1. Bounded Buffer
A buffer is temporary storage that is accessible by different threads. A simple example of a buffer is an array. Multiple threads can read data from the buffer and write data to it concurrently. A bounded buffer is one that has a limited capacity and can’t store data beyond that capacity.
2. Producer Thread
A Producer Thread is one that generates some data, puts it into the buffer, and repeats until all the required data has been produced. An example of this could be a thread that downloads some data over the network and stores it temporarily in the buffer.
3. Consumer Thread
A Consumer Thread is one that consumes the data present inside the buffer, uses it for some task, and repeats until the task assigned to the thread is complete. An example of this could be a thread that reads the data downloaded over the internet and stores it in a database.
What happens when the operating rate for threads is different?
The speed of the operations done by the threads can vary depending on the tasks assigned. So, in our case, the Producer Thread might generate data more slowly than the Consumer Thread consumes it, or it might generate data faster than the Consumer Thread can keep up with.
If the threads operate at different rates, the following issues can arise, and these issues are what the Producer-Consumer problem describes.
- If the Producer Thread tries to put data into the buffer and finds that the buffer is already full, it can neither add more data nor overwrite existing data that has not been consumed yet. Therefore, the Producer Thread should wait until some data has been consumed from the buffer. This scenario can occur if the Producer Thread is fast.
- If the Consumer Thread tries to consume data from the buffer but finds that the buffer is empty, it can’t take any data and should wait until some data has been added to the buffer. This scenario can occur if the Consumer Thread is fast.
- Since the buffer is shared among threads that may access it at the same time, race conditions are possible. Only one thread should operate on the shared buffer at any moment: while the Producer Thread is adding data, the Consumer Thread should wait, and while the Consumer Thread is reading data, the Producer Thread should wait.
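To make the first two conditions concrete, here is a minimal single-threaded sketch of a bounded buffer with no synchronization at all. The class name `NaiveBoundedBuffer` and its methods `try_put` and `try_get` are hypothetical, chosen only for illustration. Instead of waiting, the buffer simply reports failure at the points where a real producer or consumer would have to be paused.

```python
from collections import deque


class NaiveBoundedBuffer:
    """Hypothetical illustration: a bounded buffer with no synchronization."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.items = deque()

    def try_put(self, item):
        # A fast producer ends up here: the buffer is full, so the item can
        # neither be stored nor be allowed to overwrite unread data.
        if len(self.items) >= self.capacity:
            return False
        self.items.append(item)
        return True

    def try_get(self):
        # A fast consumer ends up here: there is nothing to read yet.
        if not self.items:
            return None
        return self.items.popleft()


buf = NaiveBoundedBuffer(capacity=2)
put_results = [buf.try_put(n) for n in (1, 2, 3)]  # third put finds the buffer full
get_results = [buf.try_get() for _ in range(3)]    # third get finds the buffer empty

print(put_results)
print(get_results)
```

The failed `try_put` and the failed `try_get` mark exactly the two places where a thread needs to be blocked rather than turned away, which is the job a synchronization tool has to take over.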
Solution to the problem using Semaphore
We can solve this problem with the help of semaphores, which are a tool for synchronization between threads. We maintain 3 semaphores, one for each of the 3 issues defined in our statement of the Producer-Consumer problem.
- empty: This semaphore stores the number of slots that are empty in our buffer. The initial value of this semaphore is the size of our bounded buffer. Before adding any data to the buffer, the Producer Thread will try to acquire this semaphore, which decreases its value by 1. If the value of this semaphore is already 0, the buffer is full and the empty semaphore will block the Producer Thread until its value becomes greater than 0. After the Consumer Thread has consumed data from the buffer, it will release this semaphore, increasing its value by 1.
- full: This semaphore stores the number of slots that are full in our buffer. The initial value of this semaphore is 0. Before consuming data from the buffer, the Consumer Thread will try to acquire this semaphore. If the value of this semaphore is already 0, the buffer is empty and the full semaphore will block the Consumer Thread until its value becomes greater than 0. The Producer Thread will release this semaphore after it has added one item to the buffer.
- mutex: This semaphore handles the race condition by allowing only one thread to operate on the shared buffer at a time. The initial value of this semaphore is 1. Before operating on the shared buffer, each thread will try to acquire this semaphore. If a thread finds the value of this semaphore to be 0, the other thread is operating on the buffer and the semaphore will block it. After operating on the buffer, the working thread will release this semaphore so that the other thread can operate on the buffer.
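The counting behaviour of the empty and full semaphores can be observed without starting any threads. The sketch below assumes a tiny capacity of 2 and passes `blocking=False` to `acquire()`, so that a call which would normally block returns `False` instead. The variable names are chosen for illustration only.

```python
import threading

CAPACITY = 2  # assumed tiny capacity, for illustration only

empty = threading.Semaphore(CAPACITY)  # counts free slots
full = threading.Semaphore(0)          # counts filled slots

# With blocking=False, acquire() returns False instead of waiting,
# which lets us observe each counter from a single thread.
consumer_can_start = full.acquire(blocking=False)  # nothing produced yet

first_slot = empty.acquire(blocking=False)
second_slot = empty.acquire(blocking=False)
third_slot = empty.acquire(blocking=False)         # no free slot is left

full.release()                                     # producer signals one item
consumer_can_now_start = full.acquire(blocking=False)

empty.release()                                    # consumer frees one slot
slot_after_consume = empty.acquire(blocking=False)

print(consumer_can_start, first_slot, second_slot, third_slot)
print(consumer_can_now_start, slot_after_consume)
```

In the real solution the threads call `acquire()` in its default blocking mode, so the points that return `False` here are the points where a thread would be put to sleep until the other thread calls `release()`.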
We also maintain 2 pointers that tell our threads where to add or take the data.
- in pointer: This pointer tells our Producer Thread where in the buffer to add the next item it generates. After adding, the pointer is incremented by 1 and wraps back to 0 once it reaches the buffer’s capacity.
- out pointer: This pointer tells our Consumer Thread where to read the next item from the buffer. After reading, the pointer is incremented by 1 and wraps around in the same way.
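Because the buffer has a fixed size, the implementation in this tutorial advances each pointer with a modulo operation so that it returns to slot 0 after the last slot. The short sketch below shows that wrap-around on its own, assuming a capacity of 3 and made-up string items, with no threads involved.

```python
CAPACITY = 3  # assumed small capacity so the wrap-around is visible
buffer = [None] * CAPACITY
in_index = 0
out_index = 0

written_at = []  # records which slot each item was written to

# Fill all three slots.
for item in ("a", "b", "c"):
    buffer[in_index] = item
    written_at.append(in_index)
    in_index = (in_index + 1) % CAPACITY

# Read one item, which frees slot 0.
first = buffer[out_index]
out_index = (out_index + 1) % CAPACITY

# The next write wraps around and reuses slot 0.
buffer[in_index] = "d"
written_at.append(in_index)
in_index = (in_index + 1) % CAPACITY

print(written_at)
print(first, buffer)
```

Note that the pointers alone do not stop the producer from overwriting an unread slot; in the full solution that protection comes from the empty semaphore.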
Implementing Producer-Consumer Problem in Python
Let us look at how to solve this problem in Python. Say we have a bounded buffer of capacity 10. The Producer Thread will produce 20 items and the Consumer Thread will consume those 20 items. Adding time.sleep(1) in the Producer and time.sleep(2.5) in the Consumer makes our Producer Thread operate faster than the Consumer Thread. Even though we start our Consumer Thread first, it will wait until data is present in the buffer.
import threading
import time

# Shared memory variables
CAPACITY = 10
buffer = [-1 for i in range(CAPACITY)]
in_index = 0
out_index = 0

# Declaring semaphores
mutex = threading.Semaphore()          # initial value defaults to 1
empty = threading.Semaphore(CAPACITY)  # number of free slots
full = threading.Semaphore(0)          # number of filled slots

# Producer Thread Class
class Producer(threading.Thread):
    def run(self):
        # Only in_index is reassigned here, so only it must be declared global
        global in_index

        items_produced = 0
        counter = 0

        while items_produced < 20:
            empty.acquire()   # wait for a free slot
            mutex.acquire()   # enter the critical section

            counter += 1
            buffer[in_index] = counter
            in_index = (in_index + 1) % CAPACITY
            print("Producer produced :", counter)

            mutex.release()   # leave the critical section
            full.release()    # signal that one more item is available

            time.sleep(1)
            items_produced += 1

# Consumer Thread Class
class Consumer(threading.Thread):
    def run(self):
        # Only out_index is reassigned here, so only it must be declared global
        global out_index

        items_consumed = 0

        while items_consumed < 20:
            full.acquire()    # wait for an available item
            mutex.acquire()   # enter the critical section

            item = buffer[out_index]
            out_index = (out_index + 1) % CAPACITY
            print("Consumer consumed item :", item)

            mutex.release()   # leave the critical section
            empty.release()   # signal that one more slot is free

            time.sleep(2.5)
            items_consumed += 1

# Creating Threads
producer = Producer()
consumer = Consumer()

# Starting Threads
consumer.start()
producer.start()

# Waiting for threads to complete
producer.join()
consumer.join()
Output:
Producer produced : 1
Consumer consumed item : 1
Producer produced : 2
Producer produced : 3
Consumer consumed item : 2
Producer produced : 4
Producer produced : 5
Consumer consumed item : 3
Producer produced : 6
Producer produced : 7
Producer produced : 8
Consumer consumed item : 4
Producer produced : 9
Producer produced : 10
Consumer consumed item : 5
Producer produced : 11
Producer produced : 12
Producer produced : 13
Consumer consumed item : 6
Producer produced : 14
Producer produced : 15
Consumer consumed item : 7
Producer produced : 16
Producer produced : 17
Consumer consumed item : 8
Producer produced : 18
Consumer consumed item : 9
Producer produced : 19
Consumer consumed item : 10
Producer produced : 20
Consumer consumed item : 11
Consumer consumed item : 12
Consumer consumed item : 13
Consumer consumed item : 14
Consumer consumed item : 15
Consumer consumed item : 16
Consumer consumed item : 17
Consumer consumed item : 18
Consumer consumed item : 19
Consumer consumed item : 20
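For comparison, Python’s standard library also provides `queue.Queue`, a thread-safe queue that can be given a maximum size. This is a different technique from the semaphore solution in this tutorial, not a restatement of it: the blocking on a full or empty buffer is handled inside `put()` and `get()`. The sketch below assumes the same capacity of 10 and the same 20 items, and leaves out the sleeps so it finishes immediately.

```python
import queue
import threading

CAPACITY = 10
ITEMS = 20

# put() blocks while the queue is full; get() blocks while it is empty.
buffer = queue.Queue(maxsize=CAPACITY)
consumed = []


def producer():
    for item in range(1, ITEMS + 1):
        buffer.put(item)


def consumer():
    for _ in range(ITEMS):
        consumed.append(buffer.get())


# Start the consumer first, as in the semaphore version above.
threads = [threading.Thread(target=consumer), threading.Thread(target=producer)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(consumed == list(range(1, ITEMS + 1)))
```

With a single producer and a single consumer, items come out in the order they were put in, so the consumer sees 1 through 20 in sequence. Writing the semaphores by hand, as done in this tutorial, is still the better way to understand what such a queue has to do internally.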
Congratulations! Now you know how to solve the classical Producer-Consumer problem. There are many real-life examples where similar situations can occur, such as multiple applications trying to print documents on the same printer, or downloading data over the network and storing it in a database.
Thanks for reading!!