Finished the lab today.
In this lab, we implement a MapReduce version of WordCount.
The overall structure is shown below.
The data structures shared by the request and the reply are shown below:
type (
	// JobArgType tags the kind of task a JobReq reports on, or the action a
	// JobResp tells the worker to take next.
	JobArgType int

	// JobReq is what a worker sends to the coordinator: it reports the task
	// the worker just finished (if any) and implicitly asks for the next one.
	JobReq struct {
		Type     JobArgType // JobArgMap/JobArgReduce when reporting a finished task; JobArgUndef when reporting nothing (e.g. the very first request)
		JobId    int        // ID of the finished task; the coordinator ignores it when Type is JobArgUndef (it is 0 on the first request)
		FileName []string   // map: one intermediate file per reduce partition ("" when empty); reduce: the output file
	}

	// JobResp is the coordinator's reply describing the worker's next action.
	JobResp struct {
		Type     JobArgType // action to take: run a map/reduce task, sleep, or exit
		JobId    int        // ID of the assigned task
		NReduce  int        // number of reduce partitions; filled in for map tasks
		FileName []string   // map: the single input file; reduce: the intermediate files to merge
	}
)

// Wire values of JobArgType. They are spelled out explicitly (and match
// declaration order starting at 0) so the RPC encoding stays stable.
const (
	JobArgUndef   JobArgType = 0 // no task attached
	JobArgMap     JobArgType = 1 // map task
	JobArgReduce  JobArgType = 2 // reduce task
	JobArgExit    JobArgType = 3 // all work is finished; the worker should exit
	JobArgTimeOut JobArgType = 4 // NOTE(review): not referenced by HandleJob or Worker — confirm whether still needed
	JobArgSleep   JobArgType = 5 // nothing to hand out right now; wait and ask again
)
How the coordinator schedules tasks:
There is another optimization that I did not implement. Here, the coordinator simply iterates over all tasks and picks the first eligible one, which costs $O(n)$ per request; instead, we could keep a single integer recording the smallest task index that has not yet been assigned.
// HandleJob is the RPC handler workers call both to report a finished task
// and to ask for a new one. It always returns nil.
//
// If req.Type is JobArgMap or JobArgReduce, task req.JobId is marked done.
// Only the first report for a task counts; later reports (e.g. from a worker
// whose task was reassigned after a timeout) are ignored, as are IDs outside
// the task table. For a finished map task, req.FileName[r] is the
// intermediate file for reduce partition r; non-empty names are queued on
// that reduce task.
//
// It then fills resp with the worker's next action:
//   - JobArgMap while any map task is unfinished;
//   - JobArgReduce once every map task is done;
//   - JobArgSleep when every unfinished task of the current phase was handed
//     out less than taskTimeout ago;
//   - JobArgExit once every task is done.
//
// A task whose worker has not reported within taskTimeout becomes eligible
// to be handed out again.
func (c *Coordinator) HandleJob(req *JobReq, resp *JobResp) error {
	// A worker that has not reported back within this window is presumed
	// crashed or stuck.
	const taskTimeout = 10 * time.Second

	c.lock.Lock()
	defer c.lock.Unlock()

	jobID := req.JobId
	switch req.Type {
	case JobArgMap:
		// Bounds-check worker-supplied IDs rather than panicking under the lock.
		if jobID >= 0 && jobID < len(c.mapTasks) && c.mapTasks[jobID].status == TaskNotDone {
			c.mapTasks[jobID].status = TaskDone
			for reduceID, file := range req.FileName {
				if reduceID < len(c.reduceTasks) && len(file) > 0 {
					c.reduceTasks[reduceID].fileName = append(c.reduceTasks[reduceID].fileName, file)
				}
			}
			c.mapRemain--
		}
	case JobArgReduce:
		if jobID >= 0 && jobID < len(c.reduceTasks) && c.reduceTasks[jobID].status == TaskNotDone {
			c.reduceTasks[jobID].status = TaskDone
			c.reduceRemain--
		}
	}

	now := time.Now()
	// Tasks whose startTime is before this instant are either unassigned
	// (zero startTime) or held by a worker that timed out.
	expired := now.Add(-taskTimeout)
	switch {
	case c.mapRemain > 0:
		// Linear scan for the first eligible map task.
		for id := range c.mapTasks {
			task := &c.mapTasks[id]
			if task.status != TaskDone && task.startTime.Before(expired) {
				resp.Type = JobArgMap
				resp.FileName = []string{task.fileName}
				resp.JobId = task.taskId
				resp.NReduce = len(c.reduceTasks)
				task.startTime = now
				return nil
			}
		}
		resp.Type = JobArgSleep
	case c.reduceRemain > 0:
		for id := range c.reduceTasks {
			if c.reduceTasks[id].status != TaskDone && c.reduceTasks[id].startTime.Before(expired) {
				resp.Type = JobArgReduce
				resp.JobId = id
				resp.FileName = c.reduceTasks[id].fileName
				// Write through the table element, not a copy, so the task is
				// recorded as assigned and is not handed out to every worker.
				c.reduceTasks[id].startTime = now
				return nil
			}
		}
		resp.Type = JobArgSleep
	default:
		resp.Type = JobArgExit
	}
	return nil
}
Here is how each worker requests a task and chooses the corresponding action:
// Worker runs the worker side of the MapReduce job. It repeatedly asks the
// coordinator for work via GetResp, executes it, and reports the result in
// its next request, until the coordinator replies JobArgExit.
//
// mapf turns (input file name, file contents) into intermediate key/value
// pairs; reducef folds every value seen for one key into a single string.
func Worker(mapf func(string, string) []KeyValue,
	reducef func(string, []string) string) {
	req := JobReq{Type: JobArgUndef}
	for {
		resp := GetResp(&req)
		switch resp.Type {
		case JobArgMap:
			if len(resp.FileName) == 0 {
				log.Fatalf("map task %v has no input file", resp.JobId)
			}
			files := workerDoMap(mapf, resp.FileName[0], resp.JobId, resp.NReduce)
			req = JobReq{Type: JobArgMap, JobId: resp.JobId, FileName: files}
		case JobArgReduce:
			out := workerDoReduce(reducef, resp.FileName, resp.JobId)
			req = JobReq{Type: JobArgReduce, JobId: resp.JobId, FileName: []string{out}}
		case JobArgSleep:
			time.Sleep(time.Second)
			// Nothing to report on the next request.
			req = JobReq{Type: JobArgUndef}
		case JobArgExit:
			return
		}
	}
}

