On Thread-Safety and Monitors¶
A program module is thread-safe (or serializable) if, when called by multiple threads concurrently, the effect is the same as if the calls had been executed atomically, meaning in some sequential order.
Question: Are the following Python procedures thread-safe? When are procedures thread-safe in general?
def binarySearch(a, key):
    i, j = 0, len(a)
    while i + 1 < j:
        h = (i + j) // 2
        if a[h] <= key: i = h
        else: j = h
    return a[i] == key

def swap(a, i, j): # exchange a[i] with a[j]
    h = a[i]; a[i] = a[j]; a[j] = h
Answer: binarySearch is thread-safe, but swap is not: concurrent calls swap(a, 0, 1) and swap(a, 1, 2) do not always have the effect of swap(a, 0, 1); swap(a, 1, 2) or of swap(a, 1, 2); swap(a, 0, 1). A possible interleaving is (below, = is assignment, not equality):
| swap(a, 0, 1) | a | swap(a, 1, 2) |
|---|---|---|
|  | [5, 6, 7, 8] |  |
| h = a[0] |  |  |
|  |  | h = a[1] |
| a[0] = a[1] |  |  |
|  | [6, 6, 7, 8] |  |
|  |  | a[1] = a[2] |
|  | [6, 7, 7, 8] |  |
| a[1] = h |  |  |
|  | [6, 5, 7, 8] |  |
|  |  | a[2] = h |
|  | [6, 5, 6, 8] |  |
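The interleaving above can be replayed sequentially in Python; h1 and h2 stand for the local variable h of the two calls (names chosen here for illustration):

```python
a = [5, 6, 7, 8]
h1 = a[0]      # swap(a, 0, 1): h = a[0]
h2 = a[1]      # swap(a, 1, 2): h = a[1]
a[0] = a[1]    # swap(a, 0, 1): a is now [6, 6, 7, 8]
a[1] = a[2]    # swap(a, 1, 2): a is now [6, 7, 7, 8]
a[1] = h1      # swap(a, 0, 1): a is now [6, 5, 7, 8]
a[2] = h2      # swap(a, 1, 2): a is now [6, 5, 6, 8]
print(a)       # [6, 5, 6, 8]: neither sequential order produces this
```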
When a procedure is called by several threads, the local variables of the procedure are local to each calling thread, so the threads cannot affect each other's local variables. Threads can affect each other only through global variables, including parameters that point to shared objects. If a procedure uses neither, it is thread-safe. Arithmetic functions like sin are commonly thread-safe.
Question: Is the following Java class thread-safe? When are classes thread-safe in general?
class Point {
    int x, y;
    Point(int x0, int y0) {
        x = x0; y = y0;
    }
    void move(int dx, int dy) {
        x += dx; y += dy;
    }
}
Answer. Above, two threads calling move of the same Point object may lead to a race condition: the updates to the fields x and y may overwrite each other. In general, any class with mutable fields is not thread-safe.
For example, HashMap in Java is not thread-safe, but ConcurrentHashMap is. However, there is a cost associated with using ConcurrentHashMap, which makes it less suitable if thread-safety is not needed.
The rule for the correctness of classes assumes that an object is used by a single thread, as in:
algorithm
class Counter
    var a: integer = 0
    var e: boolean = true
    {e = even(a)}
    method inc()
        a := a + 1 ; e := ¬e
The invariant is preserved by inc() when called by a single thread. If such a class is to be used concurrently, some form of mutual exclusion has to be guaranteed, e.g. by bracketing the body of each method in P and V semaphore operations.
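As a sketch in Python, the bracketing can be done with a binary semaphore from the standard threading module, where acquire plays the role of P and release of V:

```python
from threading import Semaphore

class Counter:
    def __init__(self):
        self.a = 0
        self.e = True              # invariant: e == (a is even)
        self.mutex = Semaphore(1)  # binary semaphore for mutual exclusion
    def inc(self):
        self.mutex.acquire()       # P(mutex)
        self.a += 1
        self.e = not self.e
        self.mutex.release()       # V(mutex)
```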
Semaphores are a simple and universal mechanism. However:
- One can easily forget a P or V operation, or have too many.
- Semaphores are, in general, global to all processes: to see how they are used, one must examine the whole program.
- They are used for both mutual exclusion and condition synchronization without syntactic distinction.
Monitors were popularized under that name by C.A.R. Hoare (based on E.W. Dijkstra's secretaries and P.B. Hansen's shared classes) as modules that impose a syntactic structure:
- Monitors provide data abstraction: the representation is hidden and can be accessed only through public procedures; monitors can be used and implemented independently.
- Mutual exclusion is achieved by not allowing two monitor procedures to execute concurrently, similar to atomic regions.
- Condition synchronization is achieved with condition variables that resemble but are different from semaphores.
The resulting program structure is a composition of active processes and passive monitors.
A monitor with variables a, initialization S0, and procedures p1, ..., pn with bodies S1, ..., Sn is declared as follows:
monitor M
    var a: A
    initialization
        S0
    procedure pi
        Si
The monitor variables are private and the monitor procedures public. Furthermore, monitor procedures may not access variables declared outside of the monitor. The monitor variables are accessed by calling one of its procedures:
v ← pi(E)
Monitor procedures always execute in mutual exclusion. As there is no interference among the methods, a monitor invariant is preserved if each method preserves it when executed atomically:
algorithm
monitor Counter
    var a: integer = 0
    var e: boolean = true
    {e = even(a)}
    procedure inc()
        a := a + 1 ; e := ¬e
    procedure even() → (r: boolean)
        r := e
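As a sketch, the Counter monitor can be transcribed to Python, with a Lock standing in for the implicit monitor lock:

```python
from threading import Lock

class Counter:
    def __init__(self):
        self.a = 0
        self.e = True       # monitor invariant: e == (a is even)
        self.lock = Lock()  # the implicit monitor lock, made explicit
    def inc(self):
        with self.lock:     # monitor procedures execute in mutual exclusion
            self.a += 1
            self.e = not self.e
    def even(self):
        with self.lock:
            return self.e
```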
Rule for correctness of monitors: Suppose monitor M is given with monitor invariant I and procedures p1, ..., pn; the initialization S0 and the method bodies S1, ..., Sn are annotated statements:
algorithm
monitor M
    var ...
    {I}
    initialization
        S0
    procedure pi
        Si
Monitor M is correct if

- the initialization establishes I:

algorithm
{true} S0 {I}

- every procedure pi preserves I:

algorithm
{I} Si {I}
Condition Variables¶
Condition synchronization is done by condition variables:
var cv: condition
The value of cv is an initially empty queue of delayed processes. This queue can be accessed only through two operations, which may be used only within the monitor in which cv is declared:

- wait(cv): the calling process is appended to the queue of cv and delayed
- signal(cv): if the queue of cv is not empty, one delayed process is removed and resumed
After a signal(cv) operation, provided the queue is not empty, two processes are ready to execute within the monitor: the signalling and the signalled process. With the signal-and-wait discipline the signaller waits and the signalled process continues: signal-and-wait is preemptive. Let P be the signalling condition of cv:
algorithm
monitor M
    var cv: condition
    {I}
    procedure p
        ... {true} wait(cv) {P} ...
    procedure q
        ... {P} signal(cv) {I} ...
With the signal-and-continue discipline the signaller continues and the signalled process executes later: signal-and-continue is non-preemptive.
algorithm
monitor M
    var cv: condition
    {I}
    procedure p
        ... {true} wait(cv) {I} ...
    procedure q
        ... {P} signal(cv) {P} ...
A monitor has one queue for processes blocked when trying to enter the monitor, and one queue per condition variable for processes blocked in a wait operation inside the monitor. The following diagram shows how processes move between the queues and execution in the monitor.
Here is a bounded buffer implementation using the signal-and-wait discipline. The signalling condition for notempty is n > 0, meaning that it holds before signal(notempty) and after wait(notempty), as control is transferred from one to the other. Likewise, the signalling condition for notfull is n < C:
algorithm
monitor BoundedBuffer
    var buf: array 0 .. C - 1 of T
    var in, out, n = 0, 0, 0
    var notfull, notempty: condition
    {0 ≤ out < C ∧ 0 ≤ n ≤ C ∧ in = (out + n) mod C}

algorithm
procedure deposit(x: T)
    if n = C then wait(notfull) {n < C}
    buf(in) := x ; in := (in + 1) mod C ; n := n + 1
    {n > 0} signal(notempty)

algorithm
procedure fetch() → (x: T)
    if n = 0 then wait(notempty) {n > 0}
    x := buf(out) ; out := (out + 1) mod C ; n := n - 1
    {n < C} signal(notfull)
Here is the analogous implementation using the signal-and-continue discipline. The signalling condition for notempty is also n > 0, meaning that it holds before signal(notempty); however, it cannot be assumed to hold after wait(notempty), as execution continues with the signalling process. The signalling condition needs to be checked after wait(notempty), and waiting needs to be repeated as long as the signalling condition does not hold. Likewise, the signalling condition for notfull is n < C:
algorithm
monitor BoundedBuffer
    var buf: array 0 .. C - 1 of T
    var in, out, n = 0, 0, 0
    var notfull, notempty: condition
    {0 ≤ out < C ∧ 0 ≤ n ≤ C ∧ in = (out + n) mod C}

algorithm
procedure deposit(x: T)
    while n = C do wait(notfull)
    {n < C}
    buf(in) := x ; in := (in + 1) mod C ; n := n + 1
    signal(notempty)

algorithm
procedure fetch() → (x: T)
    while n = 0 do wait(notempty)
    {n > 0}
    x := buf(out) ; out := (out + 1) mod C ; n := n - 1
    signal(notfull)
While originally signal-and-wait was proposed, signal-and-continue is now more widespread: it is used in Java, C/Pthreads, and Python. We will assume signal-and-continue from now on. While the signalling condition "loses its meaning", it is still a design decision that we keep as an annotation in programs when declaring a condition variable.
Monitors and semaphores are equivalent in the sense that one can be implemented with the other. However, they lead to different programming styles:
- V(s) on a semaphore is never "lost": the semaphore value is always incremented. If one thread is about to execute P(s) and another V(s), their order does not matter.
- signal(cv) on a condition variable awakens a waiting thread and does nothing if no thread is waiting. If one thread is about to execute wait(cv) and another signal(cv), their order matters.
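The difference can be observed with Python's threading primitives; the timeouts below serve only to keep the demonstration from blocking:

```python
from threading import Semaphore, Condition

s = Semaphore(0)
s.release()                          # V before P: the increment is remembered
assert s.acquire(timeout=0.1)        # P succeeds immediately

cv = Condition()
with cv:
    cv.notify()                      # no thread is waiting: the signal is lost
with cv:
    assert not cv.wait(timeout=0.1)  # a subsequent wait times out
```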
Additional Operations on Condition Variables¶
- empty(cv): tests whether the queue of cv is empty
- signalAll(cv): awakens all processes in the queue of cv, equivalent to while ¬empty(cv) do signal(cv)
Monitor Implementation of a Semaphore¶
As the signalling condition does not necessarily hold after wait, it has to be re-tested:
algorithm
monitor Semaphore
    var s = 0 {invariant: s ≥ 0}
    var pos: condition {signalling condition: s > 0}

algorithm
procedure P
    while s = 0 do wait(pos)
    {s > 0} s := s - 1

algorithm
procedure V
    s := s + 1
    {s > 0} signal(pos)
Question: Give a scenario that shows that after wait(pos), the condition s > 0 does not necessarily hold! Suppose we were to replace while s = 0 do with if s = 0 then: under which circumstances would this be correct?
Answer: Suppose that after initialization of the semaphore, process p1 calls P and is suspended. Process v1 calls V and exits the monitor; the suspended process p1 is put in the entry queue of Semaphore. Before the scheduler allows p1 to continue, another process, say p2, calls P, encounters s = 1, does not get blocked, and exits the monitor. Now, when p1 is resumed, it encounters s = 0. This would not happen if there is only one process calling P: in that case, while in P can be replaced with if.
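The monitor Semaphore maps directly onto Python's Condition class; as Python follows signal-and-continue, the while loop re-tests the signalling condition (a sketch):

```python
from threading import Condition

class Semaphore:
    def __init__(self):
        self.s = 0              # invariant: s >= 0
        self.pos = Condition()  # signalling condition: s > 0
    def P(self):
        with self.pos:
            while self.s == 0:  # re-test after every wakeup
                self.pos.wait()
            self.s -= 1
    def V(self):
        with self.pos:
            self.s += 1
            self.pos.notify()
```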
The previous semaphore implementation does not guarantee a FIFO discipline. Using the technique of passing the condition, the FIFO discipline can be guaranteed while using only an if statement for waiting:
algorithm
monitor Semaphore
    var s = 0 {invariant: s ≥ 0}
    var pos: condition {signalling condition: s > 0}

algorithm
procedure P
    if s = 0 then wait(pos)
    else s := s - 1

algorithm
procedure V
    if empty(pos) then s := s + 1
    else signal(pos)
Readers and Writers¶
The task is to let either several readers or one writer access shared data. That data cannot be encapsulated in a monitor, as due to mutual exclusion only one reader at a time would be able to access it. Instead, the data is kept shared and a monitor is used for arbitration:

- each read access is initiated by a (possibly blocking) call to startRead and terminated by a call to endRead,
- each write access is initiated by a (possibly blocking) call to startWrite and terminated by a call to endWrite.
The solution below does not arbitrate between readers and writers, but rather lets the underlying scheduling policy determine which process gets access to data.
algorithm
monitor RWcontroller
    var nr, nw = 0, 0 {(nr = 0 ∨ nw = 0) ∧ nw ≤ 1}
    var readOK: condition {signalling condition: nw = 0}
    var writeOK: condition {signalling condition: nr = 0 ∧ nw = 0}

algorithm
procedure startRead
    while nw > 0 do wait(readOK)
    nr := nr + 1

algorithm
procedure endRead
    {nr > 0} nr := nr - 1
    if nr = 0 then signal(writeOK)

algorithm
procedure startWrite
    while nr > 0 or nw > 0 do wait(writeOK)
    nw := nw + 1

algorithm
procedure endWrite
    {nw > 0} nw := nw - 1
    signal(writeOK) ; signalAll(readOK)
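A Python sketch of the controller: the two condition variables share one monitor lock, over which both Condition objects are constructed:

```python
from threading import Lock, Condition

class RWcontroller:
    def __init__(self):
        self.nr, self.nw = 0, 0            # invariant: (nr == 0 or nw == 0) and nw <= 1
        monitor = Lock()                   # one lock shared by both conditions
        self.readOK = Condition(monitor)   # signalling condition: nw == 0
        self.writeOK = Condition(monitor)  # signalling condition: nr == 0 and nw == 0
    def startRead(self):
        with self.readOK:
            while self.nw > 0:
                self.readOK.wait()
            self.nr += 1
    def endRead(self):
        with self.readOK:
            self.nr -= 1
            if self.nr == 0:
                self.writeOK.notify()
    def startWrite(self):
        with self.writeOK:
            while self.nr > 0 or self.nw > 0:
                self.writeOK.wait()
            self.nw += 1
    def endWrite(self):
        with self.writeOK:
            self.nw -= 1
            self.writeOK.notify()
            self.readOK.notify_all()
```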
The Sleeping Barber¶
The problem goes back to E.W. Dijkstra, 1965. The following formulation is from Concurrent Programming by G. Andrews:
An easygoing town contains a small barbershop having two doors and a few chairs. Customers enter through one door and leave through the other. Due to the size of the shop, at most one customer or the barber can move around at a time.
Question: What is the analogy in computing?
Answer. Customers and the barber are processes. Customers are clients that request a service from the barber. The barber is a server who repeatedly provides this service. This is an example of a client/server relationship.
The synchronization between customer and the barber first requires them to rendezvous, i.e. one to wait for the other. Then the customer needs to wait until the barber has finished with the haircut. Finally the barber has to wait for the customer to leave: both barber and customer proceed through a series of synchronized stages, starting with a rendezvous.
The barbershop is represented by a monitor with procedures getNextCustomer, finishedCut, and getHaircut. By having a monitor encapsulate the state of the barbershop, only one customer or the barber can "move around":
algorithm
procedure barber
    while true do
        getNextCustomer() -- wait for a customer to sit in the barber's chair
        cutting hair
        finishedCut() -- allow the customer to leave; returns after the customer left

algorithm
procedure customer
    while true do
        live happily
        getHaircut() -- returns after the customer has received the haircut
The town has one barber and a number of customers:
barber() ‖ customer() ‖ ... ‖ customer()
algorithm
monitor BarberShop
    var barber, chair, exit = 0, 0, 0
    var barberAvailable: condition {signalling condition: barber > 0}
    var chairOccupied: condition {signalling condition: chair > 0}
    var exitOpen: condition {signalling condition: exit > 0}
    var customerLeft: condition {signalling condition: exit = 0}
    {0 ≤ barber ≤ 1 ∧ 0 ≤ chair ≤ 1 ∧ 0 ≤ exit ≤ 1}

algorithm
procedure getHaircut()
    while barber = 0 do wait(barberAvailable)
    barber := barber - 1
    chair := chair + 1 ; signal(chairOccupied)
    while exit = 0 do wait(exitOpen)
    exit := exit - 1 ; signal(customerLeft)

algorithm
procedure getNextCustomer()
    {barber = 0}
    barber := barber + 1 ; signal(barberAvailable)
    while chair = 0 do wait(chairOccupied)
    chair := chair - 1

algorithm
procedure finishedCut()
    {exit = 0}
    exit := exit + 1 ; signal(exitOpen)
    while exit > 0 do wait(customerLeft)
Monitors in Java¶
In Java, every object can potentially be a monitor. Mutual exclusion is by default not given, but can be achieved by declaring a method as synchronized:
%%writefile Counter.java
class Counter {
    int a = 0;
    boolean e = true;
    // e == even(a)
    synchronized void inc() {
        a += 1; e = !e;
    }
    synchronized boolean even() {
        return e;
    }
}
Every object in Java has a lock. A synchronized method waits to obtain that lock before entering the body, and releases the lock on exit. Alternatively, the synchronized statement allows the lock of any object to be obtained, in particular of "this" object:
synchronized void inc() {
    a += 1; e = !e;
}

is equivalent to:

void inc() {
    synchronized(this) {a += 1; e = !e;}
}
Note that in contrast to "pure monitors":
- not all methods need to be synchronized,
- it is possible to have only a region of a method synchronized,
- synchronization can involve other objects than the called one,
- fields don't have to be private,
- there is no restriction that only owned objects can be accessed.
Java guarantees atomicity for values that fit into one 32-bit word, e.g. int, boolean, pointers to objects.
Question: Which methods of Counter need to be synchronized if objects are to be thread-safe? What is the rule in general for a method to be thread-safe without being synchronized?
%%writefile Counter.java
class Counter {
    int a = 0;
    boolean e = true;
    // e == even(a)
    synchronized void inc() {
        a += 1; e = !e;
    }
    synchronized boolean even() {
        return e;
    }
}
Answer. Above, even does not need to be synchronized. In general, methods that only read a single word-sized variable do not need to be synchronized and are still thread-safe, provided that variable is updated atomically, i.e. all other methods contain only a single assignment to that variable.
Java does not support explicit condition variables, but there is a single implicit condition variable with every object. There is a single delay queue per object (rather than one per condition variable), and the order of threads in that queue is not specified.
Java uses the signal-and-continue discipline. Every object has the following methods:

- obj.wait(): the calling thread is delayed in the queue of obj.
- obj.notify(): awakens one thread in the queue of obj if the queue is not empty, otherwise has no effect.
- obj.notifyAll(): awakens all threads in the queue of obj.

Calls obj.wait(), obj.notify(), and obj.notifyAll() can appear only within a synchronized method or block for obj. If multiple condition variables are needed, auxiliary objects of class Object can be used for that purpose.
Semaphores in Java¶
Only one condition variable is needed, hence the queue of the object itself can be used (the Java library provides a class for this purpose, java.util.concurrent.Semaphore):
%%writefile Semaphore.java
class Semaphore {
    int val;
    Semaphore(int init) {
        val = init;
    }
    synchronized void P() throws InterruptedException {
        while (val == 0) wait();
        val -= 1;
    }
    synchronized void V() {
        val += 1;
        notify();
    }
}
Note that the wait statement may throw an InterruptedException.
Question: Would it be acceptable to replace notify() by notifyAll()? Should that be done?
Answer. Above, the call to notify() could be replaced by notifyAll(): that would wake up all threads in the queue, but only one could proceed and the others would call wait() again. It would only lead to inefficiency.
Bounded Buffer in Java¶
For a buffer we need two condition variables, one for signalling that the buffer is not empty and one for signalling that the buffer is not full. If we use a single queue for both conditions, a broadcast signal is needed to awaken all threads in the queue, which then have to test whether their condition holds:
%%writefile OnePlaceBuffer.java
class OnePlaceBuffer {
    private Object buf;
    synchronized void deposit(Object x) throws InterruptedException {
        while (buf != null) wait();
        buf = x; notifyAll();
    }
    public synchronized Object fetch() throws InterruptedException {
        while (buf == null) wait();
        Object x = buf; buf = null;
        notifyAll(); return x;
    }
}
Question: What would happen if notifyAll() were replaced by notify()?
Answer. Suppose multiple threads call deposit and fetch. If fetch is modified to notify only one thread, then that notification may go to another thread waiting in fetch, rather than to a thread waiting in deposit. Using notify instead of notifyAll would here only be correct if there is a single thread calling deposit and a single thread calling fetch.
This implementation is wasteful: upon waking up, all threads waiting in either deposit or fetch evaluate their condition, even though only one will be able to proceed. It would be more efficient to have two queues, one for entry to deposit and one for entry to fetch.
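With explicit condition variables, the two queues are straightforward; here as a Python sketch (in Java, the same pattern is available through newCondition() on a java.util.concurrent.locks.Lock):

```python
from threading import Lock, Condition

class BoundedBuffer:
    def __init__(self, size):
        self.buf = [None] * size
        self.inp, self.out, self.n, self.size = 0, 0, 0, size
        lock = Lock()                    # one monitor lock ...
        self.notfull = Condition(lock)   # ... shared by the queue for depositors
        self.notempty = Condition(lock)  # ... and the queue for fetchers
    def deposit(self, x):
        with self.notfull:
            while self.n == self.size:
                self.notfull.wait()
            self.buf[self.inp] = x
            self.inp = (self.inp + 1) % self.size
            self.n += 1
            self.notempty.notify()       # wakes only a fetcher
    def fetch(self):
        with self.notempty:
            while self.n == 0:
                self.notempty.wait()
            x = self.buf[self.out]
            self.buf[self.out] = None
            self.out = (self.out + 1) % self.size
            self.n -= 1
            self.notfull.notify()        # wakes only a depositor
            return x
```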
Here is an attempt for an integer buffer using counting semaphores:
%%writefile BoundedBuffer.java
class BoundedBuffer { // INCORRECT
    int[] buf;
    int in = 0, out = 0, size;
    Semaphore full, empty;
    /* buf != null && buf.length == size && 0 <= in < size && 0 <= out < size &&
       0 <= full.val <= size && full.val + empty.val == size && in == (out + full.val) % size */
    BoundedBuffer(int size) {
        this.size = size; buf = new int[size];
        full = new Semaphore(0); empty = new Semaphore(size);
    }
    synchronized void deposit(int x) throws InterruptedException {
        empty.P(); System.out.println("depositing");
        buf[in] = x; in = (in + 1) % size;
        full.V();
    }
    synchronized int fetch() throws InterruptedException {
        full.P(); System.out.println("fetching");
        int x = buf[out]; out = (out + 1) % size;
        empty.V(); return x;
    }
}
Question: What can go wrong?
This may lead to a deadlock on an empty (or full) buffer:

- Thread A calls fetch and obtains the lock on the buffer;
- within fetch, thread A calls full.P() and obtains the lock on the full semaphore;
- that call blocks in wait() and releases the lock on the full semaphore, but thread A keeps the lock on the buffer;
- Thread B calls deposit of the buffer and gets blocked, as thread A still holds the lock on the buffer.

Thus no further progress by thread A or B is possible. This situation can arise with nested monitor calls.
%%writefile BoundedBuffer.java
BoundedBuffer b = new BoundedBuffer(1);
class Fetcher extends Thread {
    public void run() {
        try {
            System.out.println(b.fetch());
        } catch (Exception e) {}
    }
}
System.out.println("starting");
Thread f = new Fetcher(); f.start(); Thread.sleep(100); // milliseconds
b.deposit(3)
Question: How can deadlock be avoided?
The correct solution avoids this by holding a lock on at most one object, using the synchronized statement to restrict the range of exclusive access in the buffer.
%%writefile BoundedBuffer.java
class BoundedBuffer<E> { // CORRECT
    E[] buf;
    int in = 0, out = 0, size;
    Semaphore full, empty;
    /* buf != null && buf.length == size && 0 <= in < size && 0 <= out < size &&
       0 <= full.val <= size && full.val + empty.val == size && in == (out + full.val) % size */
    BoundedBuffer(int size) {
        this.size = size; buf = (E[]) new Object[size];
        full = new Semaphore(0); empty = new Semaphore(size);
    }
    void deposit(E x) throws InterruptedException {
        empty.P();
        synchronized(this) {buf[in] = x; in = (in + 1) % size;}
        full.V();
    }
    E fetch() throws InterruptedException {
        E x; full.P();
        synchronized(this) {x = buf[out]; buf[out] = null; out = (out + 1) % size;}
        empty.V(); return x;
    }
}
Question: What is the role of buf[out] = null in fetch?
Answer. The assignment buf[out] = null prevents a reference to the fetched object from being kept unnecessarily, which would prevent the object from being garbage-collected: in Java, there is no notion of ownership that would control the lifetime of objects.
Readers and Writers in Java¶
We can allow either one writer or several readers by making only the methods for writing, for starting to read, and for ending to read synchronized. The new class serves both as the "database" and as the controller for accessing the data:
%%writefile ReadersWriters.java
class ReadersWriters {
    // … data …
    private int nr = 0; // number of readers
    private synchronized void startRead() {
        nr += 1;
    }
    private synchronized void endRead() {
        nr -= 1; if (nr == 0) notify(); // awaken a waiting writer
    }
    void read() {
        startRead();
        // … read data …
        endRead();
    }
    synchronized void write() throws InterruptedException {
        while (nr > 0) wait();
        // … write data …
        notify(); // awaken another waiting writer
    }
}
Monitors in C with Pthreads¶
[This needs the Python kernel.]
While the C language does not have a syntactic structure for monitors, mutual exclusion and condition synchronization are available through the POSIX threads library, which is supported on Linux, macOS, Android, and other Unix-based systems (see this tutorial from CMU or "man pthreads" from the shell). A mutual exclusion lock, or simply mutex, is declared and initialized by:
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
A critical section is then protected as follows:
pthread_mutex_lock(&mutex); /* also used for monitor entry */
critical section;
pthread_mutex_unlock(&mutex); /* also used for monitor exit */
A condition variable is declared and initialized with default values by:
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
Pthreads uses the signal-and-continue discipline. Operations on condition variables are:

pthread_cond_wait(&cond, &mutex);
pthread_cond_signal(&cond); // same as signal
pthread_cond_broadcast(&cond); // same as signalAll
pthread_cond_timedwait(&cond, &mutex, &timespec);

The timed wait waits until a signal or broadcast on the condition variable, or until a timeout. A wait operation requires holding the mutex lock; signalling while holding the lock is recommended for predictable scheduling. The Pthreads standard also defines which C functions are required to be thread-safe.
%%writefile barber.c
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
#include <unistd.h>
/* barber shop monitor; "door" is the exit door
   (the name exit would clash with the C library function exit) */
int barber = 0, chair = 0, door = 0;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; /* monitor lock */
pthread_cond_t barberAvailable = PTHREAD_COND_INITIALIZER; /* signalling condition: barber > 0 */
pthread_cond_t chairOccupied = PTHREAD_COND_INITIALIZER; /* signalling condition: chair > 0 */
pthread_cond_t exitOpen = PTHREAD_COND_INITIALIZER; /* signalling condition: door > 0 */
pthread_cond_t customerLeft = PTHREAD_COND_INITIALIZER; /* signalling condition: door == 0 */
/* monitor invariant: 0 <= barber <= 1 && 0 <= chair <= 1 && 0 <= door <= 1 */
void getHaircut(long cust) {
    pthread_mutex_lock(&mutex); /* monitor entry */
    printf("%ld waiting for barber\n", cust);
    while (barber == 0) pthread_cond_wait(&barberAvailable, &mutex);
    barber -= 1;
    chair += 1; pthread_cond_signal(&chairOccupied);
    printf("%ld waiting for exit door\n", cust);
    while (door == 0) pthread_cond_wait(&exitOpen, &mutex);
    door -= 1; pthread_cond_signal(&customerLeft);
    pthread_mutex_unlock(&mutex); /* monitor exit */
}
void getNextCustomer() {
    pthread_mutex_lock(&mutex); /* monitor entry */
    barber += 1; pthread_cond_signal(&barberAvailable);
    printf("waiting for customer to sit in chair\n");
    while (chair == 0) pthread_cond_wait(&chairOccupied, &mutex);
    chair -= 1;
    pthread_mutex_unlock(&mutex); /* monitor exit */
}
void finishedCut() {
    pthread_mutex_lock(&mutex); /* monitor entry */
    door += 1; pthread_cond_signal(&exitOpen);
    printf("waiting for customer to leave\n");
    while (door > 0) pthread_cond_wait(&customerLeft, &mutex);
    pthread_mutex_unlock(&mutex); /* monitor exit */
}
/* end barber shop monitor */
void *villagebarber(void *arg) {
    while (true) {
        getNextCustomer(); /* wait for a customer to sit in the barber's chair */
        printf("cutting hair\n"); sleep(1);
        finishedCut(); /* allow the customer to leave; returns after the customer left */
    }
}
void *customer(void *t) {
    long cust = (long) t;
    while (true) {
        printf("%ld lives happily\n", cust); sleep(2);
        getHaircut(cust);
    }
}
int main(int argc, char *argv[]) {
    int numcust = atoi(argv[1]);
    long i;
    pthread_t b, c[numcust];
    pthread_create(&b, NULL, villagebarber, NULL);
    for (i = 0; i < numcust; i++) pthread_create(&c[i], NULL, customer, (void *) i);
    pthread_join(b, NULL);
    for (i = 0; i < numcust; i++) pthread_join(c[i], NULL);
}
!cc -pthread barber.c
!./a.out 5
Recursive Locking¶
Python also supports monitors through lock objects and condition objects.
from threading import Lock

class Counter:
    def __init__(self):
        self.count, self.lock = 0, Lock()
    def once(self):
        self.lock.acquire()
        self.count += 1
        self.lock.release()

c = Counter()
for _ in range(10):
    c.once()
c.count
This can be expressed more concisely and more safely with the with statement: it ensures that even in case of an exception the lock is released (as if release occurred in a finally clause), so a potential deadlock is avoided. This follows the principle of robust programming of always leaving an object in a "consistent state":
class Counter:
    def __init__(self):
        self.count, self.lock = 0, Lock()
    def once(self):
        with self.lock:
            self.count += 1

c = Counter()
for _ in range(10):
    c.once()
c.count
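That the lock is indeed released when an exception escapes the with block can be checked directly:

```python
from threading import Lock

lock = Lock()
try:
    with lock:
        raise ValueError("failure inside the critical section")
except ValueError:
    pass
assert not lock.locked()  # the with statement released the lock
```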
Suppose we intend to define a more complex atomic operation on counters by inheriting from Counter and super-calling once:
class CounterPlus(Counter):
    def twice(self):
        with self.lock:
            super().once() # same as Counter.once(self)
            super().once()

c = CounterPlus()
for _ in range(10):
    c.twice()
c.count
Question: What goes wrong?
Answer. Method twice obtains the lock of self and calls once, which tries to obtain the same lock again and blocks.
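The self-deadlock can be made visible without actually blocking, by attempting the second acquire non-blockingly:

```python
from threading import Lock

lock = Lock()
lock.acquire()
# a second acquire by the same thread would block forever;
# a non-blocking attempt shows that it cannot succeed:
assert not lock.acquire(blocking=False)
lock.release()
```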
Question: How can this be avoided?
One solution is to avoid double locking by separating locking from the computation:
from threading import Lock

class Counter:
    def __init__(self):
        self.count, self.lock = 0, Lock()
    def inc(self):
        self.count += 1
    def once(self):
        with self.lock:
            self.inc()

c = Counter()
for _ in range(10):
    c.once()
c.count
class CounterPlus(Counter):
    def twice(self):
        with self.lock:
            super().inc()
            super().inc()

c = CounterPlus()
for _ in range(10):
    c.twice()
c.count
Another solution is to use recursive locks: these allow a thread to lock the same object repeatedly, keeping a count of how often the thread has acquired and released the lock:
from threading import RLock

class Counter:
    def __init__(self):
        self.count, self.lock = 0, RLock()
    def once(self):
        with self.lock:
            self.count += 1

class CounterPlus(Counter):
    def twice(self):
        with self.lock:
            super().once()
            super().once()

c = CounterPlus()
for _ in range(10):
    c.twice()
c.count
Pthreads also allows locks (mutexes) to be recursive. Recursive locks are more complex and, in general, considered to be more error-prone: the state of the lock now includes the id of the locking thread and an acquisition count.
Implementation of Monitors with Semaphores¶
algorithm
monitor M
    var cv: condition
    ...
    procedure pi
        ... wait(cv) ...
    procedure pj
        ... signal(cv) ...
One semaphore, e, is needed for mutual exclusion of monitor procedures. For every condition variable cv, one semaphore, cs, for threads blocked on wait, and one counter, cn, for the length of the queue of cs, are needed. The implementation with the signal-and-continue discipline is:
algorithm
var e: semaphore := 1 {0 ≤ e ≤ 1}
var cs: semaphore := 0 {0 ≤ cs}
var cn: integer := 0 {0 ≤ cn}
...
procedure pi
    P(e) ... cn := cn + 1 ; V(e) ; P(cs) ; P(e) ... V(e)
procedure pj
    P(e) ... if cn > 0 then (cn := cn - 1 ; V(cs)) ... V(e)
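A direct Python transcription of this scheme, with threading.Semaphore for e and cs and an explicit counter cn (a sketch for illustration, not how Python's own Condition is implemented):

```python
from threading import Semaphore

class ConditionVar:
    def __init__(self, e):
        self.e = e              # monitor entry semaphore, shared by all procedures
        self.cs = Semaphore(0)  # threads blocked on wait are delayed here
        self.cn = 0             # length of the queue of cs
    def wait(self):             # caller holds e
        self.cn += 1
        self.e.release()        # V(e): leave the monitor
        self.cs.acquire()       # P(cs): block until signalled
        self.e.acquire()        # P(e): re-enter the monitor
    def signal(self):           # caller holds e
        if self.cn > 0:
            self.cn -= 1
            self.cs.release()   # V(cs): release one blocked thread
```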
Discussion of Monitors¶
While the motivation for monitors, combining mutual exclusion with encapsulation in a syntactic construct, is appealing, the details are intricate:
- to the queue of threads trying to enter the monitor and the queues for each of the condition variables, one can add another queue of threads that were waiting and are ready to continue. Then, in total five different signalling disciplines are possible!
- the problem of nested monitor calls can also be solved by making a call to another monitor within a monitor an open call, meaning that the exclusive lock to the first monitor is released during the call. We have assumed (Java, Pthreads) that calls are closed calls. If calls are open, then the monitor invariant has to be established before each call.
- mutual exclusion is not always necessary to ensure the integrity (invariant) of the monitor; strict mutual exclusion may affect performance unnecessarily.
These issues have led to numerous variants of monitors over the years. Some of these deviate even further. For example, C# has a lock construct that is similar to Java's synchronized. Synchronization is performed by events, which are explicitly set and cleared in two different ways:

- AutoResetEvent: an event remains signalled until a call to WaitOne, which unsignals the event,
- ManualResetEvent: an event remains signalled until it is explicitly reset.