Source Code Analysis of the cron Project

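cron is a scheduled-task library written in Go, similar in function to crontab on Linux. With cron it is easy to integrate scheduled tasks into a project's services. gocron is also a scheduled-task system built on top of cron.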

The Cron struct

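type Cron struct {
    entries   []*Entry          // all scheduled entries
    chain     Chain             // wrappers applied to each submitted Job
    stop      chan struct{}     // tells run() to exit
    add       chan *Entry       // hands new entries to a running run()
    remove    chan EntryID      // hands entry IDs to remove to a running run()
    snapshot  chan chan []Entry // requests a copy of the current entries
    running   bool              // true once the scheduler has been started
    logger    Logger
    runningMu sync.Mutex        // guards running
    location  *time.Location    // time zone used when computing "now"
    parser    Parser            // parses spec strings into Schedules
    nextID    EntryID           // counter used to assign entry IDs
    jobWaiter sync.WaitGroup    // tracks jobs that are still executing
}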

The Entry struct

// Entry consists of a schedule and the func to execute on that schedule.
type Entry struct {
    // ID is the cron-assigned ID of this entry, which may be used to look up a
    // snapshot or remove it.
    ID EntryID

    // Schedule on which this job should be run.
    Schedule Schedule

    // Next time the job will run, or the zero time if Cron has not been
    // started or this entry's schedule is unsatisfiable
    Next time.Time

    // Prev is the last time this job was run, or the zero time if never.
    Prev time.Time

    // WrappedJob is the thing to run when the Schedule is activated.
    WrappedJob Job

    // Job is the thing that was submitted to cron.
    // It is kept around so that user code that needs to get at the job later,
    // e.g. via Entries() can do so.
    Job Job
}

The entry point is the Cron.run() method

func (c *Cron) run() {
    c.logger.Info("start")

    // Figure out the next activation times for each entry.
    now := c.now()
    for _, entry := range c.entries {
        entry.Next = entry.Schedule.Next(now)
        c.logger.Info("schedule", "now", now, "entry", entry.ID, "next", entry.Next)
    }

    for {
        // Determine the next entry to run.
        sort.Sort(byTime(c.entries))

        var timer *time.Timer
        if len(c.entries) == 0 || c.entries[0].Next.IsZero() {
            // If there are no entries yet, just sleep - it still handles new entries
            // and stop requests.
            timer = time.NewTimer(100000 * time.Hour)
        } else {
            timer = time.NewTimer(c.entries[0].Next.Sub(now))
        }

        for {
            select {
            case now = <-timer.C:
                now = now.In(c.location)
                c.logger.Info("wake", "now", now)

                // Run every entry whose next time was less than now
                for _, e := range c.entries {
                    if e.Next.After(now) || e.Next.IsZero() {
                        break
                    }
                    c.startJob(e.WrappedJob)
                    e.Prev = e.Next
                    e.Next = e.Schedule.Next(now)
                    c.logger.Info("run", "now", now, "entry", e.ID, "next", e.Next)
                }

            case newEntry := <-c.add: // a new job has arrived; add it to the entry list
                timer.Stop()
                now = c.now()
                newEntry.Next = newEntry.Schedule.Next(now)
                c.entries = append(c.entries, newEntry)
                c.logger.Info("added", "now", now, "entry", newEntry.ID, "next", newEntry.Next)

            case replyChan := <-c.snapshot:
                replyChan <- c.entrySnapshot()
                continue

            case <-c.stop:
                timer.Stop()
                c.logger.Info("stop")
                return

            case id := <-c.remove:
                timer.Stop()
                now = c.now()
                c.removeEntry(id)
                c.logger.Info("removed", "entry", id)
            }

            break
        }
    }
}

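The run method implements cron's main control flow. The events it handles are:

  • The timer fires: every entry that is due is started, and its next run time is computed.
  • A new job is added: it is scheduled and appended to the entry list.
  • A job is removed: it is dropped from the entry list.
  • A stop notification is received: the timer is stopped and run returns.
  • A snapshot is requested: a copy of the current entries is sent back, and the loop keeps waiting on the same timer.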

cron.Start() can be called more than once: if cron is already running, calling it again has no effect.
