Lecture 27: Concurrency Practice

1 Purpose

Model a parallel file traversal program that exits early once it finds a match.

2 Outline

Today we are going to define a search procedure that looks for a given string (using string-contains?) in the contents of each file in a list of files. The actual string searches happen in a set of worker processes. These are separate from the coordinating process, which traverses the files and creates a job for each file it encounters.

The coordinating process should send a file only to a worker that is not currently searching through another file. Once a matching file has been found, it should finish with the path to that file and not hand out any more files.

We can express the path to a particular file as a list of strings: the sequence of directories that lead to the file, followed by the file's name. As data definitions and contracts:

(define-struct path [elts])
(define-struct file [path contents])
(define-contract Path (Struct path [(List String)]))
(define-contract File (Struct file [Path String]))
(define FS0 (list (make-file (make-path (list "D" "e")) "yow!")
                  (make-file (make-path (list "D" "f")) "eek")))
(define FS1 (list (make-file (make-path (list "A" "b")) "hello there")
                  (make-file (make-path (list "A" "c")) "goodbye")
                  (make-file (make-path (list "A" "D" "e")) "yow!")
                  (make-file (make-path (list "A" "D" "f")) "eek")))
3 Part 1

First, we define the state for the coordinating process. Unlike with parallel map (see Lecture 26: Concurrency II), we do not have to collect multiple results. Our final state is a single path, or #f if there was no match. Our intermediate state, however, is more complicated, because we want to walk the (List File) only far enough to find the next job for a worker.

The state for our coordinating process can thus be expressed as:

(define-contract CoordState (OneOf Path (List File) False))

That is, there are three possibilities:

- We have found the desired file at path Path and are done.
- We still have files to hand out in (List File).
- We have handed out every file without seeing a match yet, in which case the state is #f.

The next task is to produce an initial set of jobs. That is, given a list of worker names, we want to send each worker one File: a Path to a file together with the String that is its contents. If there are more workers than files, the extra workers get no job. Your function should also return the remaining unvisited files as the state.

Define a function, initial-jobs, with signature (-> (List String) (List File) (Tuple (List (SendPacket File)) (List File))). You may want to define a helper (local or otherwise) with accumulators to do this.

Note: this should be quite similar to some versions of our map reduce program from Lecture 26: Concurrency II.

Solution:
(: initial-jobs (-> (List String) (List File) (Tuple (List (SendPacket File)) (List File))))
(define (initial-jobs ps fs)
  ; ijs pairs workers (ps) with files (fs), accumulating packets in ms,
  ; and stops as soon as either list runs out.
  (local ((define (ijs ms ps fs)
            (cond [(empty? fs) (list ms fs)]
                  [(empty? ps) (list ms fs)]
                  [else (ijs (cons (send-packet (first ps) (first fs)) ms)
                             (rest ps)
                             (rest fs))])))
    (ijs empty ps fs)))
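The following Python model of initial-jobs shows how the accumulator behaves at the edges. It is a sketch, not LSL. The File and SendPacket dataclasses are hypothetical stand-ins for the LSL structs, and a path is represented as a tuple of strings. Like the LSL solution, the model conses each new packet onto the front, so packets come out in reverse worker order.

```python
from dataclasses import dataclass
from typing import List, Tuple

# Hypothetical stand-ins for the LSL structs; a path is a tuple of strings.
@dataclass(frozen=True)
class File:
    path: Tuple[str, ...]
    contents: str

@dataclass(frozen=True)
class SendPacket:
    to: str
    msg: File

def initial_jobs(workers: List[str], files: List[File]) -> Tuple[List[SendPacket], List[File]]:
    """Pair workers with files until either list runs out."""
    packets: List[SendPacket] = []
    while workers and files:
        # Prepend, mirroring (cons ... ms) in the LSL accumulator.
        packets.insert(0, SendPacket(workers[0], files[0]))
        workers, files = workers[1:], files[1:]
    # The unvisited files become the coordinator's state.
    return packets, files

FS1 = [File(("A", "b"), "hello there"), File(("A", "c"), "goodbye"),
       File(("A", "D", "e"), "yow!"), File(("A", "D", "f"), "eek")]

packets, rest = initial_jobs(["worker1", "worker2", "worker3"], FS1)
print([p.to for p in packets])  # ['worker3', 'worker2', 'worker1']
print(rest)                     # [File(path=('A', 'D', 'f'), contents='eek')]
```

With only one file and three workers, only worker1 gets a job and the remaining state is the empty list.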

4 Part 2

At this point, we can define coord-start. It takes a (List File) and returns a function from (List String) to (Action CoordState), so its signature is (-> (List File) (-> (List String) (Action CoordState))). The returned function produces the initial state and the initial set of messages for the coordinating process.

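(: coord-start (-> (List File) (-> (List String) (Action CoordState))))
(define (coord-start fs)
  (lambda (others)
    (if (empty? fs)
        ; No files at all: nothing to search, so finish with no match.
        (action #f (list))
        ; Otherwise hand one file to each worker and keep the rest as state.
        (let ((js (initial-jobs others fs)))
          (action (second js) (first js))))))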
5 Part 3

Now we want to define coord-receive. Our worker processes will always send a message when they finish processing a file, and that message will be a (Maybe Path):

- If it is a Path, there was a match. We should stop traversing and keep that Path as our final state.
- If it is #f, there was no match, and we should give that worker another file to look at.
- If there are no more files to hand out, our state becomes #f.

This is somewhat tricky, because we need to consider every state the coordinating process can be in and how it should respond to each possible message. The easiest way to do this is to enumerate all six combinations (three kinds of state, two kinds of message). If we later want to compress the code, we can. For example, the second and third cases are identical, as are the first, fourth, and sixth. It is easier to get the code correct this way first.

Solution:
(: coord-receive (-> CoordState (ReceivePacket (Maybe Path)) (Action CoordState)))
(define (coord-receive st pkt)
  (cond [(and (path? st) (path? (receive-packet-msg pkt)))
         ; Already found a match, disregard new one.
         (action st (list))]
        [(and (list? st) (path? (receive-packet-msg pkt)))
         ; Found a match, stop processing.
         (action (receive-packet-msg pkt) (list))]
        [(and (equal? st #f) (path? (receive-packet-msg pkt)))
         ; Was out of files, but found a match
         (action (receive-packet-msg pkt) (list))]
        [(and (path? st) (equal? (receive-packet-msg pkt) #f))
         ; Already found a match; ignore this no-match message.
         (action st (list))]
        [(and (list? st) (equal? (receive-packet-msg pkt) #f))
         ; Didn't find a match, action depends on whether files left
         (if (cons? st)
           ; More files to search, send one back
           (action (rest st) (list (send-packet (receive-packet-from pkt)
                                                (first st))))
           ; No more files, so state is #f
           (action #f (list)))]
        [(and (equal? st #f) (equal? (receive-packet-msg pkt) #f))
         ; No more files and no match
         (action st (list))]))
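The following Python model checks the six-case analysis with a compressed version of coord-receive. It is a sketch only. Path, File, and SendPacket are hypothetical stand-ins for the LSL structs, and None plays the role of #f both as a message and as a state. The function returns the new state together with the packets to send. It merges the cases that behave identically: the two where a first match arrives, and the three where the coordinator keeps its state and sends nothing.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple, Union

@dataclass(frozen=True)
class Path:
    elts: Tuple[str, ...]

@dataclass(frozen=True)
class File:
    path: Path
    contents: str

@dataclass(frozen=True)
class SendPacket:
    to: str
    msg: File

# A coordinator state is a Path (match found), a list of files still to
# hand out, or None (every file handed out, no match seen yet).
CoordState = Union[Path, List[File], None]

def coord_receive(st: CoordState, sender: str,
                  msg: Optional[Path]) -> Tuple[CoordState, List[SendPacket]]:
    if isinstance(msg, Path):
        # A worker found a match: keep an earlier match if we have one,
        # otherwise this becomes the final answer. No new jobs either way.
        return (st if isinstance(st, Path) else msg), []
    if isinstance(st, list):
        # No match, and we are still handing out files.
        if st:
            return st[1:], [SendPacket(sender, st[0])]
        return None, []
    # No match, and we are already finished (matched, or out of files).
    return st, []
```

Keeping the first match rather than the latest is a design choice. Either satisfies the specification, since any Path the coordinator keeps names a matching file.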

6 Part 4

Now we need to define the state for our worker process. Thinking about the task, we realize that the worker doesn’t need to remember anything: each job that it receives has the contents to search, and after searching, it can send back to the coordinating process its findings. So the state for the worker can be:

(define-contract WorkerState False)

This means our first task is to define worker-start. The worker shouldn’t send any messages initially, so this is quite easy.

Solution:
(: worker-start (-> (List String) (Action False)))
(define (worker-start others)
  (action #f (list)))

7 Part 5

Next, let’s define worker-receive. Define it as a function that takes the string to search for and returns a function that follows the receive protocol. That is, it should have signature (-> String (-> WorkerState (ReceivePacket File) (Action WorkerState))).

The worker expects to receive messages from the coordinating process, formatted as in Part 1 (each message is a File). It should search the file contents with string-contains?, then send back the file's Path if there was a match, or #f if there was not.

(: worker-receive (-> String (-> WorkerState (ReceivePacket File) (Action WorkerState))))
(define (worker-receive str)
  (lambda (st pkt)
    (action #f
            (list (send-packet
                   (receive-packet-from pkt)
                   (if (string-contains? (file-contents (receive-packet-msg pkt)) str)
                       (file-path (receive-packet-msg pkt))
                       #f))))))
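The worker's behaviour is small enough to model as a pure function. The following Python sketch uses hypothetical names; Python's `in` operator stands in for string-contains?, and None stands in for #f. The worker keeps no state and always replies to whoever sent the job.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional, Tuple

@dataclass(frozen=True)
class File:
    path: Tuple[str, ...]   # directories followed by the file name
    contents: str

Reply = Tuple[str, Optional[Tuple[str, ...]]]  # (recipient, path or None)

def worker_receive(needle: str) -> Callable[..., Tuple[None, List[Reply]]]:
    def receive(state: None, sender: str, job: File) -> Tuple[None, List[Reply]]:
        # Search this file only; the answer goes back to the sender (the coordinator).
        found = job.path if needle in job.contents else None
        return None, [(sender, found)]
    return receive

receive = worker_receive("hello")
print(receive(None, "coord", File(("A", "b"), "hello there")))
# (None, [('coord', ('A', 'b'))])
```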
8 Part 6

Now, put all of these together in a function find-file-with with signature (-> (List File) String (Maybe Path)).

It should define, locally, a "coord" process and at least three worker processes (name them "worker1", "worker2", "worker3").

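Then invoke start, using any scheduler you want (e.g., random-ref from Lecture 26: Concurrency II), and return the final state of the "coord" process as the result. The solution below uses start-debug with first as the scheduler. The ;;;; lines after each check-expect show the packets and states from such a run.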

(: find-file-with (-> (List File) String (Maybe Path)))
(define (find-file-with fs str)
  (second
    (assoc "coord"
           (start-debug first (list (process (name "coord")
                                             (on-start (coord-start fs))
                                             (on-receive coord-receive))
                                    (process (name "worker1")
                                             (on-start worker-start)
                                             (on-receive (worker-receive str)))
                                    (process (name "worker2")
                                             (on-start worker-start)
                                             (on-receive (worker-receive str)))
                                    (process (name "worker3")
                                             (on-start worker-start)
                                             (on-receive (worker-receive str))))))))
(check-expect (find-file-with FS1 "hello") (make-path '("A" "b")))

;;;; (packet #:from "coord" #:to "worker1" #:msg (make-file (make-path '("A" "b")) "hello there"))

;;;; (state #:process "worker1" #:value #f)

;;;; (packet #:from "worker1" #:to "coord" #:msg (make-path '("A" "b")))

;;;; (state #:process "coord" #:value (make-path '("A" "b")))

;;;; (packet #:from "coord" #:to "worker2" #:msg (make-file (make-path '("A" "c")) "goodbye"))

;;;; (state #:process "worker2" #:value #f)

;;;; (packet #:from "worker2" #:to "coord" #:msg #f)

;;;; (state #:process "coord" #:value (make-path '("A" "b")))

;;;; (packet #:from "coord" #:to "worker3" #:msg (make-file (make-path '("A" "D" "e")) "yow!"))

;;;; (state #:process "worker3" #:value #f)

;;;; (packet #:from "worker3" #:to "coord" #:msg #f)

;;;; (state #:process "coord" #:value (make-path '("A" "b")))

(check-expect (find-file-with FS1 "blah") #f)

;;;; (packet #:from "coord" #:to "worker1" #:msg (make-file (make-path '("A" "b")) "hello there"))

;;;; (state #:process "worker1" #:value #f)

;;;; (packet #:from "worker1" #:to "coord" #:msg #f)

;;;; (state #:process "coord" #:value '())

;;;; (packet #:from "coord" #:to "worker1" #:msg (make-file (make-path '("A" "D" "f")) "eek"))

;;;; (state #:process "worker1" #:value #f)

;;;; (packet #:from "worker1" #:to "coord" #:msg #f)

;;;; (state #:process "coord" #:value #f)

;;;; (packet #:from "coord" #:to "worker2" #:msg (make-file (make-path '("A" "c")) "goodbye"))

;;;; (state #:process "worker2" #:value #f)

;;;; (packet #:from "worker2" #:to "coord" #:msg #f)

;;;; (state #:process "coord" #:value #f)

;;;; (packet #:from "coord" #:to "worker3" #:msg (make-file (make-path '("A" "D" "e")) "yow!"))

;;;; (state #:process "worker3" #:value #f)

;;;; (packet #:from "worker3" #:to "coord" #:msg #f)

;;;; (state #:process "coord" #:value #f)
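The answer should not depend on which scheduler start uses. One way to check this is to run the whole protocol many times with a randomized scheduler. The following Python model is a sketch under simplifying assumptions and is not LSL's start. All processes share one list of in-flight packets, and the hypothetical `pick` argument acts as the scheduler by choosing which packet to deliver next, whereas LSL's schedulers choose among processes. Paths are tuples, and None stands for #f.

```python
import random
from dataclasses import dataclass
from typing import Callable, List, Optional, Tuple

@dataclass(frozen=True)
class File:
    path: Tuple[str, ...]   # directories followed by the file name
    contents: str

Packet = Tuple[str, str, object]  # (sender, receiver, message)

def find_file_with(files: List[File], needle: str,
                   pick: Callable[[List[Packet]], int] = lambda pending: 0,
                   n_workers: int = 3) -> Optional[Tuple[str, ...]]:
    workers = [f"worker{i}" for i in range(1, n_workers + 1)]
    # Initial jobs: one file per worker while both lists last.
    pending: List[Packet] = [("coord", w, f) for w, f in zip(workers, files)]
    # Coordinator state: a path (match found), a list of files still to hand
    # out, or None (out of files). With no files at all we start at None.
    state: object = files[len(pending):] if files else None
    while pending:
        sender, receiver, msg = pending.pop(pick(pending))
        if receiver != "coord":
            # Worker: search this one file and reply to the coordinator.
            found = msg.path if needle in msg.contents else None
            pending.append((receiver, "coord", found))
        elif msg is not None:
            # A match: keep the first one the coordinator hears about.
            if not isinstance(state, tuple):
                state = msg
        elif isinstance(state, list):
            # No match: hand the idle worker the next file, if any remain.
            if state:
                pending.append(("coord", sender, state[0]))
                state = state[1:]
            else:
                state = None
    return state

FS1 = [File(("A", "b"), "hello there"), File(("A", "c"), "goodbye"),
       File(("A", "D", "e"), "yow!"), File(("A", "D", "f"), "eek")]

rng = random.Random(0)

def random_pick(pending: List[Packet]) -> int:
    # Deliver a uniformly random in-flight packet next.
    return rng.randrange(len(pending))

print(find_file_with(FS1, "hello"))  # ('A', 'b')
print(find_file_with(FS1, "blah"))   # None
print(all(find_file_with(FS1, "eek", pick=random_pick) == ("A", "D", "f")
          for _ in range(100)))      # True
```

A needle such as "e" occurs in several files, so the returned path may differ between randomized runs. Any of those paths is a correct answer.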