Skip to content

Lecture 8.3: Concurrency

This week, we have seen two examples of state machines: a model of a vending machine, and the control logic for a hardware security module. For both of these examples, their real-life implementations likely are implemented by a state machine very close to what we presented --- the only difference being the programming language.

However, there's another reason to use a state machine: they can serve as models of distributed systems, where multiple computers are jointly working together to accomplish a task. Distributed systems are ubiquitous in computing --- including distributed data stores like Cassandra and MongoDB that are used for the backends of applications; protocols for the internet like DNS and NTP; and coordination protocols for supercomputers, like Jupiter. The tricky part of all of these systems is that they consist of multiple, independent computing nodes that run independent programs, but must coordinate with one another. How do we talk about a distributed system's correctness, when it doesn't even fit within a single program?

A common approach, taken by tools like TLA+ and SPIN is to model the entire system as a single state machine. In effect, we pretend for the purpose of modeling that the entire system is running on a single machine. We do so by having the state of the state machine be a global state, which contains:

  • the state of each individual machine; and
  • the state of all messages "in flight" between the nodes.

A simple example

Let's say we have two nodes sending messages back and forth repeatedly. Each time a node receives a message i, it will send back i + 1 to the other one.

First, let's define a type of buffers that models the ability for nodes to send and receive messages via queue :

module Buffer = struct
    type t = int list
    let empty = []
    let push x xs = xs @ [x]
    let pop xs = 
      match xs with 
      | x :: xs' -> Some (x, xs')
      | [] -> None
end

Now, we can define the inputs and outputs of our system. The inputs will consist of queries for each node to run:

type sys_input = | StepA | StepB
while the outputs will have the nodes report what it sent:
type sys_output = | SendA of int | SendB of int | NoOutput
We also allow the possibility of no output being generated.

Now, we can define the system's state:

type sys_state = {
    nodeA_to_B : Buffer.t;
    nodeB_to_A : Buffer.t;
}

Now --- and this is the new part --- we define helper functions that describe how each node should step. Node A steps by pulling a message of its queue from B, adding one to the value, and sending a message back to B. Additionally, it will create an external output. If nothing is in its input buffer, it does nothing.

let step_nodeA (s : sys_state) : sys_state * sys_output = 
    match Buffer.pop s.nodeB_to_A with
    | None -> (s, NoOutput)
    | Some (v, rest) -> 
        let v' = v + 1 in 
        let s' = {nodeB_to_A = rest; nodeA_to_B = Buffer.push v' s.nodeA_to_B } in 
        (s', SendA v')

Node B's code is symmetric.

let step_nodeB (s : sys_state) : sys_state * sys_output = 
    match Buffer.pop s.nodeA_to_B with
    | None -> (s, NoOutput)
    | Some (v, rest) -> 
        let v' = v + 1 in 
        let s' = {nodeA_to_B = rest; nodeB_to_A = Buffer.push v' s.nodeB_to_A } in 
        (s', SendB v')

Now, we can create the whole state machine by taking in the StepA / StepB commands and stepping each one accordingly:

let sys_step (s : sys_state) (i : sys_input) : sys_state * sys_output = 
    match i with
    | StepA -> step_nodeA s
    | StepB -> step_nodeB s

We can then run the system by having an initial state:

let init_sys = {
    nodeA_to_B = Buffer.empty;
    nodeB_to_A = Buffer.push 0 (Buffer.empty);
}
We begin with an initial state that has a message from B to A.

We can then use utop to see the state and outputs evolve:

utop # let s = init_sys;;
val s : sys_state = {nodeA_to_B = []; nodeB_to_A = [0]}

utop # let (s, o) = sys_step s StepA;;
val s : sys_state = {nodeA_to_B = [1]; nodeB_to_A = []}
val o : sys_output = SendA 1

utop # let (s, o) = sys_step s StepB;;
val s : sys_state = {nodeA_to_B = []; nodeB_to_A = [2]}
val o : sys_output = SendB 2

utop # let (s, o) = sys_step s StepA;;
val s : sys_state = {nodeA_to_B = [3]; nodeB_to_A = []}
val o : sys_output = SendA 3

utop # let (s, o) = sys_step s StepB;;
val s : sys_state = {nodeA_to_B = []; nodeB_to_A = [4]}
val o : sys_output = SendB 4

Scaling the Example Up

Let's make the example a bit more realistic. A big use case of distributed systems, in tools like Apache Kafka, is to support pipelines (or streams) of programming tasks. The idea here is that we have multiple complicated functions we want to compute, in sequence:

type t
val f : t -> t
val g : t -> t
val h : t -> t

Given an x, we want to compute h (g (f x)). However, doing this all on one computer is too much: for example, the functions might be very expensive to compute. Instead, we can pipeline it like an assembly line: node A gets inputs x, and turns them into f x; then, node B gets inputs y and turns then into g y; finally, node C gets inputs z and turns them into h z. In total, we are computing h (g (f x)), but in such a way that node A can start computing f x', on a new input x', as soon as it finishes f x; it doesn't need to wait for node C to compute h z.