// workerDoMap runs map task mapID over fileName and writes its output as
// JSON-encoded KeyValues into files named mr-<mapID>-<r>, where partition r
// is ihash(key) % nReduce (assumes ihash returns a non-negative value).
// It returns a slice of length nReduce whose r-th entry is the file for
// partition r, or "" if the task produced no pairs for that partition.
func workerDoMap(mapf func(string, string) []KeyValue, fileName string, mapID, nReduce int) []string {
	file, err := os.Open(fileName)
	if err != nil {
		log.Fatalf("cannot open file %v", fileName)
	}
	defer file.Close()
	content, err := ioutil.ReadAll(file)
	if err != nil {
		log.Fatalf("cannot read file %v", fileName)
	}

	partitions := make([][]KeyValue, nReduce)
	for _, kv := range mapf(fileName, string(content)) {
		r := ihash(kv.Key) % nReduce
		partitions[r] = append(partitions[r], kv)
	}

	files := make([]string, nReduce)
	for r, kvs := range partitions {
		if len(kvs) == 0 {
			continue
		}
		name := fmt.Sprintf("mr-%d-%d", mapID, r)
		err := workerCommitFile(name, func(f *os.File) error {
			enc := json.NewEncoder(f)
			for i := range kvs {
				if err := enc.Encode(&kvs[i]); err != nil {
					return err
				}
			}
			return nil
		})
		if err != nil {
			log.Fatalf("cannot write intermediate file %v: %v", name, err)
		}
		files[r] = name
	}
	return files
}

// workerDoReduce runs reduce task reduceID: it loads every KeyValue from the
// given intermediate files, groups them by key after sorting, and writes one
// "key result" line per key to mr-out-<reduceID>. It returns that file name.
func workerDoReduce(reducef func(string, []string) string, fileNames []string, reduceID int) string {
	var kva []KeyValue
	for _, name := range fileNames {
		file, err := os.Open(name)
		if err != nil {
			log.Fatalf("in reduce phase, cannot open file %v: %v", name, err)
		}
		dec := json.NewDecoder(file)
		for {
			var kv KeyValue
			// Stops at EOF (or at the first undecodable record).
			if err := dec.Decode(&kv); err != nil {
				break
			}
			kva = append(kva, kv)
		}
		file.Close()
	}

	sort.Sort(ByKey(kva))

	outName := fmt.Sprintf("mr-out-%d", reduceID)
	err := workerCommitFile(outName, func(f *os.File) error {
		for i := 0; i < len(kva); {
			// kva[i:j] is the run of pairs sharing kva[i].Key.
			j := i + 1
			for j < len(kva) && kva[j].Key == kva[i].Key {
				j++
			}
			values := make([]string, 0, j-i)
			for k := i; k < j; k++ {
				values = append(values, kva[k].Value)
			}
			if _, err := fmt.Fprintf(f, "%v %v\n", kva[i].Key, reducef(kva[i].Key, values)); err != nil {
				return err
			}
			i = j
		}
		return nil
	})
	if err != nil {
		log.Fatalf("cannot write output file %v: %v", outName, err)
	}
	return outName
}

// workerCommitFile creates name by running write on a temporary file in the
// current directory and then renaming it into place. Readers therefore never
// see a partially written file, even if this worker crashes mid-write or a
// re-executed copy of the same task writes the same name. The temp prefix
// "mr-tmp-" deliberately does not match "mr-out*".
func workerCommitFile(name string, write func(f *os.File) error) error {
	tmp, err := ioutil.TempFile(".", "mr-tmp-*")
	if err != nil {
		return err
	}
	if err := write(tmp); err != nil {
		// Best-effort cleanup; the write error is the one worth reporting.
		tmp.Close()
		os.Remove(tmp.Name())
		return err
	}
	if err := tmp.Close(); err != nil {
		os.Remove(tmp.Name())
		return err
	}
	return os.Rename(tmp.Name(), name)
}
It is a comparatively easy task; now let's crack Lab 2.