根据第五期的 cron 项目,理解 cron.v2 项目的源码,主要涉及的知识点

  • go 关键字
  • chan select
  • 闭包
  • 接口

先看下源码的结构

├── LICENSE
├── README.md
├── constantdelay.go 处理特殊时间格式 struct
├── cron.go 负责整个流程控制和调度的 struct
├── doc.go 文档文件
├── parser.go 将输入的 crontab 格式进行转换
└── spec.go Schedule 数据定义和转换

整个代码的执行流程

        NewCron()
          ⇊
Add(Schedule, Command)
          ⇊
    Parse(Schedule)
          ⇊
         Run()

主要的相关的源码 cron.go

type Cron struct {
    entries  []*Entry // 一个任务,就是一个 entry
    stop     chan struct{} // 停止任务
    add      chan *Entry // 添加任务
    remove   chan EntryID // 移除任务,每个 Entry 都有一个 ID,稍后介绍
    snapshot chan []Entry // 当前任务
    running  bool // 整个 Cron 是否处于运行状态
    nextID   EntryID // 下一个 Entry
}

// 一个具体的任务
type Entry struct {
    // ID is the cron-assigned ID of this entry, which may be used to look up a
    // snapshot or remove it.
    ID EntryID

    // Schedule on which this job should be run.
    Schedule Schedule

    // Next time the job will run, or the zero time if Cron has not been
    // started or this entry's schedule is unsatisfiable
    Next time.Time

    // Prev is the last time this job was run, or the zero time if never.
    Prev time.Time

    // Job is the thing to run when the Schedule is activated.
    Job Job
}

上面的 Entry 里面有两个 struct,我们先来看一下 Schedule,Schedule 是一个 interface,主要是描述了一个 Job 具体运行的时间

// 一个接口
// Schedule describes a job's duty cycle.
type Schedule interface {
    // Next returns the next activation time, later than the given time.
    // Next is invoked initially, and then each time the job is run.
    Next(time.Time) time.Time
}

Golang 的接口和其它面向对象语言有所不同,接口和具体实现类不是一个强关联,如上面 Schedule 定义了一个方法 Next,那么任何一个实现了该方法的 struct 都相当于实现了这个接口,如代码中 SpecSchedule 这个 struct,我们可以说他实现了 Schedule 那个接口。如果是一个空接口,没有任何方法,那么所有的 struct 都实现了这个空接口。

// SpecSchedule specifies a duty cycle (to the second granularity), based on a
// traditional crontab specification. It is computed initially and stored as bit sets.
type SpecSchedule struct {
    Second, Minute, Hour, Dom, Month, Dow uint64
    Location                              *time.Location
}

// Next returns the next time this schedule is activated, greater than the given
// time.  If no time can be found to satisfy the schedule, return the zero time.
func (s *SpecSchedule) Next(t time.Time) time.Time {
    // 此处代码省略
}

我们真正加入到 Entry 的 Schedule 都是 SpecSchedule,SpecSchedule 的 Next 方法主要是处理下一次 Job 运行时间的方法,该方法里还使用了 goto 关键字,在 Parse 阶段调用的 Parse() 方法将 crontab 格式的数据转换为 SpecSchedule 的实例。

WRAP:
    if t.Year() > yearLimit {
        return time.Time{}
    }

    // Find the first applicable month.
    // If it's this month, then do nothing.
    for 1<<uint(t.Month())&s.Month == 0 {
        // If we have to add a month, reset the other parts to 0.
        if !added {
            added = true
            // Otherwise, set the date at the beginning (since the current time is irrelevant).
            t = time.Date(t.Year(), t.Month(), 1, 0, 0, 0, 0, s.Location)
        }
        t = t.AddDate(0, 1, 0)

        // Wrapped around.
        if t.Month() == time.January {
            goto WRAP
        }
    }

goto 关键字是 Golang 控制流程的关键字之一,用起来比较简单,就是在函数某个位置写个标签,后续流程里 goto 到这个标签,跟 C 语言里的 goto 类似。

接下来我们来说一说闭包,闭包这个概念大家多少都有些理解了,结合 cron.v2 的代码,我们来看一看

// FuncJob is a wrapper that turns a func() into a cron.Job
type FuncJob func() // FuncJob 是一个函数类型,好比 type EntryId int

func (f FuncJob) Run() { f() } // 给 FuncJob 增加了一个 Run 方法

// AddFunc adds a func to the Cron to be run on the given schedule.
// spec 就是 crontab 类似的时间格式 */1 * * * 1 或者 @every 5s 这类的
// cmd 这里传进来就是函数例如 AddFunc("@every 1s", func () { fmt.Println("Hello Func") })
func (c *Cron) AddFunc(spec string, cmd func()) (EntryID, error) {
    return c.AddJob(spec, FuncJob(cmd))
}

