MIT 6.5840 Lab 1

Finished the lab today.

In this lab, we are supposed to implement a MapReduce framework and run a word-count application on top of it.
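
For reference, the application code plugged into the framework is tiny; this is roughly the word-count Map and Reduce from mrapps/wc.go in the lab skeleton (quoted from memory, so details may differ; it needs the strings, strconv, and unicode imports plus the lab's mr package):

// Map emits a ("word", "1") pair for every word in the input file.
func Map(filename string, contents string) []mr.KeyValue {
    isNotLetter := func(r rune) bool { return !unicode.IsLetter(r) }
    words := strings.FieldsFunc(contents, isNotLetter)
    kva := []mr.KeyValue{}
    for _, w := range words {
        kva = append(kva, mr.KeyValue{Key: w, Value: "1"})
    }
    return kva
}

// Reduce just counts how many values a key collected.
func Reduce(key string, values []string) string {
    return strconv.Itoa(len(values))
}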

The structure is shown below

(figure: overall structure of the coordinator and workers)

The common data structures for the request and the reply are shown below:

type JobArgType int

const (
    JobArgUndef JobArgType = iota
    JobArgMap
    JobArgReduce
    JobArgExit
    JobArgTimeOut
    JobArgSleep
)

type JobReq struct {
    Type     JobArgType // type of the job the worker just finished
    JobId    int        // corresponding job id; unset on the worker's first request
    FileName []string   // files produced by the finished job
}
type JobResp struct {
    Type     JobArgType // type of the job being assigned
    JobId    int        // corresponding job id
    NReduce  int        // number of reduce buckets
    FileName []string   // input file(s) for the assigned job
}
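
These structs travel over the lab's RPC layer. The worker loop further below calls a small wrapper, GetResp; a minimal sketch of it, assuming the call() helper that ships in the lab skeleton's mr/worker.go and that the coordinator registered its HandleJob method:

func GetResp(req *JobReq) *JobResp {
    resp := JobResp{}
    // call() dials the coordinator's unix socket and invokes the named method
    if ok := call("Coordinator.HandleJob", req, &resp); !ok {
        // the coordinator is gone, most likely because all work is done
        resp.Type = JobArgExit
    }
    return &resp
}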

Here is how the coordinator schedules jobs.

There is one optimization that I did not do: this implementation just iterates over all tasks and picks the first runnable one, which is $O(n)$ per request. We could instead keep a single integer cursor tracking the smallest task id that has never been handed out, as sketched below; the full handler follows it.
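
A minimal sketch of that cursor, assuming a hypothetical extra next field on the Coordinator; re-executing timed-out tasks would still need the linear scan as a fallback:

// pickFresh hands out the smallest map task id that has never been
// scheduled, so the common case is O(1). The next field is an assumed
// addition to the Coordinator, not part of the handler below.
func (c *Coordinator) pickFresh() (int, bool) {
    if c.next < len(c.mapTasks) {
        id := c.next
        c.next++
        return id, true
    }
    return -1, false
}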

func (c *Coordinator) HandleJob(req *JobReq, resp *JobResp) error {
    c.lock.Lock()
    defer c.lock.Unlock()
    jobId := req.JobId
    switch req.Type {
    case JobArgMap: // a map job finished: collect its intermediate files
        if c.mapTasks[jobId].status == TaskNotDone {
            c.mapTasks[jobId].status = TaskDone
            for reduceId, file := range req.FileName {
                if len(file) > 0 {
                    c.reduceTasks[reduceId].fileName = append(c.reduceTasks[reduceId].fileName, file)
                }
            }
            c.mapRemain--
        }
    case JobArgReduce: // a reduce job finished
        if c.reduceTasks[jobId].status == TaskNotDone {
            c.reduceTasks[jobId].status = TaskDone
            c.reduceRemain--
        }
    }
    nowTime := time.Now()
    // a task that started more than 10 seconds ago is presumed crashed
    invalidTime := nowTime.Add(-10 * time.Second)
    if c.mapRemain > 0 { // schedule a new map task
        // plain O(n) scan; see the cursor sketch above for the optimization
        for id := range c.mapTasks {
            task := &c.mapTasks[id]
            if task.status != TaskDone && task.startTime.Before(invalidTime) {
                resp.Type = JobArgMap
                resp.FileName = []string{task.fileName}
                resp.JobId = task.taskId
                resp.NReduce = len(c.reduceTasks)
                task.startTime = nowTime
                return nil
            }
        }
        resp.Type = JobArgSleep // no runnable task yet: let the worker sleep
    } else if c.reduceRemain > 0 { // schedule a new reduce task
        for id := range c.reduceTasks {
            // take a pointer; a copy would lose the startTime update below
            task := &c.reduceTasks[id]
            if task.status != TaskDone && task.startTime.Before(invalidTime) {
                resp.Type = JobArgReduce
                resp.FileName = task.fileName
                resp.JobId = id
                task.startTime = nowTime
                return nil
            }
        }
        resp.Type = JobArgSleep
    } else {
        resp.Type = JobArgExit // all work done: workers may exit
    }
    return nil
}
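
The coordinator also needs the Done method that mrcoordinator.go polls to decide when to exit; a minimal sketch, reusing the two remaining-task counters and the lock from the handler above:

// Done reports whether the whole job has finished; mrcoordinator.go
// calls this periodically and exits once it returns true.
func (c *Coordinator) Done() bool {
    c.lock.Lock()
    defer c.lock.Unlock()
    return c.mapRemain == 0 && c.reduceRemain == 0
}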

Here is how the workers run: each one repeatedly asks the coordinator for a job, performs the corresponding action, and reports the result in its next request.

func Worker(mapf func(string, string) []KeyValue,
    reducef func(string, []string) string) {
    req := JobReq{Type: JobArgUndef}
    for {
        resp := GetResp(&req)
        NReduce := resp.NReduce
        switch resp.Type {
        case JobArgMap: // do a map job
            fileName := resp.FileName[0]
            content, err := ioutil.ReadFile(fileName)
            if err != nil {
                log.Fatalf("cannot read file %v: %v", fileName, err)
            }
            tmpRes := mapf(fileName, string(content))
            // partition the key/value pairs into NReduce buckets
            reduceFileMap := make(map[int][]KeyValue)
            for _, kv := range tmpRes {
                reduceId := ihash(kv.Key) % NReduce
                reduceFileMap[reduceId] = append(reduceFileMap[reduceId], kv)
            }
            files := make([]string, NReduce)
            // write each bucket to its intermediate file mr-X-Y
            for reduceId, kvs := range reduceFileMap {
                tmpFileName := fmt.Sprintf("mr-%d-%d", resp.JobId, reduceId)
                tmpFile, err := os.Create(tmpFileName)
                if err != nil {
                    log.Fatalf("cannot create file %v: %v", tmpFileName, err)
                }
                enc := json.NewEncoder(tmpFile)
                for _, kv := range kvs {
                    if err := enc.Encode(&kv); err != nil {
                        break
                    }
                }
                tmpFile.Close() // close here; defer in a loop would pile up
                files[reduceId] = tmpFileName
            }
            req.JobId = resp.JobId
            req.Type = JobArgMap
            req.FileName = files // report the intermediate file names
        case JobArgReduce: // do a reduce job
            interContent := []KeyValue{}
            for _, fileName := range resp.FileName {
                file, err := os.Open(fileName)
                if err != nil {
                    log.Fatalf("in reduce phase, opening file %v failed: %v", fileName, err)
                }
                dec := json.NewDecoder(file)
                for {
                    var kv KeyValue
                    if err := dec.Decode(&kv); err != nil {
                        break
                    }
                    interContent = append(interContent, kv)
                }
                file.Close()
            }
            // sort by key so that equal keys are adjacent
            sort.Sort(ByKey(interContent))
            outFileName := fmt.Sprintf("mr-out-%d", resp.JobId)
            outFile, err := os.Create(outFileName)
            if err != nil {
                log.Fatalf("cannot create file %v: %v", outFileName, err)
            }
            for i := 0; i < len(interContent); {
                // [i, j) spans one run of identical keys
                j := i + 1
                for j < len(interContent) && interContent[j].Key == interContent[i].Key {
                    j++
                }
                values := []string{}
                for k := i; k < j; k++ {
                    values = append(values, interContent[k].Value)
                }
                cnt := reducef(interContent[i].Key, values)
                fmt.Fprintf(outFile, "%v %v\n", interContent[i].Key, cnt)
                i = j
            }
            outFile.Close()
            req.FileName = []string{outFileName}
            req.Type = JobArgReduce
            req.JobId = resp.JobId
        case JobArgSleep: // nothing runnable right now: back off and retry
            time.Sleep(1000 * time.Millisecond)
            req.Type = JobArgUndef
        case JobArgExit: // all work done
            return
        }
    }
}
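
One robustness detail the worker above skips: the lab hints recommend writing each output through a temporary file and renaming it, so a worker that crashes mid-write never leaves a partially written mr-out-* file visible. A minimal sketch of that pattern using ioutil.TempFile and os.Rename; writeAtomically is a hypothetical helper, not part of the code above:

// writeAtomically writes data to name via a temp file plus an atomic
// rename, so readers never observe a half-written output file.
func writeAtomically(name string, data []byte) error {
    tmp, err := ioutil.TempFile(".", "mr-tmp-*")
    if err != nil {
        return err
    }
    if _, err := tmp.Write(data); err != nil {
        tmp.Close()
        return err
    }
    if err := tmp.Close(); err != nil {
        return err
    }
    return os.Rename(tmp.Name(), name) // atomic on the same filesystem
}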

This was a comparatively easy lab; on to Lab 2.