To specify the correctness of this system, we want to say that if the last node outputs a value v, that value must be equal to h (g (f x)), where x was a prior input to node A. Because of this, we are going to associate each value with a tag, which uniquely identifies each value. Additionally, we will modify our buffer to hold tagged values. Tags will be created uniquely by the first node whenever it gets a value.

type value = int
type tag = string
module Buffer = struct
    type t = (tag * value) list
    let empty = []
    let push x xs = xs @ [x]
    let pop xs = 
      match xs with 
      | x :: xs' -> Some (x, xs')
      | [] -> None
end

Now, we are going to design our pipelined system. For simplicity, we will only have two nodes -- node A computing f and node B computing g -- just like in our first example. Our pipelined system will have four kinds of inputs:

type pipeline_input = | Push of value | StepA | StepB | Pop
In addition to StepA and StepB, we can also "push" new inputs values onto node A, and "pop" outputs from node B.

Our outputs will be of three kinds:

type pipeline_output = 
    | NoOutput
    | PushedValue of tag * value
    | PoppedValue of tag * value
Here, NoOutput stands for the system not having an output; while node A will report PushedValue (t, v) whenever we pushed value v, which caused a fresh tag t to be generated; while node B will report PoppedValue (t, v) when value v is popped off of the pipeline with tag t. StepA and StepB will generate NoOutput, while Push will cause PushedValue to be generated, and Pop will cause PoppedValue to be generated (when there is an output ready).

Specifying a valid trace

We can now specify a valid trace. We want a correctness property: if we received value (t, v) popped off the pipeline, then we must have pushed a v' onto the pipeline, with the same generated tag, and v = g (f v).

let valid_trace (tr : (pipeline_input * pipeline_output) list)  = 
      (* if Output (t, v) is in trace, then 
          exists v', ReceivedInput (t, v') in trace and 
            v = g (f v')
      *)

      List.for_all (fun (_, o) ->
        match o with 
        | PoppedValue (t, v) -> 
            List.exists (fun (_, o') -> 
              match o' with 
              | PushedValue (t', v') -> 
                  t = t' && v = (g (f v'))
              | _ -> false
            ) tr
        | _ -> true
      ) tr

Implementing the System

We let the state of our state machine be three buffers: one for input to node A; one from node A to node B; and one for node B to output on. We additionally have some state, held by node A, to represent a counter for generating fresh tags.

(* Global state of all nodes *)
type pipeline_state = {
    ctr : int; (* Node A's state for counter, for tagging *)
    start_to_nodeA : Buffer.t; (* Buffer for start -> node A *)
    nodeA_to_nodeB : Buffer.t; (* Buffer for node A -> node B *)
    nodeB_to_end : Buffer.t (* Buffer for node B -> end *)
}

Let's now define the expensive computations that each node should run.

(* Computation functions to be run by nodeA and nodeB, respectively. *)
(* These stand in for expensive computations that should be run in parallel by multiple nodes. *)
let f v = v * 2
let g v = v + 2

Now, we define the functions for nodes A and B to execute whenever we query them to step:

let step_nodeA (s : pipeline_state) : pipeline_state = 
    match Buffer.pop s.start_to_nodeA with 
    | None -> s
    | Some ((t, v), rest) -> 
        let v' = f v in 
        {s with start_to_nodeA = rest; nodeA_to_nodeB = Buffer.push (t, v') s.nodeA_to_nodeB}

let step_nodeB (s : pipeline_state) : pipeline_state = 
    match Buffer.pop s.nodeA_to_nodeB with 
    | None -> s
    | Some ((t, v), rest) -> 
        let v' = g v in 
        {s with nodeA_to_nodeB = rest; nodeB_to_end = Buffer.push (t, v') s.nodeB_to_end}
Node A operates by reading a message from start_to_nodeA, computing f of that message, and sending the result to nodeA_to_nodeB. Node B is similar, but going from nodeA_to_nodeB to nodeB_to_end.

We then define two other helper functions for pushing and popping from the pipeline.

let step_push (s : pipeline_state) (x : value) : pipeline_state * tag = 
    let tag = string_of_int (s.ctr) in 
    let s' = {s with ctr = s.ctr + 1; start_to_nodeA = Buffer.push (tag, x) s.start_to_nodeA} in 
    (s', tag)

let step_pop (s : pipeline_state) : pipeline_state * (tag * value) option = 
    match Buffer.pop s.nodeB_to_end with 
    | None -> (s, None)
    | Some ((t, v), rest) -> 
        let s' = {s with nodeB_to_end = rest} in 
        (s', Some (t, v))
Here, step_push, given a state and a value to push, returns a new state and the freshly generated tag for that value. On the other side, step_pop tries to pop from nodeB_to_end; if it succeeds, we return it.

Finally, we weave these step functions together into a global state machine:

let pipeline_step (s : pipeline_state) (i : pipeline_input) : pipeline_state * pipeline_output = 
    match i with 
    | Push x -> 
        let (s', tag) = step_push s x in 
        (s', PushedValue (tag, x))
    | StepA -> let s' = step_nodeA s in (s', NoOutput)
    | StepB -> let s' = step_nodeB s in (s', NoOutput)
    | Pop -> 
        let (s', res) = step_pop s in 
        match res with 
        | None -> (s', NoOutput)
        | Some (t, v) -> (s', PoppedValue (t, v))