看下这个例子

package main

import "fmt"

type FFF func()

func (f FFF) printFFF() {
    f()
    fmt.Println("printFFF")
}
func main() {
    fp := FFF(func() {
        fmt.Println("YoYoYo")
    })
    fp.printFFF()
    fmt.Println("hello")
}
// 结果
// YoYoYo
// printFFF
// hello

接下来我们来看看 Run 部分的 go 关键字和 chan,这两个关键字可以说是 Golang 语言的精华所在了。go 关键字主要是用来创建一个 goroutine(协程),而 goroutinechan 协作使用,来完成不同 goroutine 之间的通信。先看一个例子:

package main

import "fmt"
import "time"

func main() {
    var amChan chan int
    fmt.Println("来通个信")
    amChan = make(chan int, 5)
    for i := 0; i < 5; i++ {
        // 注释1 go func() {
            amChan <- i

        // 注释1 }()
        // 注释2 time.Sleep(100 * time.Millisecond)
    }
    for i := 0; i < 5; i++ {
        result := <-amChan
        fmt.Println(result)
    }
}

上面的代码,如果注释 1 和 2部分不打开,输出结果 0 1 2 3 4,如果只是打开注释 1,我们看到的结果会是 5 5 5 5 5,如果注释 1 和 2两部分都打开,则会输出 0 1 2 3 4

chan 在我理解起来更像是一个队列,输入数据,取出数据,这里面 注释 1 和 2 带来不同的输出结果主要是因为 go 这个关键字开启协程导致的,协程是独立的处理单元,你无法知道他们什么时候真正的启动执行,而注释带来的就是代码逻辑顺序依赖变量 i,执行整个 for 循环以后,可能 go 的协程才执行,这样必然带来输出结果不一致的问题。下面我们看下 Cron.Run 方法调用的代码:

// Start the cron scheduler in its own go-routine.
func (c *Cron) Start() {
    c.running = true // 设置状态
    go c.run() // 通过协程来执行 run()
}

// run the scheduler.. this is private just due to the need to synchronize
// access to the 'running' state variable.
func (c *Cron) run() {
    // Figure out the next activation times for each entry.
    now := time.Now().Local()
    // 设置下次执行时间
    for _, entry := range c.entries {
        entry.Next = entry.Schedule.Next(now)
    }

    for {
        // Determine the next entry to run.
        // 排序当前任务的执行时间,排序方法见下面代码
        sort.Sort(byTime(c.entries))

        // effective 就是下次任务要执行的时间
        var effective time.Time
        if len(c.entries) == 0 || c.entries[0].Next.IsZero() {
            // 此处是任务为空时处理逻辑
            // If there are no entries yet, just sleep - it still handles new entries
            // and stop requests.
            effective = now.AddDate(10, 0, 0)
        } else {
            effective = c.entries[0].Next
        }
        // Golang 针对 chan 的读写操作,提供了 select 语句,
        // 可以用 select 来检测或者阻塞 chan 操作的,
        // select 会评估 case 语句块,只要有任何一个评估通过,
        // 就会执行对应的语句,如果都无法通过评估,
        // 则会一直阻塞,直到有语句可以评估通过。
        // 如果都无法通过评估且语句有 `default` 块,则执行 `default` 部分
        select {
            case now = <-time.After(effective.Sub(now)):
                // Run every entry whose next time was this effective time.
                for _, e := range c.entries {
                    if e.Next != effective {
                        break
                    }
                    go e.Job.Run()
                    e.Prev = e.Next
                    e.Next = e.Schedule.Next(effective)
                }
                continue

            case newEntry := <-c.add:
                c.entries = append(c.entries, newEntry)
                newEntry.Next = newEntry.Schedule.Next(now)

            case <-c.snapshot:
                c.snapshot <- c.entrySnapshot()

            case id := <-c.remove:
                c.removeEntry(id)

            case <-c.stop:
                return
        }

        now = time.Now().Local()
    }
}

整个 Cron 运行的时候,就是个无限循环,不停的循环 Cron.entries,每次循环一个 Entry 任务出来,并计算该任务下次的执行时间,在循环 entries 前对其排序,将最近要执行的任务取出来进行和当前时间对比,反复如此,下面就是排序相关代码 sort.Sort(byTime(c.entries))

type byTime []*Entry

func (s byTime) Len() int      { return len(s) }
func (s byTime) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (s byTime) Less(i, j int) bool {
    // Two zero times should return false.
    // Otherwise, zero is "greater" than any other time.
    // (To sort it at the end of the list.)
    if s[i].Next.IsZero() {
        return false
    }
    if s[j].Next.IsZero() {
        return true
    }
    return s[i].Next.Before(s[j].Next)
}

EOF;