Lecture 27: Concurrency II
1 Purpose
Show a more involved example with MapReduce
2 Outline
One use of concurrency is to enable faster data processing when you have enough hardware (either separate processors on a single machine, or separate physical machines). This has been useful since at least the 1970s, but has become more and more important recently, as physical limitations have made it harder and harder to improve single-threaded performance, and thus speeding up computation at all now relies more on running things in parallel. Another reason for concurrency, even without multiple processors, is that certain operations (like waiting for network requests, reading files, etc.) take a long time, and your program can do other work while it waits for those operations to complete.
Today we’ll focus on parallel data processing. There are now many sophisticated algorithms for approaching this, but one of the simplest, which was nonetheless used for a while by Google to regenerate its web index, is called MapReduce.
The name comes from the list abstraction map, and from reduce, a name for a restricted version of fold where the input and output types are the same.
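To see the distinction concretely, here is a plain sequential sketch (ordinary list functions, no concurrency involved; the helper name reduce is my own):

```racket
;; fold can change the type of the result: here, numbers become a list
(foldr cons empty (list 1 2 3)) ; => (list 1 2 3)

;; reduce keeps the input and output types the same, so it needs no
;; separate base value -- the first element serves as one
(define (reduce f l)
  (foldr f (first l) (rest l)))

(reduce + (list 1 2 3 4)) ; => 10
```

Because the combining function in reduce takes and returns the same type, its applications can be regrouped and run pairwise, which is exactly what MapReduce exploits.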
The idea of the architecture is to separate jobs into map steps and then reduce steps. The map steps can happen entirely in parallel, on different physical machines if desired. Then the results are gradually combined with reduce steps, each of which takes the output of 2 (or more) map steps and combines them. This continues until only a single result is left.
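Sequentially, that gradual pairwise combining can be pictured as rounds, each of which roughly halves the number of values until one remains. A sketch (plain code; the helper names are my own):

```racket
;; one round: combine adjacent pairs of map outputs
(define (combine-round f l)
  (cond [(empty? l) empty]
        [(empty? (rest l)) (list (first l))]
        [else (cons (f (first l) (second l))
                    (combine-round f (rest (rest l))))]))

;; keep running rounds until a single value remains
(define (reduce-rounds f l)
  (if (empty? (rest l))
      (first l)
      (reduce-rounds f (combine-round f l))))

(reduce-rounds + (map add1 (list 1 2 3 4))) ; => 14
```

In the concurrent versions below, the pairs within a round are what could run on different machines.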
Concurrent map
We’ll start by just implementing map using our message passing primitives. First, we’ll do it with just two processes: one will traverse the list, passing data off to the other, which will apply the function and then pass the results back to the first. The first will collect the results and return them. One interesting consequence of the nondeterminism of scheduling: unless we do extra work, the order of elements in the result may not match the order they had in the input! But we won’t worry about this for now.
(define (random-ref l) (list-ref l (random (length l))))
(: par-map (All (X Y) (-> (-> X Y) (List X) (List Y))))
(define (par-map f l)
  (local ((: traverse-start (-> (List String) (Action (List Any))))
          (define (traverse-start others)
            (action empty (map (lambda (x) (send-packet "map" x)) l)))
          (: traverse-receive (-> (List Any) (ReceivePacket Any) (Action (List Any))))
          (define (traverse-receive state pkt)
            (action (cons (receive-packet-msg pkt) state) (list)))
          (: map-start (-> (List String) (Action False)))
          (define (map-start others)
            (action #f (list)))
          (: map-receive (-> False (ReceivePacket Any) (Action False)))
          (define (map-receive state pkt)
            (action #f (list (send-packet "traverse" (f (receive-packet-msg pkt)))))))
    (second (assq "traverse"
                  (start random-ref
                         (list (process (name "traverse")
                                        (on-start traverse-start)
                                        (on-receive traverse-receive))
                               (process (name "map")
                                        (on-start map-start)
                                        (on-receive map-receive))))))))

(par-map add1 (list 1 2 3 4))
'(5 4 3 2)
Exercise: Now that we have separated where the mapping is happening, let’s make there be multiple "map" processes.
What is the challenge?
This is more complicated than just duplicating the process, since, first, processes need unique names; but more importantly, the "traverse" process needs to know where to send the values!
So while we can start by just creating two "map" processes, we still end up sending all the work to one of them:
Note: in this and all subsequent examples, I’ve presented the code in a more compact manner, without local definitions or signatures. Translating them to the above form is a good exercise, if you are looking for more practice!
(define (par-map2 f l)
  (start random-ref
         (list (process (name "traverse")
                        (on-start (lambda (others)
                                    (action empty (map (lambda (x) (send-packet "map1" x)) l))))
                        (on-receive (lambda (state pkt)
                                      (action (cons (receive-packet-msg pkt) state) (list)))))
               (process (name "map1")
                        (on-start (lambda (others) (action #f (list))))
                        (on-receive (lambda (st pkt)
                                      (action #f (list (send-packet "traverse" (f (receive-packet-msg pkt))))))))
               (process (name "map2")
                        (on-start (lambda (others) (action #f (list))))
                        (on-receive (lambda (st pkt)
                                      (action #f (list (send-packet "traverse" (f (receive-packet-msg pkt)))))))))))

(par-map2 add1 (list 1 2 3 4))
'(("traverse" (5 4 3 2)) ("map2" #f) ("map1" #f))
This works, of course, but if we ran this with start-debug, we could see that "map2" never does any work. This isn’t an actual solution. A better option is to have "traverse" alternate sending jobs to "map1" and "map2":
(define (par-map3 f l)
  (local [(define (traverse b l)
            (cond [(empty? l) empty]
                  [(cons? l) (cons (send-packet (if b "map1" "map2") (first l))
                                   (traverse (not b) (rest l)))]))]
    (start-debug random-ref
                 (list (process (name "traverse")
                                (on-start (lambda (others) (action empty (traverse #t l))))
                                (on-receive (lambda (state pkt)
                                              (action (cons (receive-packet-msg pkt) state) (list)))))
                       (process (name "map1")
                                (on-start (lambda (others) (action #f (list))))
                                (on-receive (lambda (st pkt)
                                              (action #f (list (send-packet "traverse" (f (receive-packet-msg pkt))))))))
                       (process (name "map2")
                                (on-start (lambda (others) (action #f (list))))
                                (on-receive (lambda (st pkt)
                                              (action #f (list (send-packet "traverse" (f (receive-packet-msg pkt))))))))))))

(par-map3 add1 (list 1 2 3 4))
;;;; (packet #:from "traverse" #:to "map1" #:msg 1)
;;;; (state #:process "map1" #:value #f)
;;;; (packet #:from "map1" #:to "traverse" #:msg 2)
;;;; (state #:process "traverse" #:value '(2))
;;;; (packet #:from "traverse" #:to "map1" #:msg 3)
;;;; (state #:process "map1" #:value #f)
;;;; (packet #:from "traverse" #:to "map2" #:msg 2)
;;;; (state #:process "map2" #:value #f)
;;;; (packet #:from "map1" #:to "traverse" #:msg 4)
;;;; (state #:process "traverse" #:value '(4 2))
;;;; (packet #:from "map2" #:to "traverse" #:msg 3)
;;;; (state #:process "traverse" #:value '(3 4 2))
;;;; (packet #:from "traverse" #:to "map2" #:msg 4)
;;;; (state #:process "map2" #:value #f)
;;;; (packet #:from "map2" #:to "traverse" #:msg 5)
;;;; (state #:process "traverse" #:value '(5 3 4 2))
'(("traverse" (5 3 4 2)) ("map2" #f) ("map1" #f))
Here we have made a helper function, traverse, that goes through the list and alternates which other process it sends the packet to.
Exercise: further generalize to have more than two "map" processes.
(define (par-map4 f l)
  (local [(define (next-workers ps) (append (rest ps) (list (first ps))))
          (define (traverse ps l)
            (let ([next (next-workers ps)])
              (cond [(empty? l) empty]
                    [(cons? l) (cons (send-packet (first next) (first l))
                                     (traverse next (rest l)))])))
          (define (map-process nm)
            (process (name nm)
                     (on-start (lambda (others) (action #f (list))))
                     (on-receive (lambda (st pkt)
                                   (action #f (list (send-packet "traverse" (f (receive-packet-msg pkt)))))))))]
    (start random-ref
           (list (process (name "traverse")
                          (on-start (lambda (others) (action empty (traverse others l))))
                          (on-receive (lambda (state pkt)
                                        (action (cons (receive-packet-msg pkt) state) (list)))))
                 (map-process "map1")
                 (map-process "map2")
                 (map-process "map3")))))

(par-map4 add1 (list 1 2 3 4))
'(("traverse" (5 2 4 3)) ("map2" #f) ("map1" #f) ("map3" #f))
Here we’ve amended traverse to take a list of process names to work through, and at each step, we rotate the list.
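To see the rotation by itself (this is the same next-workers definition as in par-map4):

```racket
(define (next-workers ps) (append (rest ps) (list (first ps))))

(next-workers (list "map1" "map2" "map3"))
; => (list "map2" "map3" "map1")
```

Each call moves the head of the list to the back, so successive sends cycle through the workers round-robin.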
There is one more version of this we can make: rather than having traverse go through the whole list at the beginning, it can go through only as many elements as there are processes. Then, when it receives a result, it sends another element to the process that completed it. This is a much better approach if certain elements take much longer to process than others.
Exercise: change par-map4 to only send out as many elements as there are processes at first, and change the on-receive handler for traverse to send another element back, provided there are any left to send.
(define-struct pm [orig transformed])
(define (par-map5 f l)
  (local [(define (traverse ms ps l)
            (cond [(empty? l) (action (make-pm l empty) ms)]
                  [(empty? ps) (action (make-pm l empty) ms)]
                  [(cons? l) (traverse (cons (send-packet (first ps) (first l)) ms)
                                       (rest ps)
                                       (rest l))]))
          (define (map-process nm)
            (process (name nm)
                     (on-start (lambda (others) (action #f (list))))
                     (on-receive (lambda (st pkt)
                                   (action #f (list (send-packet "traverse" (f (receive-packet-msg pkt)))))))))]
    (start random-ref
           (list (process (name "traverse")
                          (on-start (lambda (others) (traverse empty others l)))
                          (on-receive (lambda (state pkt)
                                        (let ([new-tr (cons (receive-packet-msg pkt) (pm-transformed state))])
                                          (if (empty? (pm-orig state))
                                              (action (make-pm empty new-tr) (list))
                                              (action (make-pm (rest (pm-orig state)) new-tr)
                                                      (list (send-packet (receive-packet-from pkt)
                                                                         (first (pm-orig state))))))))))
                 (map-process "map1")
                 (map-process "map2")
                 (map-process "map3")))))

(par-map5 add1 (list 1 2 3 4))
(list
(list "traverse" (make-pm '() '(2 4 5 3)))
'("map2" #f)
'("map1" #f)
'("map3" #f))
Moving from parallel map to map reduce
Part of why we weren’t concerned about the order of elements being wrong is that our ultimate goal was not parallel map, but rather, map followed by reduce; i.e., we intend to have a single value at the end, which is made by combining, pairwise, all values that are produced by map.
We can make a first version of this by starting with our last version of par-map and having our traverse process combine values as it receives them:
(define-struct pm [orig transformed])
(define (map-reduce map-fn reduce-fn l)
  (local [(define (traverse ms ps l)
            (cond [(empty? l) (action (make-pm l #f) ms)]
                  [(empty? ps) (action (make-pm l #f) ms)]
                  [(cons? l) (traverse (cons (send-packet (first ps) (first l)) ms)
                                       (rest ps)
                                       (rest l))]))
          (define (map-process nm)
            (process (name nm)
                     (on-start (lambda (others) (action #f (list))))
                     (on-receive (lambda (st pkt)
                                   (action #f (list (send-packet "traverse" (map-fn (receive-packet-msg pkt)))))))))]
    (start random-ref
           (list (process (name "traverse")
                          (on-start (lambda (others) (traverse empty others l)))
                          (on-receive (lambda (state pkt)
                                        (let ([new-tr (if (false? (pm-transformed state))
                                                          (receive-packet-msg pkt)
                                                          (reduce-fn (receive-packet-msg pkt)
                                                                     (pm-transformed state)))])
                                          (if (empty? (pm-orig state))
                                              (action (make-pm empty new-tr) (list))
                                              (action (make-pm (rest (pm-orig state)) new-tr)
                                                      (list (send-packet (receive-packet-from pkt)
                                                                         (first (pm-orig state))))))))))
                 (map-process "map1")
                 (map-process "map2")
                 (map-process "map3")))))

(map-reduce add1 + (list 1 2 3 4))
(list
(list "traverse" (make-pm '() 14))
'("map2" #f)
'("map1" #f)
'("map3" #f))
But this is doing all the reduce work in one process! We want the reducing to be concurrent as well! Let’s first do this by renaming our "map" processes to "worker" processes and having them take, as a message, a Job that tells them to either map or reduce.
(define-struct pm [orig queue])
(define-struct map-job [v])
(define-struct reduce-job [v1 v2])
(define-contract (Job X) (OneOf (MapJob X) (ReduceJob X X)))
(define (map-reduce map-fn reduce-fn l)
  (local [(define (traverse ms ps l)
            (cond [(empty? l) (action (make-pm l empty) ms)]
                  [(empty? ps) (action (make-pm l empty) ms)]
                  [(cons? l) (traverse (cons (send-packet (first ps) (make-map-job (first l))) ms)
                                       (rest ps)
                                       (rest l))]))
          (define (worker-process nm)
            (process (name nm)
                     (on-start (lambda (others) (action #f (list))))
                     (on-receive (lambda (st pkt)
                                   (let ([msg (receive-packet-msg pkt)])
                                     (action #f
                                             (list (send-packet "traverse"
                                                                (cond [(map-job? msg) (map-fn (map-job-v msg))]
                                                                      [(reduce-job? msg)
                                                                       (reduce-fn (reduce-job-v1 msg)
                                                                                  (reduce-job-v2 msg))])))))))))]
    (start random-ref
           (list (process (name "traverse")
                          (on-start (lambda (others) (traverse empty others l)))
                          (on-receive (lambda (state pkt)
                                        (cond [(and (empty? (pm-orig state)) (empty? (pm-queue state)))
                                               (action (make-pm empty (list (receive-packet-msg pkt))) (list))]
                                              [(and (cons? (pm-orig state)) (empty? (pm-queue state)))
                                               (action (make-pm (rest (pm-orig state)) (list (receive-packet-msg pkt)))
                                                       (list (send-packet (receive-packet-from pkt)
                                                                          (make-map-job (first (pm-orig state))))))]
                                              [(and (empty? (pm-orig state)) (cons? (pm-queue state)))
                                               (action (make-pm empty empty)
                                                       (list (send-packet (receive-packet-from pkt)
                                                                          (make-reduce-job (receive-packet-msg pkt)
                                                                                           (first (pm-queue state))))))]
                                              [(and (cons? (pm-orig state)) (cons? (pm-queue state)))
                                               (action (make-pm (rest (pm-orig state)) empty)
                                                       (list (send-packet (receive-packet-from pkt)
                                                                          (make-reduce-job (receive-packet-msg pkt)
                                                                                           (first (pm-queue state))))
                                                             (send-packet (receive-packet-from pkt)
                                                                          (make-map-job (first (pm-orig state))))))]))))
                 (worker-process "worker1")
                 (worker-process "worker2")
                 (worker-process "worker3")))))

(map-reduce add1 + (list 1 2 3 4))
(list
(list "traverse" (make-pm '() '(14)))
'("worker1" #f)
'("worker2" #f)
'("worker3" #f))
So, when we initially traverse the list, and when "traverse" sends additional jobs, these should all be map jobs. This means we need to do some extra work deciding how to send messages from the "traverse" process. One odd thing about this design: in the case that there are still values to send out and we have two values to reduce, we send both jobs back to the same worker.
We could fix this by, instead, not starting to reduce until all the map jobs are finished. There isn’t any fundamental reason we need to wait, but it would mean that we only send one job to each worker at a time, which might be a good design.
(define-struct pm [orig queue])
(define-struct map-job [v])
(define-struct reduce-job [v1 v2])
(define-contract (Job X) (OneOf (MapJob X) (ReduceJob X X)))
(define (map-reduce map-fn reduce-fn l)
  (local [(define (traverse ms ps l)
            (cond [(empty? l) (action (make-pm l empty) ms)]
                  [(empty? ps) (action (make-pm l empty) ms)]
                  [(cons? l) (traverse (cons (send-packet (first ps) (make-map-job (first l))) ms)
                                       (rest ps)
                                       (rest l))]))
          (define (worker-process nm)
            (process (name nm)
                     (on-start (lambda (others) (action #f (list))))
                     (on-receive (lambda (st pkt)
                                   (let ([msg (receive-packet-msg pkt)])
                                     (action #f
                                             (list (send-packet "traverse"
                                                                (cond [(map-job? msg) (map-fn (map-job-v msg))]
                                                                      [(reduce-job? msg)
                                                                       (reduce-fn (reduce-job-v1 msg)
                                                                                  (reduce-job-v2 msg))])))))))))]
    (start random-ref
           (list (process (name "traverse")
                          (on-start (lambda (others) (traverse empty others l)))
                          (on-receive (lambda (state pkt)
                                        (cond [(cons? (pm-orig state))
                                               (action (make-pm (rest (pm-orig state))
                                                                (cons (receive-packet-msg pkt) (pm-queue state)))
                                                       (list (send-packet (receive-packet-from pkt)
                                                                          (make-map-job (first (pm-orig state))))))]
                                              [(empty? (pm-queue state))
                                               (action (make-pm empty (list (receive-packet-msg pkt))) (list))]
                                              [else
                                               (action (make-pm empty (rest (pm-queue state)))
                                                       (list (send-packet (receive-packet-from pkt)
                                                                          (make-reduce-job (receive-packet-msg pkt)
                                                                                           (first (pm-queue state))))))]))))
                 (worker-process "worker1")
                 (worker-process "worker2")
                 (worker-process "worker3")))))

(map-reduce add1 + (list 1 2 3 4))
(list
(list "traverse" (make-pm '() '(14)))
'("worker1" #f)
'("worker2" #f)
'("worker3" #f))
Note this did not change the code in the "worker" process at all – it just changed how the "traverse" process manages the workers. We could change it further, if we wanted, to, e.g., have different pools of workers for map and for reduce (as perhaps these tasks, in reality, have different hardware requirements: it might be that "map" requires lots of memory but not a lot of CPU, while "reduce" is the opposite; running both on the same hardware would be wasteful).
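One way that split could look, sketched in the same style as the code above (the constructor names and the idea of passing each pool only its own function are my own invention; "traverse" would also need to route map jobs to "map…" names and reduce jobs to "reduce…" names):

```racket
;; hypothetical: a separate worker constructor for each pool
(define (map-worker nm map-fn)
  (process (name nm)
           (on-start (lambda (others) (action #f (list))))
           (on-receive (lambda (st pkt)
                         ;; only ever receives map jobs
                         (action #f (list (send-packet "traverse"
                                                       (map-fn (map-job-v (receive-packet-msg pkt))))))))))

(define (reduce-worker nm reduce-fn)
  (process (name nm)
           (on-start (lambda (others) (action #f (list))))
           (on-receive (lambda (st pkt)
                         ;; only ever receives reduce jobs
                         (let ([msg (receive-packet-msg pkt)])
                           (action #f (list (send-packet "traverse"
                                                         (reduce-fn (reduce-job-v1 msg)
                                                                    (reduce-job-v2 msg))))))))))
```

With dedicated pools, each worker no longer needs to dispatch on the kind of Job it receives, and the two pools could be sized (or placed on hardware) independently.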