The Producer-Consumer Problem in Python
The Producer-Consumer problem is an important abstraction and very relevant to how web applications work.
Threads in Python¶
For those of you unfamiliar with thread programming in Python, a few notes:
- Any object can be threaded (made to run with multiple threads); it just has to inherit from the Thread class.
- It has to implement a run() method. This method is closely analogous to main(), in that it is the start of a sequence of execution steps (an entry point).
- You start a thread by first making an instance of the runnable class; call it runner. You then invoke the .start() method.
- If some other thread wants to wait for the runner to end, it can invoke the .join() method on the other thread object (the runner).
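The four points above can be seen together in a minimal sketch. The class name Greeter and its label and result attributes are made up for illustration and are not part of the course code:

```python
from threading import Thread

class Greeter(Thread):
    """Hypothetical example class: inherits from Thread and implements run()."""

    def __init__(self, label):
        super().__init__()      # always initialize the Thread base class
        self.label = label
        self.result = None

    def run(self):
        # Entry point: this executes in the new thread after start() is called.
        self.result = "hello from " + self.label

runner = Greeter("runner")
runner.start()   # begin executing run() in a separate thread
runner.join()    # the calling thread waits here until run() returns
print(runner.result)
```

Note that the code calls start(), not run(); calling run() directly would execute the method in the calling thread rather than in a new one.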
Here's a good place to start learning more about understanding threads in Python
Here's the documentation on Python Threading
Synchronization¶
The key concern with using threads is the worry that one thread will modify a shared data structure while another thread is using it, so that the result is inconsistent or even disastrous (following null pointers and the like). We saw these issues with transactions as well.
One of the main tools to avoid problems is to make sure that we lock any shared data structures whenever we use them and release the lock when we are done. This is exactly the same idea as locking tables in MySQL transactions.
The understanding threads in Python blog post has an example of multiple threads all incrementing a global counter without locking. Here's an excerpt of that code:
from threading import Thread

# define a global variable
some_var = 0

class IncrementThread(Thread):
    def run(self):
        # we want to read a global variable
        # and then increment it
        global some_var
        read_value = some_var
        print("some_var in thread {} before increment is {}"
              .format(self.name, read_value))
        some_var = read_value + 1
        print("some_var in thread {} after increment is {}"
              .format(self.name, some_var))
Here it is with the accesses to the global counter synchronized using locks:
from threading import Lock, Thread

# define a global, shared lock
lock = Lock()            # <------------------- create lock

# define a global variable
some_var = 0

class IncrementThread(Thread):
    def run(self):
        # we want to read a global variable
        # and then increment it
        lock.acquire()   # <-------------------- acquire it
        global some_var
        read_value = some_var
        print("some_var in thread {} before increment is {}"
              .format(self.name, read_value))
        some_var = read_value + 1
        print("some_var in thread {} after increment is {}"
              .format(self.name, some_var))
        lock.release()   # <-------------------- release it
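The acquire/release pair can also be written with the lock as a context manager, which releases the lock even if the body raises an exception. Here is a minimal sketch of that form; the loop count and the number of threads are arbitrary choices for illustration:

```python
from threading import Lock, Thread

lock = Lock()      # global, shared lock
some_var = 0       # global, shared counter

class IncrementThread(Thread):
    def run(self):
        global some_var
        for _ in range(10000):
            # "with lock" acquires on entry and releases on exit,
            # even if an exception is raised inside the block.
            with lock:
                some_var += 1

threads = [IncrementThread() for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(some_var)   # 4 threads x 10000 increments = 40000
```

Because every read-modify-write of some_var happens while holding the lock, no increment is lost and the final count is always the same.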
Next, we'll look at more sophisticated examples of threads, synchronization, and locking, including the producer-consumer problem.
Producer/Consumer¶
The Producer/Consumer problem is a useful abstraction of many real-world problems of threading. Indeed, it's pretty close to what Apache and Flask do:
- The main thread listens to the network (say on ports 80 and 443), and when a request comes in, it puts the request on a work queue and returns to listening. Thus, it's a producer.
- The other threads wait for work to be added to the work queue, and when there's something to do, they do it (respond to the request). Thus, they are consumers.
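As a rough illustration of this division of labor (not how Apache or Flask are actually implemented), the standard library's concurrent.futures.ThreadPoolExecutor packages the same pattern: submit() places work on an internal queue, and a pool of worker threads picks it up. The handle_request function and the list of paths below are hypothetical stand-ins for real network requests:

```python
from concurrent.futures import ThreadPoolExecutor

def handle_request(path):
    # Hypothetical handler: a real server would build an HTTP response here.
    return "200 OK " + path

# Stand-in for requests arriving from the network.
incoming = ["/", "/about", "/contact"]

with ThreadPoolExecutor(max_workers=2) as pool:
    # The main thread is the producer: it only queues the work.
    futures = [pool.submit(handle_request, path) for path in incoming]
    # The worker threads are the consumers; result() waits for each answer.
    responses = [future.result() for future in futures]

print(responses)
```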
Here's a useful tutorial about the Producer/Consumer problem:
The next few sections will discuss three different solutions to the producer/consumer problem in Python. These are important concepts and techniques for your knowledge of computer science, but you will probably not use any of these coding techniques in your projects, so you don't have to dig deeply into the code. Read for concepts.
Examples/Demos¶
These examples are in the threads folder:
cd ~/cs304
cp -r ~cs304/pub/downloads/threads threads
cd threads
Daemonic Threads¶
The Python threading API defines two kinds of threads: daemons and non-daemons. A Python program ends when all of its non-daemon threads are done. So, if a non-daemon thread runs an infinite loop, which is common, it will never be done and your program is hard to exit.
Therefore, in the examples below, I go to a little extra effort to make the threads daemons so that we can easily kill the program. This is done by setting the thread's daemon attribute before starting it (the older setDaemon method does the same thing but is deprecated as of Python 3.10):
    t1 = MyThread()
    t1.daemon = True
    t1.start()
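The daemon flag can also be passed to the Thread constructor as a keyword argument. The sketch below assumes a hypothetical Ticker class with an infinite loop; because the thread is a daemon, the program still exits when the main thread reaches the end:

```python
import time
from threading import Thread

class Ticker(Thread):
    """Hypothetical thread that never finishes on its own."""

    def run(self):
        while True:
            time.sleep(0.1)

# Thread.__init__ accepts daemon as a keyword argument.
t1 = Ticker(daemon=True)
t1.start()

print(t1.daemon, t1.is_alive())
# The main thread ends here. Since t1 is a daemon, Python does not wait for it.
```

The flag has to be set before start() is called; changing it on a running thread raises a RuntimeError.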
If we don't set the thread to be a daemon, the Python program becomes hard to kill. ^C, which usually works, doesn't. Instead, we have to ^Z to suspend the process and then kill it:
$ python daemon.py N
starting thread
thread started. Waiting for you to type something
hi
jklsd
die!
^C
^C
^Z
[1]+ Stopped python daemon.py N
$ jobs
[1]+ Stopped python daemon.py N
$ kill %1
Note that killing a thread is usually the wrong thing. What if it's holding a resource or doing something to a data structure when it gets killed? What if it's halfway through a balance transfer in our banking/babysitting app? Killing it could leave that data structure in a broken state or fail an audit.
Usually, it's better to set up a communication, possibly using a shared variable, that requests that the thread exit. Something like this:
from threading import Thread
import time

keepGoing = None

class play(Thread):
    def run(self):
        global keepGoing
        keepGoing = True
        while keepGoing:
            time.sleep(1)
            print(' play')
        print(' darn it!')

def main():
    global keepGoing
    kid = play()
    print('go and play')
    kid.start()
    time.sleep(10)
    print('time to stop!')
    keepGoing = False

main()
The code above is in frolic.py.
So, the moral is: avoid killing a thread. Have it exit gracefully.
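The same "please stop" request can be expressed with threading.Event instead of a global variable. This is a sketch of an alternative, not the code in frolic.py; the Player class, its attribute names, and the timings are made up for illustration:

```python
import time
from threading import Event, Thread

class Player(Thread):
    """Hypothetical thread that runs until it is asked to stop."""

    def __init__(self):
        super().__init__()
        self.stop_requested = Event()   # set by another thread to request exit
        self.rounds = 0

    def run(self):
        # wait() returns False when the timeout expires and True once the
        # event is set, so this loop does one round per timeout until
        # someone asks it to stop.
        while not self.stop_requested.wait(timeout=0.05):
            self.rounds += 1
        print("stopping after", self.rounds, "rounds")

kid = Player()
kid.start()
time.sleep(0.3)
kid.stop_requested.set()   # ask the thread to exit
kid.join()                 # wait until it has finished cleanly
```

Compared with sleeping and then checking a flag, waiting on the event lets the thread react to the stop request immediately rather than at the end of its current sleep.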
Producer-Consumer with Locking¶
Our first producer-consumer example is producer_consumer_lock.py. Here's an excerpt from the code. The lines with comments are the important ones.
import random
import time
from threading import Lock, Thread

work = []                        # shared data structure ===
lock = Lock()                    # create lock =============

class ProducerThread(Thread):
    def run(self):
        while True:
            num = random.randint(1,10)
            lock.acquire()       # acquire it ==============
            work.append(num)
            print('Produced ',num,
                  'work is ',work)
            lock.release()       # release it ==============
            time.sleep(random.random())

class ConsumerThread(Thread):
    def run(self):
        while True:
            lock.acquire()       # acquire it ==============
            if len(work) == 0:
                num = -1
            else:
                num = work.pop()
            print('Consumed ',num,
                  'work is ',work)
            lock.release()       # release it =============
            time.sleep(random.random())
Observations:
- The code prints the content of the work queue whenever anything happens.
- If there's nothing to consume, the consumer thread just spins (loops endlessly), taking up CPU time doing nothing but checking whether there's something to do. This is a busy wait. Busy-waits are usually bad because they waste the CPU.
- It is better for the consumers to go to sleep and have the producer wake them up when there's work to be done.
Producer-Consumer with Conditions¶
Conditions are objects similar to locks, except that they have a list of threads waiting on that condition, and a thread can add itself to the list. When the condition that they are waiting for happens, the threads can be notified (woken up). That avoids the problem of busy-wait; instead the thread just sleeps until there is work to do.
This example of producer-consumer is producer_consumer_lock.py. Here's an excerpt from the code. The lines with comments are the important ones.
import random
import time
from threading import Condition, Thread

work = []                            # shared data structure
condition = Condition()              # create condition

class ProducerThread(Thread):
    def run(self):
        while True:
            num = random.randint(1,10)
            condition.acquire()      # acquire it
            work.append(num)
            print('Produced ',num,
                  'work is ',work)
            condition.notify()       # --- new! -------
            condition.release()      # release it
            time.sleep(random.random())

class ConsumerThread(Thread):
    def run(self):
        while True:
            condition.acquire()      # acquire it
            # use while, not if: another consumer may have emptied
            # the list between the notify and this thread waking up
            while len(work) == 0:
                condition.wait()     # --- new! -------
            num = work.pop()
            print('Consumed ',num,
                  'work is ',work)
            condition.release()      # release it
            time.sleep(random.random())
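The wait/notify handshake is easier to check in a version that terminates. The sketch below is a variation on the excerpt, not the course file: it uses the condition as a context manager, produces a fixed number of items (ITEMS is an arbitrary choice), takes items from the front of the list so they come out in order, and uses a hypothetical DONE sentinel to tell the consumer to exit gracefully:

```python
from threading import Condition, Thread

work = []                  # shared data structure
condition = Condition()    # guards work and carries the notifications
consumed = []              # written only by the consumer thread
ITEMS = 5                  # hypothetical: number of items to produce
DONE = None                # hypothetical sentinel meaning "no more work"

class ProducerThread(Thread):
    def run(self):
        for num in list(range(ITEMS)) + [DONE]:
            with condition:             # acquire, then release on exit
                work.append(num)
                condition.notify()      # wake one waiting consumer, if any

class ConsumerThread(Thread):
    def run(self):
        while True:
            with condition:
                while not work:         # re-check after every wakeup
                    condition.wait()    # releases the lock while sleeping
                num = work.pop(0)       # take the oldest item
            if num is DONE:
                break
            consumed.append(num)

consumer = ConsumerThread()
producer = ProducerThread()
consumer.start()
producer.start()
producer.join()
consumer.join()
print(consumed)   # [0, 1, 2, 3, 4]
```

The consumer checks the list before it waits, so a notify that arrives while no one is waiting is not a problem: the item is already in the list when the consumer next looks.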
Producer-Consumer with Queue¶
The Queue object encapsulates the blocking and notification. See Queue. The code works like the condition version, but is easier to write.
This example is producer_consumer_lock.py. Here's an excerpt from the code. The lines with comments are the important ones.
import queue
import random
import time
from threading import Thread

work = queue.Queue()               # ----- new data structure ----

class ProducerThread(Thread):
    def run(self):
        while True:
            num = random.randint(1,10)
            work.put(num)          # ---- automatically synchronized
            print('Produced ',num,
                  'work len ',work.qsize())
            time.sleep(random.random())

class ConsumerThread(Thread):
    def run(self):
        while True:
            num = work.get(True)   # block if work queue empty ------
            print('Consumed ',num,
                  'work len ',work.qsize())
            time.sleep(random.random())
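Queues also combine well with the advice about graceful exits. In the sketch below, which is an illustration rather than the course code, the producer puts one STOP sentinel on the queue per consumer so that each consumer leaves its loop on its own; the names STOP, NUM_CONSUMERS, and results are assumptions made for this example:

```python
import queue
from threading import Thread

work = queue.Queue()       # thread-safe work queue
results = queue.Queue()    # thread-safe place to collect answers
STOP = None                # hypothetical sentinel meaning "no more work"
NUM_CONSUMERS = 2          # hypothetical number of consumer threads

class ProducerThread(Thread):
    def run(self):
        for num in range(1, 6):
            work.put(num)
        # One sentinel per consumer, so every consumer gets to see one.
        for _ in range(NUM_CONSUMERS):
            work.put(STOP)

class ConsumerThread(Thread):
    def run(self):
        while True:
            num = work.get()          # blocks while the queue is empty
            if num is STOP:
                break                 # exit gracefully
            results.put(num * num)

threads = [ProducerThread()] + [ConsumerThread() for _ in range(NUM_CONSUMERS)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# All threads have finished, so qsize() is exact here.
squares = sorted(results.get() for _ in range(results.qsize()))
print(squares)   # [1, 4, 9, 16, 25]
```

The results are sorted before printing because the two consumers may finish their items in either order.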
Example Summary¶
Let's summarize our producer-consumer examples before turning to our last topics.
- Threads should not be killed; they should be coded so they can exit gracefully.
- Access to shared data structures should be managed for thread safety.
- Locks can give exclusive access to a shared data structure.
- Conditions can also give exclusive access to a shared data structure, and they can avoid busy-waiting.
- The Python library has thread-safe data structures, such as Queue.
Videos¶
I have created videos of these demos in action. See the videos page.