So far, we have assumed that processes share memory: they may have their own local variables, but they communicate through global variables, including those protected by monitors. In message passing, processes communicate by sending and receiving messages over channels. Shared memory is not needed. If all variables are local to a process, mutual exclusion is guaranteed and no interference is possible.
- In distributed systems, there is no shared memory and all communication is by message passing. This is the case for geographically distributed systems and for "thinly" connected multiprocessors, such as hypercubes, systolic arrays, and flexible interconnects.
- Even when shared memory is available, message passing isolates processes, which may contribute to higher reliability and better performance. The Mach operating system kernel, on which macOS is built, uses message passing between processes (although Apple added other communication primitives as well).
Message passing can vary:
- channels may be one-way or two-way
- channels may be global to the processes, attached to the sending process, or attached to the receiving process
- communication can be synchronous or asynchronous
In synchronous communication, sender and receiver have to wait for each other. With asynchronous communication, the sender and receiver execute independently and messages may be received arbitrarily later. Buffered communication is in between, as the sender may need to wait if the buffer is full.
Question: Give examples of the various forms of message passing in computing and everyday life!
Answer. Calling a phone is an example of synchronous communication. Sending e-mail and text messages are examples of asynchronous communication. With e-mail and text messages, the channel is attached to the receiver (mailbox). With live streaming, the channel is attached to the sender. In discussion fora, the channel is global. With e-mail, the channel is one-directional; with video conferencing, the channel is bidirectional.
Synchronous Message Passing¶
The communication constructs introduced here build on those of Communicating Sequential Processes, introduced by Hoare (in 1978 and in 1985 expanded into a book).
Variables can be declared of channel type, which also specifies the type of data that can be sent over a channel. The notation is
var v: channel[T]
where T
is a type or a list of types. For example:
var printer: channel[string]
var keyboard: channel[char]
var port: channel[byte]
var dbrequest: channel[SQLquery]
var dbreply: channel[Table]
var dbinsert: channel[Key × Value]
There are two communication statements, the output statement (send statement)
c ! E
where c
is a channel and E
is an expression or list of expressions (pronounced "c
output E
") and the input statement (receive statement)
c ? v
where v
is a variable or a list of variables (pronounced "c
input v
").
For example, a producer is a process that sends over a channel and a consumer is a process that receives over a channel:
var c: channel[T]
algorithm
process producer
do true →
produce data
c ! data
algorithm
process consumer
do true →
c ? data
consume data
If one process only sends data once and another only receives data once, then this is the same as just copying the data:
var c: channel[integer]
c ! 7 ‖ c ? v
v := 7
Processes may both receive and send. For example, process copy
copies values from channel west
to channel east
:
algorithm
var west, east: channel[T]
process copy
var v: T
do true → west ? v ; east ! v
Other processes would then send by west ! ...
and receive by east ? ...
.
In the following example, a server is an infinite process that computes the greatest common divisor of two numbers. The server waits for a request from channel args
, does the computation, and sends the result to channel result
:
var args: channel[integer × integer]
var result: channel[integer]
process gcd
var x, y: integer
do true →
args ? (x, y)
do x > y → x := x - y
⫿ y > x → y := y - x
result ! x
A client would get service by first sending the arguments and then waiting for the result:
args ! (a1, a2) ; result ? r
This illustrates that the procedure call r ← gcd(a1, a2)
can be understood as sending the arguments to the gcd
process and waiting for the result, which is how remote procedure calls (RPC) work.
Guarded Communication¶
If a server waits for a request from multiple clients, it should select the client that requests first. If clients send over different channels, this cannot be expressed with a c ? v
input statement, as that waits for channel c
specifically. A construct is needed that "waits for whatever comes first".
The guarded communication statement generalizes guarded commands by including communication in the guard. It is of the form
B; C → S
where B
is a Boolean expression and C
is a communication statement. It can be used in if
and do
guarded commands. If B
is left out, it is assumed to be true
. The conditional statement if B0 → S0 ⫿ B1 → S1
chooses arbitrarily between S0
and S1
when both are true. If the guards contain communication, this applies analogously:
if west0 ? v → east ! v
⫿ west1 ? v → east ! v
This statement receives from either west0
or west1
, depending on which is ready to communicate. If both are ready, the choice is arbitrary.
A communication statement in the guard of a do
loop similarly blocks until communication can proceed. For example, process copy
can be reformulated by having the guard of the loop block when receiving, rather than the body block:
process copy
var v: T
do west ? v → east ! v
The do
loop never terminates. The guard blocks until another process sends on west
. In this form, copy
is a one-place buffer.
In general, the loop do B0 → S0 ⫿ B1 → S1
chooses arbitrarily between S0
and S1
when both are true. If the guards contain communication, this applies analogously. For example, the following process continuously merges from channels west0
and west1
:
process merge
var v: T
do west0 ? v → east ! v
⫿ west1 ? v → east ! v
Question: What is the difference from zip
? Both send to east
and receive from west0
and west1
.
process zip
var v: T
do west0 ? v → east ! v ; west1 ? v ; east ! v
Answer. Process zip
will alternate between receiving from west0
and west1
, starting with west0
. On the other hand, merge
will always receive from either west0
or west1
. For example trying to send twice over west0
will block with zip
but not with merge
.
Likewise, guards can contain send statements. For example, the following process passes data from west
to either east0
or east1
:
process split
var v: T
west ? v
do east0 ! v → west ? v
⫿ east1 ! v → west ? v
Guards can contain both a Boolean expression and a communication statement. Communication can take place only when the corresponding guard is true:
process select
var v: T, s: integer
set s to 0 or 1
do s = 0; west0 ? v → east ! v ; set s to 0 or 1
⫿ s = 1; west1 ? v → east ! v ; set s to 0 or 1
Question: What happens if s
is set to a value different from 0
and 1
? What would happen if there were no communication statements?
If s
is neither 0
nor 1
, the loop would terminate.
Likewise, guards can contain send statements.
process distribute
var v: T, s: integer
west ? v ; set s to 0 or 1
do s = 0; east0 ! v → west ? v ; set s to 0 or 1
⫿ s = 1; east1 ! v → west ? v ; set s to 0 or 1
In general, guards of if
and do
statements can contain both send and receive statements. For example, this allows copy
to be generalized to a bounded buffer. Below C
is the capacity of the buffer:
process copy
var buffer: array 0 .. C - 1 of char
var in, out, n: integer = 0, 0, 0
do n < C; west ? buffer(in) → in, n := (in + 1) mod C, n + 1
⫿ n > 0; east ! buffer(out) → out, n := (out + 1) mod C, n - 1
The do
loop never terminates, as one of the Boolean expressions is always true.
- if n = 0, only the first alternative can be taken; it will be taken when another process executes west ! E; processes executing east ? v will block
- if n = C, only the second alternative can be taken; it will be taken when another process executes east ? v; processes executing west ! E will block
- if 0 < n < C, either alternative can be taken, depending on another process executing west ! E or east ? v.
Types of Processes¶
There are four common types of message-passing processes:
- Filter: a filter receives data from its input channels, transforms it, and sends it to its output channels
- Client: a client initiates requests to a server; it may delay until it receives a response
- Server: a server is a reactive process: it waits for requests from clients, to which it may respond immediately or postpone the response; often, servers run continuously
- Peers: peers are "equal" processes; they communicate, for example by exchanging a file, but none of the peers is a client or a server.
Unix Pipes and Filters¶
The Unix operating system (including Linux, macOS) allows any program that reads from stdin
and writes to stdout
to be a filter. Pipes act as asynchronous channels (traditionally with a capacity of 4K bytes). Programs can be connected by the pipe operator. The following connects the stdout
channel of P
with the stdin
channel of Q
:
P | Q
Common filters are cat
, sort
, wc
, uniq
, grep
, ps
, ls
, tee
:
!cat philosophers.go
!cat philosophers.go | wc
!ps aux | grep root | wc
Question: Find out the number of function declarations in philosophers.go
! Each function declaration starts with func
.
!cat philosophers.go | grep func
!cat philosophers.go | grep func | wc
More complex filters are tbl
, eqn
, troff
for text processing, pic
for drawing pictures, and cpp
, the C preprocessor.
Prime Number Generation¶
The Sieve of Eratosthenes works by starting with all potential prime numbers and successively eliminating all those that are multiples of other numbers.
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
Eliminating all multiples of 2 results in:
2 3 5 7 9 11 13 15 17 19
Eliminating all multiples of 3 results in:
2 3 5 7 11 13 17 19
Now we can stop, as the first multiple of 5 that would need to be considered is 25; all smaller multiples have already been eliminated.
Question: How can that be implemented with communicating processes?
For determining the first L
prime numbers, we chain L
processes. The first process, sieve(0)
generates all odd numbers starting from 3
. All other processes sieve(i)
receive first a prime number, which they keep. For all other numbers they receive, they check if it is a multiple of their prime. If so, they drop it, otherwise they pass it on to the next process.
var ch: array 0 .. L - 1 of channel[integer]
process sieve(i: 0 .. L - 1)
if i = 0 then -- 0-th prime is 2
var j = 3 -- pass odd numbers
while j < N do
ch(0) ! j ; j := j + 2
else
var p, n: integer -- prime, next
ch(i - 1) ? p -- i-th prime is p
while true do
ch(i - 1) ? n
if n mod p ≠ 0 then ch(i) ! n
Processes and Channels in Go¶
In Go, procedures are called functions; processes (threads) are called goroutines and are created (forked) using the syntax
go f()
where f
is a function that can have parameters; there is no equivalent of joining a thread. A synchronous channel is declared by:
var c chan int
In addition, a channel needs to be created before it can be used:
c = make(chan int)
Sending E
over channel c
, in CSP c ! E
, is written as:
c <- E
Receiving x
over channel c
, in CSP c ? x
, is written as:
x = <- c
Go treats <- c
as an expression, which can also be used e.g. as a parameter.
In Go, there are only for
loops, which can be used as traditional for loops, as while loops, and as infinite loops. The following program starts a goroutine with an infinite loop. However, the program terminates when the main function terminates. The statement x := 3
declares x
to be an integer and sets it to 3
.
%%writefile printer.go
package main
var printer chan int
func printToScreen() {
for {print(<- printer)}
}
func main() {
printer = make(chan int)
go printToScreen()
for i := 3; i <= 9; i++ {printer <- i}
}
!go run printer.go
Here is the earlier copy
process with an infinite loop, receiving from west
and sending to east
.
Hint: Use the l key to turn on line numbers; the line numbers from the Go compiler are off by one, though.
%%writefile copy.go
package main
var west, east chan int
func copyWestEast() {
for {c := <- west; east <- c}
}
func printToScreen() {
for {print(<- east)}
}
func main() {
west = make(chan int)
east = make(chan int)
go copyWestEast()
go printToScreen()
for i := 3; i <= 9; i++ {west <- i}
}
!go run copy.go
Question: What is the output of following program?
%%writefile puzzle.go
package main
var c1, c2 chan int
func sub() {
c2 <- 1; println(<- c1)
}
func main() {
c1 = make(chan int)
c2 = make(chan int)
go sub()
println(<- c1); c2 <- 1
}
!go run puzzle.go
Go has built-in deadlock detection, unlike most other languages. Hence the program terminates with an error message, rather than being stuck forever.
The following program illustrates the client-server structure in Go. The server is an infinite loop that waits for input from channel arg
, does the computation, and sends the result back on channel result
. If a pair of values is sent over a channel, a struct
needs to be defined. The example also illustrates how a while-loop is expressed in Go:
%%writefile gcd1.go
package main
type Arguments struct {x, y int}
var args chan Arguments
var result chan int
func gcd() {
for {
a := <- args
x, y := a.x, a.y
for x != y {
if x > y {x = x - y
} else {y = y - x}
}
result <- x
}
}
func main() {
args, result = make(chan Arguments), make(chan int)
go gcd()
args <- Arguments{20, 15}; println(<- result)
args <- Arguments{9, 6}; println(<- result)
}
!go run gcd1.go
The range
construct in Go can be used in for
loops to iterate over
- all elements of an array or array slice
- all elements of a string
- all elements of a map
- all values received through a channel
%%writefile range.go
package main
func main() {
fib := []int{1, 1, 2, 3, 5, 8, 13}
for i, v := range(fib) {println(i, v)}
forecast := "☀️🌨⛄️🌬"
for i, c := range(forecast) {println(i, c)} // caution
days := map[string]int{"mon":0, "tue":1, "wed":2, "thu":3, "fri":4, "sat":5, "sun":6}
for s, n := range(days) {println(s, n)}
}
!go run range.go
A loop over range(c)
for channel c
will iterate until the channel is closed. This can be used to shut down a server:
%%writefile gcd2.go
package main
import "time"
type Arguments struct {x, y int}
var args chan Arguments
var result chan int
func gcd() {
for a := range args {
x, y := a.x, a.y
for x != y {
if x > y {x = x - y
} else {y = y - x}
}
result <- x
}
println("gcd done");
}
func main() {
args, result = make(chan Arguments), make(chan int)
go gcd()
args <- Arguments{20, 15}; println(<- result)
args <- Arguments{9, 6}; println(<- result)
close(args); time.Sleep(time.Second)
println("main done")
}
!go run gcd2.go
Question: What happens if the time.Sleep
statement is left out?
Answer. The above program terminates when main
terminates; without time.Sleep
, the gcd
goroutine may not print "gcd done"
before the program terminates.
Exercise: Relying on timing for a thread to finish its work is not advisable. Modify the program such that gcd
sends an acknowledgment when it is done and main
waits for that acknowledgment, instead of sleeping.
%%writefile gcd3.go
package main
type Arguments struct {x, y int}
var args chan Arguments
var result chan int
func gcd() {
for a := range args {
x, y := a.x, a.y
for x != y {
if x > y {x = x - y
} else {y = y - x}
}
result <- x
}
println("gcd done")
result <- -1
}
func main() {
args, result = make(chan Arguments), make(chan int)
go gcd()
args <- Arguments{20, 15}; println(<- result)
args <- Arguments{9, 6}; println(<- result)
close(args); <- result
println("main done")
}
!go run gcd3.go
Receiving over a channel can also take the form
x, ok = <- c
Variable ok
is set to false
if the channel is closed and to true
otherwise.
%%writefile closed.go
package main
import "time"
var c, d chan int
func printing() {
for {
x, ok := <- c
if !ok {break}
println(x)
}
for x := range(d) {
println(x)
}
println("done printing")
}
func main() {
c, d = make(chan int), make(chan int)
go printing()
c <- 3; c <- 5; close(c)
d <- 7; d <- 9; close(d)
time.Sleep(time.Second)
}
!go run closed.go
An alternative to closing a channel is to send a dedicated end of stream value. In the following implementation of the Sieve of Eratosthenes, the generating process, sieve(0)
, sends EOS
as the last value and terminates. All other processes of the sieve pass EOS
on once they received it and terminate. The main program starts all processes and waits to receive EOS
.
%%writefile eratosthenes.go
package main
const N = 100
const L = 15 // length of chain of filters
const EOS = -1 // end of stream
var c [L] chan int
func sieve(i int) {
if i == 0 {
println("prime", 0, 2) // 0-th prime is 2
for j := 3; j < N; j += 2 {c[0] <- j}
c[0] <- EOS
} else {
p := <- c[i - 1]; println("prime", i, p) // i-th prime is p
n := <- c[i - 1]
for n != EOS {
if n % p != 0 {c[i] <- n}
n = <- c[i - 1]
}
c[i] <- EOS
}
}
func main() {
for i := 0; i < L; i++ {c[i] = make(chan int)}
for i := 0; i < L; i++ {go sieve(i)}
<- c[L - 1] // receiving EOS
}
!go run eratosthenes.go
Vector Multiplication¶
Suppose in
is an n
-dimensional vector that is to be multiplied by a fixed vector v
of length n
. The resulting scalar product is
out = ∑ i ∈ 0 .. n - 1 • v(i) × in(i)
We generalize this to in
being a channel, delivering a stream of vectors to be multiplied with v
, and out
being a channel, accepting a stream of numbers. First, we rewrite out
using the usual inductive definition of ∑
and auxiliary vector mid(i)
, for 0 ≤ i ≤ n
:
mid(0)     = 0
mid(i + 1) = v(i) × in(i) + mid(i)   for 0 ≤ i < n
out        = mid(n)
For example, for n = 3
:
mid(0) = 0
mid(1) = v(0) × in(0) + mid(0)
       = v(0) × in(0)
mid(2) = v(1) × in(1) + mid(1)
       = v(1) × in(1) + v(0) × in(0)
mid(3) = v(2) × in(2) + mid(2)
       = v(2) × in(2) + v(1) × in(1) + v(0) × in(0)
out    = mid(3)
To parallelize the computation, we decide to have n
processes, each one performing a multiplication and addition:
func scalarmult(v int, in, mid, out chan int) {
for {out <- v * <- in + <- mid}
}
%%writefile vectormult.go
package main
const N = 4
var in [N] chan int
var out chan int
func scalarmult(v int, in, mid, out chan int) {
for {out <- v * <- in + <- mid}
}
func vectormult(v [N] int) {
var mid [N] chan int
for i := 0; i < N; i++ {mid[i] = make(chan int)}
for i := 0; i < N - 1; i++ {go scalarmult(v[i], in[i], mid[i], mid[i + 1])}
go scalarmult(v[N - 1], in[N - 1], mid[N - 1], out)
for {mid[0] <- 0}
}
func genstream() {
for r := 0; r < 20; r++ {
for i := 0; i < N; i++ {in[i] <- r}
}
}
func main() {
for i := 0; i < N; i++ {in[i] = make(chan int)}
out = make(chan int)
go genstream()
go vectormult([4]int{3, 7, 8, 5})
for {println(<- out)}
}
!go run vectormult.go
Guarded Communication in Go¶
An if
statement with guarded communication is expressed by the select
statement:
select {
case x = <- c: // receives x from channel c
case y := <- c: // declares y and receives y from channel c
case z, ok = <- c: // succeeds and ok set to false if c is closed
case c <- 7: // send 7 over channel c
}
The select
statement waits until communication on any of the cases is possible. The cases cannot have Boolean expressions. The select
statement can also have a default
if no communication is possible.
select {
case ...
default: // taken if no communication possible
}
The effect of default
may depend on scheduling and should therefore be used cautiously. The empty select
statement blocks forever:
select {}
Question: What does the following program print?
%%writefile zerosandones.go
package main
var c chan int
func send0and1() {
for {
select {
case c <- 0:
case c <- 1:
}
}
}
func main() {
c = make(chan int)
go send0and1()
for i := 0; i < 100; i++ {print(<- c)}
}
!go run zerosandones.go
Answer. The created goroutine nondeterministically sends 0
or 1
over c
; in Go, nondeterminism is resolved randomly, so the program prints a random sequence of 0
's and 1
's.
Bounded Buffer with Go¶
A bounded buffer can only receive if the buffer is empty, can only send if the buffer is full, and can send or receive if the buffer is in between. As Go does not allow communication statements in guards of loops and does not allow Boolean expressions in select
statements, a for
loop with an explicit case analysis by if
statements is needed:
%%writefile bufferedcopy.go
package main
var west, east chan int
func copyWestEast() {
const C = 10
var buffer [C] int
var in, out, n int
for {
if n == C { // buffer full, send to east
east <- buffer[out]; out, n = (out + 1) % C, n - 1
} else if n == 0 { // buffer empty, receive from west
buffer[in] = <- west; in, n = (in + 1) % C, n + 1
} else { // either send to east or receive from west
select {
case east <- buffer[out]: out, n = (out + 1) % C, n - 1
case buffer[in] = <- west: in, n = (in + 1) % C, n + 1
}
}
}
}
func printToScreen() {
for v := range(east) {print(v, " ")}
}
func main() {
west = make(chan int)
east = make(chan int)
go copyWestEast()
go printToScreen()
for i := 0; i < 30; i++ {west <- i}
}
!go run bufferedcopy.go
Note that not necessarily all numbers are printed: the program terminates when main
terminates, even if other goroutines are still running.
Dining Philosophers in Go¶
Both philosophers and forks are processes (goroutines) communicating via channels:
- forks are shared between two philosophers: they get a request from either their left-hand or their right-hand philosopher
- there are 5 right channels and 5 left channels; right[i] connects fork[i] with philosopher[i] and left[i] connects fork[i] with philosopher[(i - 1) % 5]
- fork i receives acquire and release notifications through left[i] and right[i]
- philosopher i picks up the forks by sending a notification to its left-hand fork, fork i, through that fork's right-hand channel, and then to its right-hand fork, fork (i + 1) % 5, through that fork's left-hand channel
- philosophers put down the forks by sending a notification again, first to their left-hand fork and then to their right-hand fork
- when forks are released by the philosopher who acquired them, they are ready to accept a new request again.
The channels are only used for synchronization, no data is being sent. As channel declarations in Go always require a type, bool
is used and an arbitrary value is sent. The blocking select {}
statement at the end of main
prevents the program from terminating immediately.
%%writefile philosophers.go
package main
import ("time"; "math/rand")
var left, right [5] chan bool
var ph [5] string
func philosopherState(i int, s string) {
ph[i] = s; println(ph[0], ph[1], ph[2], ph[3], ph[4])
time.Sleep(time.Second * time.Duration(rand.Int() % 4)) // sleep between 0 and 3 sec
}
func fork(i int) {
for {
select {
case <-left[i]: <- left[i]
case <-right[i]: <- right[i]
}
}
}
func philosopher(i int) {
for {
philosopherState(i, "thinks")
right[i] <- true; left[(i + 1) % 5] <- true
philosopherState(i, "eats ")
right[i] <- false; left[(i + 1) % 5] <- false
}
}
func main() {
for i := 0; i < 5; i++ {left[i], right[i], ph[i] = make(chan bool), make(chan bool), "thinks"}
for i := 0; i < 5; i++ {go fork(i); go philosopher(i)}
select {}
}
Question: Will the program deadlock or not?
!go run philosophers.go
Asynchronous Message Passing¶
In asynchronous message passing, channels can store messages. The send operation does not block if the channel can store the sent message, irrespective of whether the receiver is ready. The receive operation blocks only if the channel is empty. The channel can have an (idealized) unbounded capacity or can be bounded. We consider channels with bounded capacity, as an unbounded channel is a bounded channel with capacity ∞
. The notation is
var v: channel[T](C)
where T
is a type or a list of types and C
is a positive integer, the capacity. For example:
var requests: channel[string](100)
Erlang uses asynchronous message passing as the fundamental communication construct.
Formally, an asynchronous channel var c: channel[T](C)
is like a variable c
of type seq[T]
; sending and receiving simply appends to the sequence and removes the first element, provided that the sequence is not full or not empty.
Definition of asynchronous communication:
var c: channel[T](C)  =  var c: seq[T] = []
c ! E                 =  ⟨#c < C → c := c + [E]⟩
c ? v                 =  ⟨#c > 0 → v, c := c(0), c[1:]⟩
These definitions can be used to reason about programs with channels as about any programs with global variables (the channels being the global variables). However, a "more algebraic" style of reasoning is possible, which is the motivation for process algebras.
Laws of Programs¶
Let S
, T
, U
be statements, v
, w
disjoint lists of variables, and B
, E
, F
expressions.
Sequential composition is associative (allowing to leave out parenthesis), has skip
as unit, and stop
as left-zero:
(L1)
(S ; T) ; U = S ; (T ; U)
(L2)
S ; skip = S = skip ; S
(L3)
stop ; S = stop
Question: Is stop
also right-zero, i.e. is S ; stop
equal to stop
?
If S
communicates with other processes, S ; stop = stop
does not hold. In particular, if S
keeps communicating and does not terminate, we would have rather S ; stop = S
.
Parallel composition is commutative, associative, and has skip
as unit:
(L4)
S ‖ T = T ‖ S
(L5)
(S ‖ T) ‖ U = S ‖ (T ‖ U)
(L6)
S ‖ skip = S
Question: Is stop
also unit of parallel composition, i.e. is S ‖ stop
equal to S
, or is stop
zero of parallel composition, i.e. is S ‖ stop
equal to stop
?
As stop
does not terminate, neither will S ‖ stop
, so that is not equal to S
for terminating S
. If S
communicates, then S ‖ stop
is also not equal to S
.
Nondeterministic choice is idempotent, commutative, associative, and has stop
as unit:
(L7)
S ⫿ S = S
(L8)
S ⫿ T = T ⫿ S
(L9)
(S ⫿ T) ⫿ U = S ⫿ (T ⫿ U)
(L10)
S ⫿ stop = S
Question: Is parallel composition also idempotent, i.e. is S ‖ S
equal to S
?
If S = x := x + 1
, then S ‖ S
is not the same as S
, so parallel composition is not idempotent in general. Sequential composition is not idempotent either. However, idempotency plays a role in the sequential composition of REST API calls.
Sequential composition distributes over nondeterministic choice to the left: first choosing between S
and T
and then continuing with U
is the same as choosing between S
followed by U
and T
followed by U
:
(L11)
(S ⫿ T) ; U = (S ; U) ⫿ (T ; U)
Question: Does sequential composition distribute over nondeterministic choice to the right, i.e. is S ; (T ⫿ U)
the same as (S ; T) ⫿ (S ; U)
?
For example S ; (T ⫿ stop)
is the same as S ; T
, but is different from (S ; T) ⫿ (S ; stop)
in case S
communicates via global variables with another process.
Parallel composition also distributes over nondeterministic choice. As parallel composition is commutative, we need to give only one direction:
(L12)
S ‖ (T ⫿ U) = (S ‖ T) ⫿ (S ‖ U)
Question: Does nondeterministic choice also distribute over parallel composition, i.e. is S ⫿ (T ‖ U)
equal to (S ⫿ T) ‖ (S ⫿ U)
?
In S ⫿ (T ‖ U)
, statement S
may be executed only once, in (S ⫿ T) ‖ (S ⫿ U)
it may be executed twice, so they cannot be the same.
Parallel composition relates to sequential composition and nondeterministic choice as follows. In the case both operands are atomic, their parallel composition is the same as executing them in any order:
(L13)
⟨S⟩ ‖ ⟨T⟩ = (⟨S⟩ ; ⟨T⟩) ⫿ (⟨T⟩ ; ⟨S⟩)
Question: Above, ⟨S⟩
and ⟨T⟩
are atomic. Give an example why in general S ‖ T = (S ; T) ⫿ (T ; S)
does not hold!
Consider S = T = x := x + 1
. Then S ‖ T
will increment x
by 1
or 2
, but (S ; T) ⫿ (T ; S)
will always increment x
by 2
.
In case both operands start with an atomic statement, their parallel composition would nondeterministically execute one of the atomic statements first and then the rest:
(L14)
(⟨S⟩ ; T) ‖ (⟨U⟩ ; V) = (⟨S⟩ ; (T ‖ (⟨U⟩ ; V))) ⫿ (⟨U⟩ ; ((⟨S⟩ ; T) ‖ V))
A guarded atomic statement with a true guard is always executed and with a false guard never executed:
(L15)
⟨true → S⟩ = ⟨S⟩
(L16)
⟨false → S⟩ = stop
If a variable is declared but not used, the declaration can be omitted:
(L17)
var v: V · S = S provided v does not occur in S
If a variable is only assigned and not used, the assignment and the declaration can be omitted:
(L18)
var v: V · v, w := E, F = w := F
Declaring an initialized variable is the same as declaring an uninitialized variable and the initializing it:
(L19)
var v: V = E ; S = var v: V · v := E ; S
More generally, if an initialized variable is declared and then assigned a new value, the initialization can be left out if it is "merged" into the assignment:
(L20)
var v: V = E · v := F ; S = var v: V · v := F[v := E] ; S
For example, var x: integer = 3 · x := x + 1 ; S
is the same as var x: integer = 3 · x := (x + 1)[x := 3] ; S
, which in turn is the same as var x: integer · x := 4 ; S
. This can be further generalized for an atomic guarded assignment statement:
(L21)
var v: V = E · ⟨B → v := F⟩ ; S = var v: V · ⟨B[v := E] → v := F[v := E]⟩ ; S
Declaring a variable global to two nondeterministic alternatives is the same as declaring it local in each of the alternatives:
(L22)
var v: V = E · (S ⫿ T) = (var v: V = E · S) ⫿ (var v: V = E · T)
If the goal of a distributed system is to achieve something that could be done without distribution, then these laws can in principle be used to show the equivalence. For example, we can prove that
var c: channel[integer](1) · c ! 7 ‖ c ? x = x := 7
as follows:
var c: channel[integer](1) · c ! 7 ‖ c ? x
= «by definitions»
var c = [] · ⟨#c < 1 → c := c + [7]⟩ ‖ ⟨#c > 0 → x, c := c(0), c[1:]⟩
= «by (L13)»
var c = [] ·
(⟨#c < 1 → c := c + [7]⟩ ; ⟨#c > 0 → x, c := c(0), c[1:]⟩) ⫿
(⟨#c > 0 → x, c := c(0), c[1:]⟩ ; ⟨#c < 1 → c := c + [7]⟩)
= «by (L22)»
(var c = [] · ⟨#c < 1 → c := c + [7]⟩ ; ⟨#c > 0 → x, c := c(0), c[1:]⟩) ⫿
(var c = [] · ⟨#c > 0 → x, c := c(0), c[1:]⟩ ; ⟨#c < 1 → c := c + [7]⟩)
= «by (L21)»
(var c: seq[integer] · ⟨#[] < 1 → c := [] + [7]⟩ ; ⟨#c > 0 → x, c := c(0), c[1:]⟩) ⫿
(var c: seq[integer] · ⟨#[] > 0 → x, c := [](0), [][1:]⟩ ; ⟨#c < 1 → c := c + [7]⟩)
= «by (L15), (L16), simplifications»
(var c: seq[integer] · c := [7] ; ⟨#c > 0 → x, c := c(0), c[1:]⟩) ⫿
(var c: seq[integer] · stop ; ⟨#c < 1 → c := c + [7]⟩)
= «by (L3), (L17)»
(var c: seq[integer] · c := [7] ; ⟨#c > 0 → x, c := c(0), c[1:]⟩) ⫿
stop
= «by (L10), (L19)»
var c = [7] · ⟨#c > 0 → x, c := c(0), c[1:]⟩
= «by (L21)»
var c: seq[integer] · ⟨#[7] > 0 → x, c := [7](0), [7][1:]⟩
= «by (L15), simplification»
var c: seq[integer] · x, c := 7, []
= «by (L18)»
x := 7
Synchronous vs Asynchronous Channels¶
A synchronous channel c
can be defined in terms of two asynchronous channels of capacity 1
, one for sending the data and the other for acknowledgement of receipt:
var c: channel[T]  =  var cs: channel[T](1) ; var ca: channel[](1)
c ! E              =  cs ! E ; ca ?
c ? v              =  cs ? v ; ca !
In turn, an asynchronous channel c
can be defined in terms of two synchronous channels cs
, cr
, one for sending and one for receiving, with a buffer process cb
in between:
var c: channel[T](C)  =  var cs, cr: channel[T]
c ! E                 =  cs ! E
c ? v                 =  cr ? v
algorithm
process cb
var b: seq[T] = []
var x: T
do #b < C; cs ? x → b := b + [x]
⫿ #b > 0; cr ! b(0) → b := b[1:]
The buffer process can be implemented as a circular array, as earlier. To conclude, this shows that synchronous and asynchronous channels are equivalent. While some algorithms work with both synchronous and asynchronous channels, they lead to different programming styles, and some algorithms work only with one and not the other.
Sorting Network¶
Suppose we have an incoming stream of N
elements and want to output them sorted on a single stream. An approach for that is a merge network: each process merges two incoming streams of numbers, picking the largest it sees on its input channels and forwards that to its output channels. The processes are arranged in a tree-like structure:
For sorting N
numbers, N - 1
processes and 2N - 1
channels are needed. The output will be a stream of N
sorted numbers. We assume that each merger process appends EOS
to its output once it receives EOS
on both inputs. Hence the output will have EOS
between N
sorted numbers. In turn, the input values have to be a value followed by EOS
.
%%writefile sorter.go
package main
import ("fmt"; "time"; "math/rand")
const EOS = -1
const N = 1 << 17 // N = 2**17 input streams
const R = 2000    // number of repetitions
func printer(in chan int) {
    for { // alternatively: for j := 0; j < R * (N + 1); j++
        v := <- in
        if v == EOS {println()
        } else {print(v, " ")
        }
    }
}
func merger(in1, in2 chan int, out chan int) {
    for {
        v1, v2 := <- in1, <- in2
        for v1 != EOS || v2 != EOS {
            if v1 != EOS && v2 != EOS {
                if v1 <= v2 {out <- v1; v1 = <- in1
                } else {out <- v2; v2 = <- in2
                }
            } else if v1 != EOS {
                out <- v1; v1 = <- in1
            } else {
                out <- v2; v2 = <- in2
            }
        }
        out <- EOS
    }
}
func main() {
    start := time.Now()
    var c [2 * N] chan int
    for i := range c {c[i] = make(chan int, 4)}
    for i := 1; i < N; i++ {go merger(c[i * 2], c[i * 2 + 1], c[i])}
    go func () {
        for j := 0; j < R; j++ {
            for i := N; i < 2 * N; i++ {c[i] <- rand.Int() % 100; c[i] <- EOS}
        }
    } ()
    //printer(c[1]) // comment out one of these lines
    for k := 0; k < R * (N + 1); k++ {<- c[1]} // drain the full output: R blocks of N values plus EOS
    fmt.Println(time.Since(start))
}
!go run sorter.go
Question: Measure the execution time for various values of N, R, and the capacity of the channels. What do you observe?
Answer.
- When varying N, the times are almost proportional. What is the explanation, given that doubling N increases the depth of the network only by 1?
- When varying R, say from 1000 to 2000, for large N the increase in time is less than proportional. What is the explanation, as one would expect a proportional increase?
- When varying the capacity of the channels, the best times are achieved with a rather small capacity. If it is made too small, down to a capacity of 1, the times increase. If it is made too large, the times also increase significantly. What is the explanation?
Question: Can the channels be made synchronous?
Answer. The network will deadlock with synchronous channels: starting with i = N, two values are sent to the leftmost leaf channel, c[i] <- rand.Int() % 100 and c[i] <- EOS, before anything is sent to c[i + 1]. However, the merger goroutine reads one value from c[i] and then one from c[i + 1] before reading from c[i] again. Hence the sender blocks on c[i] <- EOS while the merger blocks on reading c[i + 1]: a deadlock.
Active Monitors¶
Monitors are resource managers consisting of private variables and public procedures. Monitors can be implemented by processes communicating via messages. This allows for active monitors that can perform operations between requests. For example, equivalent formulations of a server are:
algorithm
    monitor Counter
        var a: integer = 0
        var e: boolean = true
        {e = even(a)}
        procedure inc()
            a := a + 1; e := ¬e
        procedure even() → (r: boolean)
            r := e

algorithm
    var inc: channel[]
    var even: channel[channel[boolean]]
    process counter
        var a: integer = 0
        var e: boolean = true
        {e = even(a)}
        do inc ? → a := a + 1; e := ¬e
        ⫿ even ? r → r ! e
The corresponding clients are:
Counter.inc()
b ← Counter.even()

var res: channel[boolean]
inc !
even ! res
res ? b
Question: Can request or reply channels be buffered?
Answer. Both can be either: the protocol works with synchronous as well as buffered request and reply channels, since the client waits for the reply in any case.
Resource Allocator in Go¶
Suppose a fixed number of resources are to be shared among a larger number of clients who repeatedly need one of the resources, but do not care which one they use (say, frequencies for transmission). Each client cyclically requests a resource from the allocator, uses it, and then releases it. Requesting and releasing is done by sending messages to the allocator process. The request includes the channel on which an available resource is sent back to the client.
%%writefile allocator.go
package main
import ("time"; "math/rand")
func allocator(capacity int, request chan chan int, release chan int) {
    avail := make([]bool, capacity)
    for i := 0; i < capacity; i++ {avail[i] = true}
    next := 0
    for {
        if next < capacity {
            select {
            case reply := <- request: {reply <- next; avail[next] = false}
            case unit := <- release: avail[unit] = true
            }
        } else {
            unit := <- release; avail[unit] = true
        }
        // now comes the computation that takes place between client communication
        for i := 0; i < capacity; i++ {
            if avail[i] {print(" ")} else {print("X")}
        }
        println()
        next = 0; for next < capacity && !avail[next] {next++}
    }
}
func client(i int, request chan chan int, release chan int) {
    reply := make(chan int)
    for {
        request <- reply; unit := <- reply
        time.Sleep(time.Second * time.Duration(rand.Int() % 5)) // sleep between 0 and 4 sec
        release <- unit
    }
}
func main() {
    request, release := make(chan chan int), make(chan int)
    go allocator(5, request, release) // 5 resources
    for i := 0; i < 10; i++ {go client(i, request, release)} // 10 clients
    select {}
}
!go run allocator.go
"Active objects" that communicate via asynchronous messages are also known as actors. They are used by Twitter, the Halo 4 game engine, the Facebook Chat system. Actors are the basis of Microsoft's Orleans framework
Related to actors is asynchrony in Google's Dart language via procedures (called functions) that return immediately without completing.
Timing¶
A timed event, like waiting for 3 seconds, can be signalled over a channel: an auxiliary process (goroutine) waits for 3 seconds and then signals. A timeout when waiting for a desired event is expressed by nondeterministically waiting for the desired event or the timeout event. In the following example, input from a keyboard is sent over a channel; if no input is provided within 3 seconds, a timeout occurs:
%%writefile keyboard.go
package main
import ("time"; "fmt")
func main() {
    abort, enter := make(chan bool), make(chan string)
    go func() {
        var s string
        fmt.Scanln(&s)
        enter <- s
    } ()
    go func() {
        time.Sleep(3 * time.Second)
        abort <- true
    } ()
    fmt.Println("Enter within 3 seconds:")
    select {
    case s := <- enter: fmt.Println("You entered:", s)
    case <- abort: fmt.Println("Timeout")
    }
}
Note: Since the program accepts input from the keyboard, it cannot be run within Jupyter; use go run keyboard.go in a terminal. The example also illustrates textual input/output with the fmt package.
Suppose a worker process has to run a job every second but may fail to finish within a second. A watchdog timer observes if the worker completes each round within the given time. If there is a timeout, the program below simply terminates.
%%writefile timeout.go
package main
import ("time"; "math/rand"; "fmt")
var tick chan bool
var done chan bool
func ticker() {
    for {
        time.Sleep(time.Second)
        tick <- true
        // fmt.Println(time.Now())
    }
}
func worker() {
    for { // "work" randomly 0 to 2 sec
        time.Sleep(time.Second * time.Duration(rand.Int() % 3))
        done <- true
    }
}
func main() {
    rand.Seed(time.Now().UnixNano())
    tick = make(chan bool); go ticker()
    done = make(chan bool); go worker()
    for {
        select {
        case <- done: fmt.Println("done"); <- tick
        case <- tick: fmt.Println("timeout"); return
        }
    }
}
!go run timeout.go
The above implementation of the ticker process exhibits cumulative drift: even if Sleep slept for exactly one second, there is a slight delay between the calls to Sleep, so consecutive ticks will be slightly more than one second apart, as can be observed by printing Now() at every tick. Hence process ticker above cannot be used reliably for a clock. For that, the time between the calls to Sleep has to be measured and the sleeping time reduced by that amount. The time library provides Ticker objects that implement this correction. They are created with NewTicker(duration) and have a field C with a channel that continuously ticks by sending the time of the tick:
%%writefile ticker.go
package main
import ("time"; "math/rand")
var done chan bool
func worker() {
    for { // "work" 0 to 2 sec
        time.Sleep(time.Second * time.Duration(rand.Int() % 3))
        done <- true
    }
}
func main() {
    rand.Seed(time.Now().UnixNano())
    ticker := time.NewTicker(time.Second)
    done = make(chan bool); go worker()
    for {
        select {
        case <- done: println("done"); <- ticker.C
        case <- ticker.C: println("timeout"); return
        }
    }
}
!go run ticker.go