Lecture 28: Concurrency Lab
1 Purpose
Model a parallel file traversal program that exits early once it finds a match.
2 Outline
This class will be structured as a lab, just like Lecture 18: Programming Against Interfaces.
Consider the following tree that represents a file system:
(define-struct file (name contents))
(define-struct directory (name elements))
(define-contract FileSystem
  (OneOf (File String String)
         (Directory String (List FileSystem))))
(define FS0
  (make-directory "D" (list (make-file "e" "yow!")
                            (make-file "f" "eek"))))
(define FS1
  (make-directory "A" (list (make-file "b" "hello there")
                            (make-file "c" "goodbye")
                            FS0
                            (make-directory "G" (list)))))
Your task is to define a search procedure that looks for a given string in the contents of every file (using string-contains?). The string searches themselves should run in a set of worker processes, separate from the coordinating process, which traverses the tree and creates a job for each file it encounters.
The coordinating process should send a file only to a worker that is not currently searching another file. Once a matching file has been found, it should stop handing out files and finish with the path to that file.
We can express the path to a particular file as a list of strings: the sequence of directories that lead to the file, followed by the file's name. As a contract:
(define-struct path [elts])
(define-contract Path~ (Path (List String)))
3 Task 1
First, we define the state for the coordinating process. Unlike with parallel map (see Lecture 27: Concurrency II), we do not have to collect multiple results: our final state is a single path, or #f if there was no match. Our intermediate state, however, is more complicated, since we only need to traverse the FileSystem far enough to find the next job for a worker.
To handle this, we first flatten the FileSystem into a list of Path~s paired with file contents.
This way, we can represent the files still to be processed as a (List (File Path~ String)), reusing the File struct with a Path~ in its name field.
So your first task is to define a function, flatten-fs, that has signature (-> FileSystem (List (File Path~ String))).
(: flatten-fs (-> FileSystem (List (File Path~ String))))
(define (flatten-fs fs)
  (cond
    ;; a file on its own: its path is just its name
    [(file? fs) (list (make-file (make-path (list (file-name fs)))
                                 (file-contents fs)))]
    ;; a directory: flatten each element, then put this directory's
    ;; name at the front of every resulting path
    [(directory? fs)
     (apply append
            (map (lambda (de)
                   (map (lambda (x)
                          (make-file (make-path (cons (directory-name fs)
                                                      (path-elts (file-name x))))
                                     (file-contents x)))
                        (flatten-fs de)))
                 (directory-elements fs)))]))
(check-expect (flatten-fs FS0)
              (list (make-file (make-path (list "D" "e")) "yow!")
                    (make-file (make-path (list "D" "f")) "eek")))
(check-expect (flatten-fs FS1)
              (list (make-file (make-path (list "A" "b")) "hello there")
                    (make-file (make-path (list "A" "c")) "goodbye")
                    (make-file (make-path (list "A" "D" "e")) "yow!")
                    (make-file (make-path (list "A" "D" "f")) "eek")))
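One alternative design, sketched below in plain Racket rather than LSL, passes the directory prefix down as an accumulator instead of consing each directory name onto every path on the way back up. Because it is plain Racket, contracts and check-expect are left out, and the structs are declared #:transparent so that equal? works on them. With this approach, each file's path is built once, when the file is reached, rather than rebuilt at every enclosing directory. The name flatten-fs/acc is hypothetical.

```racket
#lang racket
;; Plain-Racket sketch (no LSL contracts); structs are transparent so that
;; equal? compares them field by field.
(define-struct file (name contents) #:transparent)
(define-struct directory (name elements) #:transparent)
(define-struct path (elts) #:transparent)

(define FS0 (make-directory "D" (list (make-file "e" "yow!") (make-file "f" "eek"))))
(define FS1 (make-directory "A" (list (make-file "b" "hello there")
                                      (make-file "c" "goodbye")
                                      FS0
                                      (make-directory "G" (list)))))

;; flatten-fs/acc : FileSystem -> (Listof file), each file named by its path
(define (flatten-fs/acc fs)
  ;; prefix: the directory names from the root down to (but not including) fs
  (define (walk fs prefix)
    (cond
      [(file? fs)
       ;; build this file's full path exactly once
       (list (make-file (make-path (append prefix (list (file-name fs))))
                        (file-contents fs)))]
      [(directory? fs)
       ;; extend the prefix, then flatten the elements left to right
       (let ([new-prefix (append prefix (list (directory-name fs)))])
         (append-map (lambda (e) (walk e new-prefix))
                     (directory-elements fs)))]))
  (walk fs '()))

(flatten-fs/acc FS1)
```

On FS1 it should produce the same four entries, in the same order, as the check-expect for flatten-fs. The empty directory G contributes nothing.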
4 Task 2
The state for our coordinating process can now be expressed as:
(define-contract CoordState (OneOf Path~ (List (File Path~ String)) False))
That is, there are three possibilities:
- We have found the desired file, at path Path~.
- We still have files to look at, in (List (File Path~ String)).
- We have searched everything without finding a match, so the state is False.
The next task is to produce an initial set of jobs. That is, given a list of workers, we want to give each one a Path~ to a file together with the String that is its contents. Your function should also return the remaining files that were not handed out.
Define a function, initial-jobs, with signature (-> (List String) (List (File Path~ String)) (Tuple (List (SendPacket (File Path~ String))) (List (File Path~ String)))). You may want to define a helper (local or otherwise) with accumulators to do this.
Note: this should have a lot of similarity to some versions of your map reduce program from Lecture 27: Concurrency II.
(: initial-jobs (-> (List String) (List (File Path~ String)) (Tuple (List (SendPacket (File Path~ String))) (List (File Path~ String)))))
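(define (initial-jobs ps fs)
  (local (;; ijs: ms accumulates the packets built so far, most recent first;
          ;; it stops as soon as either the workers (ps) or the files (fs) run out
          (define (ijs ms ps fs)
            (cond
              [(empty? fs) (list ms fs)]
              [(empty? ps) (list ms fs)]
              [else (ijs (cons (send-packet (first ps) (first fs)) ms)
                         (rest ps)
                         (rest fs))])))
    (ijs empty ps fs)))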
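The accumulator in initial-jobs conses each packet onto ms, so the packets come back in reverse worker order, with the last worker's packet first. The plain-Racket sketch below computes the same split with take and split-at and keeps the packets in worker order. It makes two simplifying assumptions: a packet is modeled as a (cons worker job) pair rather than LSL's SendPacket, and jobs are plain strings rather than the lab's (File Path~ String). The name initial-jobs/split is hypothetical.

```racket
#lang racket
;; Plain-Racket sketch of what initial-jobs computes. A packet is modeled as a
;; (cons worker job) pair (a hypothetical stand-in for LSL's send-packet), and
;; jobs are plain strings here instead of (File Path~ String) values.

;; initial-jobs/split : (Listof String) (Listof X)
;;                      -> (list (Listof (Pairof String X)) (Listof X))
(define (initial-jobs/split workers jobs)
  ;; one job per worker, but never more jobs than exist
  (define k (min (length workers) (length jobs)))
  (define-values (assigned remaining) (split-at jobs k))
  (list (map cons (take workers k) assigned)   ; packets, in worker order
        remaining))                            ; jobs still waiting for a worker

(initial-jobs/split '("worker1" "worker2" "worker3") '("b" "c" "e" "f"))
```

With three workers and four jobs it returns three packets (worker1 gets "b", worker2 gets "c", worker3 gets "e") and leaves ("f") unassigned. The set of packets matches what initial-jobs produces for the same inputs; only their order in the list differs.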
5 Task 3
At this point, we can define a function, coord-start, that takes a FileSystem and returns a function from (List String) to (Action CoordState), i.e., one with signature (-> FileSystem (-> (List String) (Action CoordState))). The returned function produces the initial state and the initial messages of the coordinating process.
(: coord-start (-> FileSystem (-> (List String) (Action CoordState))))
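(define (coord-start fs)
  (lambda (others)
    (let ((files (flatten-fs fs)))
      (if (empty? files)
          ;; no files at all: no worker will ever reply, so the answer is #f now
          (action #f (list))
          ;; js is (list initial-packets remaining-files)
          (let ((js (initial-jobs others files)))
            (action (second js) (first js)))))))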
6 Task 4
Now we want to define coord-receive. Our worker processes always send a message when they finish processing a file, and that message is a (Maybe Path~):
- If it is a Path~, there was a match. We should stop handing out files and make that Path~ our state.
- If it is #f, there was no match, and we should give that worker another file to look at. If there are no more files to search, our final state is #f.

Because the message is checked for a Path~ before the state is examined, a match reported by a worker that was still searching when the state became #f still becomes the final state.
(: coord-receive (-> CoordState (ReceivePacket (Maybe Path~)) (Action CoordState)))
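(define (coord-receive st pkt)
  (cond
    ;; a worker found a match: record its path and send nothing more
    [(path? (receive-packet-msg pkt)) (action (receive-packet-msg pkt) (list))]
    ;; no match, and no files left to hand out: the answer is #f
    [(empty? st) (action #f (list))]
    ;; no match, files remain: give the next one to the worker that just replied
    [(cons? st) (action (rest st)
                        (list (send-packet (receive-packet-from pkt) (first st))))]
    ;; already finished (a Path~ or #f): ignore late replies
    [else (action st (list))]))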
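The order of the clauses in coord-receive matters. The plain-Racket model below keeps only its decision logic. The names coord-step and replay, the (list state packets) encoding of an action, the (cons recipient job) encoding of a packet, and the use of strings as jobs are all hypothetical stand-ins for the LSL versions. Replaying two replies shows the effect of the clause order. Once every file has been handed out, a single #f already moves the state to #f. A match reported afterwards still becomes the final state, because the path? test comes first.

```racket
#lang racket
;; Plain-Racket model of coord-receive's decision logic only. An action is
;; modeled as (list new-state packets) and a packet as (cons recipient job);
;; both encodings are hypothetical stand-ins for LSL's action and send-packet.
(define-struct path (elts) #:transparent)

;; coord-step : CoordState String (U path #f) -> (list CoordState (Listof packet))
(define (coord-step st from msg)
  (cond
    [(path? msg) (list msg '())]                      ; a match always wins
    [(empty? st) (list #f '())]                       ; nothing left to hand out
    [(cons? st)                                       ; next job to the sender
     (list (rest st) (list (cons from (first st))))]
    [else (list st '())]))                            ; already done: ignore

;; replay : CoordState (Listof (Pairof String (U path #f))) -> CoordState
;; Feeds worker replies to the coordinator in order; returns the final state.
(define (replay st replies)
  (for/fold ([st st]) ([r (in-list replies)])
    (first (coord-step st (car r) (cdr r)))))

;; Every file already handed out ('()): worker1 reports no match, then
;; worker3 reports a match for the file it was still searching.
(replay '() (list (cons "worker1" #f)
                  (cons "worker3" (make-path '("A" "D" "e")))))
```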
7 Task 5
Now we need to define the state for our worker process. Thinking about the task, we realize that the worker doesn’t need to remember anything: each job it receives includes the contents to search, and after searching, it can send its findings back to the coordinating process. So the state for the worker can be:
(define-contract WorkerState False)
Our first task for the worker, then, is to define worker-start. The worker shouldn’t send any messages initially, so this is quite easy.
(: worker-start (-> (List String) (Action WorkerState)))
(define (worker-start others) (action #f (list)))
8 Task 6
Next, let’s define worker-receive. Define it as a function that takes the string to search for and returns a function that conforms to the receive protocol, i.e., one with signature (-> String (-> WorkerState (ReceivePacket (File Path~ String)) (Action WorkerState))).
The worker expects to receive messages from the coordinating process formatted as in Task 2, i.e., as a (File Path~ String). It should search the file contents with string-contains?, then send a message back with the Path~ if there was a match, or with #f if there was not.
(: worker-receive (-> String (-> WorkerState (ReceivePacket (File Path~ String)) (Action WorkerState))))
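(define (worker-receive str)
  (lambda (st pkt)
    ;; reply to the sender (the coordinator) with the file's Path~ if its
    ;; contents contain str, and #f otherwise; the state stays #f
    (action #f
            (list (send-packet
                   (receive-packet-from pkt)
                   (if (string-contains? (file-contents (receive-packet-msg pkt)) str)
                       (file-name (receive-packet-msg pkt))
                       #f))))))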
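Racket's string-contains? takes the string being searched first and the string to look for second, which is why worker-receive passes the file contents before str. Here is a minimal plain-Racket sketch of just the worker's reply. Packets are left out, and the name worker-reply is hypothetical.

```racket
#lang racket
;; Plain-Racket sketch of the worker's reply, outside LSL (no packets).
(define-struct file (name contents) #:transparent)
(define-struct path (elts) #:transparent)

;; worker-reply : String (file path String) -> (U path #f)
;; string-contains? takes the string being searched FIRST and the string
;; to look for SECOND.
(define (worker-reply str job)
  (if (string-contains? (file-contents job) str)
      (file-name job)
      #f))

(worker-reply "hello" (make-file (make-path '("A" "b")) "hello there"))
```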
9 Task 7
Now, put all of these together in a function find-file-with with signature (-> FileSystem String (Maybe Path~)).
It should locally define processes for "coord" and at least three "worker" processes (name them "worker1", "worker2", and "worker3").
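Then invoke start, using any scheduler you want, and return the final state of the "coord" process as the result. The solution below passes first as the scheduler and uses start-debug, whose trace of packets and states is reproduced after each test.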
(: find-file-with (-> FileSystem String (Maybe Path~)))
(define (find-file-with fs str)
  ;; one coordinator and three workers; the result is the final state of "coord"
  (second
   (assoc "coord"
          (start-debug first
                       (list (process (name "coord")
                                      (on-start (coord-start fs))
                                      (on-receive coord-receive))
                             (process (name "worker1")
                                      (on-start worker-start)
                                      (on-receive (worker-receive str)))
                             (process (name "worker2")
                                      (on-start worker-start)
                                      (on-receive (worker-receive str)))
                             (process (name "worker3")
                                      (on-start worker-start)
                                      (on-receive (worker-receive str))))))))
(check-expect (find-file-with FS1 "hello") (make-path '("A" "b")))
;;;; (packet #:from "coord" #:to "worker1" #:msg '#<make-file: #<make-path: ("A" "b")> "hello there">)
;;;; (state #:process "worker1" #:value #f)
;;;; (packet #:from "worker1" #:to "coord" #:msg '#<make-path: ("A" "b")>)
;;;; (state #:process "coord" #:value '#<make-path: ("A" "b")>)
;;;; (packet #:from "coord" #:to "worker2" #:msg '#<make-file: #<make-path: ("A" "c")> "goodbye">)
;;;; (state #:process "worker2" #:value #f)
;;;; (packet #:from "worker2" #:to "coord" #:msg #f)
;;;; (state #:process "coord" #:value '#<make-path: ("A" "b")>)
;;;; (packet #:from "coord" #:to "worker3" #:msg '#<make-file: #<make-path: ("A" "D" "e")> "yow!">)
;;;; (state #:process "worker3" #:value #f)
;;;; (packet #:from "worker3" #:to "coord" #:msg #f)
;;;; (state #:process "coord" #:value '#<make-path: ("A" "b")>)
(check-expect (find-file-with FS1 "blah") #f)
;;;; (packet #:from "coord" #:to "worker1" #:msg '#<make-file: #<make-path: ("A" "b")> "hello there">)
;;;; (state #:process "worker1" #:value #f)
;;;; (packet #:from "worker1" #:to "coord" #:msg #f)
;;;; (state #:process "coord" #:value '())
;;;; (packet #:from "coord" #:to "worker1" #:msg '#<make-file: #<make-path: ("A" "D" "f")> "eek">)
;;;; (state #:process "worker1" #:value #f)
;;;; (packet #:from "worker1" #:to "coord" #:msg #f)
;;;; (state #:process "coord" #:value #f)
;;;; (packet #:from "coord" #:to "worker2" #:msg '#<make-file: #<make-path: ("A" "c")> "goodbye">)
;;;; (state #:process "worker2" #:value #f)
;;;; (packet #:from "worker2" #:to "coord" #:msg #f)
;;;; (state #:process "coord" #:value #f)
;;;; (packet #:from "coord" #:to "worker3" #:msg '#<make-file: #<make-path: ("A" "D" "e")> "yow!">)
;;;; (state #:process "worker3" #:value #f)
;;;; (packet #:from "worker3" #:to "coord" #:msg #f)
;;;; (state #:process "coord" #:value #f)
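To experiment with the protocol outside LSL, here is a sequential plain-Racket model of the whole program. It is a sketch under stated assumptions, not LSL's runtime: all packets wait in a single FIFO queue and are delivered one at a time. The names simulate, coord-handle, and worker-handle, and the packet struct, are hypothetical. Because the delivery order is an assumption, the model's intermediate steps need not match the start-debug traces above. It does return the same answers for "hello" and "blah".

```racket
#lang racket
;; A sequential, plain-Racket model of the whole protocol (an assumption, not
;; LSL's runtime): packets wait in one FIFO queue and are delivered one at a time.
(define-struct file (name contents) #:transparent)
(define-struct directory (name elements) #:transparent)
(define-struct path (elts) #:transparent)
(define-struct packet (from to msg) #:transparent)

;; Same flattening as flatten-fs: each file's path paired with its contents.
(define (flatten-fs fs)
  (cond
    [(file? fs) (list (make-file (make-path (list (file-name fs))) (file-contents fs)))]
    [(directory? fs)
     (append-map
      (lambda (de)
        (map (lambda (x)
               (make-file (make-path (cons (directory-name fs) (path-elts (file-name x))))
                          (file-contents x)))
             (flatten-fs de)))
      (directory-elements fs))]))

;; coordinator: same decision as coord-receive; returns (values state new-packets)
(define (coord-handle st p)
  (let ([msg (packet-msg p)])
    (cond
      [(path? msg) (values msg '())]
      [(empty? st) (values #f '())]
      [(cons? st) (values (rest st) (list (make-packet "coord" (packet-from p) (first st))))]
      [else (values st '())])))

;; worker: reply to the coordinator with the job's path on a match, else #f
(define (worker-handle str p)
  (let ([job (packet-msg p)])
    (make-packet (packet-to p) "coord"
                 (if (string-contains? (file-contents job) str) (file-name job) #f))))

;; simulate : FileSystem String (Listof String) -> (U path #f)
;; workers should be non-empty; the result is the coordinator's final state.
(define (simulate fs str workers)
  (let* ([files (flatten-fs fs)]
         [k (min (length workers) (length files))])
    (let loop ([queue (map (lambda (w job) (make-packet "coord" w job))
                           (take workers k) (take files k))]
               [st (if (empty? files) #f (drop files k))])
      (cond
        [(empty? queue) st]                ; nothing in flight: st is final
        [(equal? (packet-to (first queue)) "coord")
         (let-values ([(new-st out) (coord-handle st (first queue))])
           (loop (append (rest queue) out) new-st))]
        [else
         (loop (append (rest queue) (list (worker-handle str (first queue)))) st)]))))

(define FS0 (make-directory "D" (list (make-file "e" "yow!") (make-file "f" "eek"))))
(define FS1 (make-directory "A" (list (make-file "b" "hello there")
                                      (make-file "c" "goodbye")
                                      FS0
                                      (make-directory "G" (list)))))
(define workers '("worker1" "worker2" "worker3"))

(simulate FS1 "hello" workers)
```

Searching for "eek" shows the late-match case. The coordinator's state becomes #f while worker1 is still searching f, and worker1's match then becomes the final state. A FileSystem with no files yields #f immediately.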