jobbase.go 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205
  1. package jobs
  2. import (
  3. "fmt"
  4. "github.com/robfig/cron/v3"
  5. "gorm.io/gorm"
  6. "sync"
  7. "time"
  8. models2 "gas-cylinder-api/app/jobs/model"
  9. log "gogs.baozhida.cn/zoie/OAuth-core/logger"
  10. "gogs.baozhida.cn/zoie/OAuth-core/pkg"
  11. "gogs.baozhida.cn/zoie/OAuth-core/pkg/cronjob"
  12. "gogs.baozhida.cn/zoie/OAuth-core/sdk"
  13. )
  14. var timeFormat = "2006-01-02 15:04:05"
  15. var retryCount = 3
  16. var jobList map[string]JobsExec
  17. var lock sync.Mutex
  18. type JobCore struct {
  19. InvokeTarget string
  20. Name string
  21. JobId int
  22. EntryId int
  23. CronExpression string
  24. Args string
  25. }
  26. // 任务类型 http
  27. type HttpJob struct {
  28. JobCore
  29. }
  30. type ExecJob struct {
  31. JobCore
  32. }
  33. func (e *ExecJob) Run() {
  34. startTime := time.Now()
  35. var obj = jobList[e.InvokeTarget]
  36. if obj == nil {
  37. log.Warn("[Job] ExecJob Run job nil")
  38. return
  39. }
  40. err := CallExec(obj.(JobsExec), e.Args)
  41. if err != nil {
  42. // 如果失败暂停一段时间重试
  43. fmt.Println(time.Now().Format(timeFormat), " [ERROR] mission failed! ", err)
  44. }
  45. // 结束时间
  46. endTime := time.Now()
  47. // 执行时间
  48. latencyTime := endTime.Sub(startTime)
  49. //TODO: 待完善部分
  50. //str := time.Now().Format(timeFormat) + " [INFO] JobCore " + string(e.EntryId) + "exec success , spend :" + latencyTime.String()
  51. //ws.SendAll(str)
  52. log.Infof("[Job] JobCore %s exec success , spend :%v", e.Name, latencyTime)
  53. return
  54. }
  55. // http 任务接口
  56. func (h *HttpJob) Run() {
  57. startTime := time.Now()
  58. var count = 0
  59. var err error
  60. var str string
  61. /* 循环 */
  62. LOOP:
  63. if count < retryCount {
  64. /* 跳过迭代 */
  65. str, err = pkg.Get(h.InvokeTarget)
  66. if err != nil {
  67. // 如果失败暂停一段时间重试
  68. fmt.Println(time.Now().Format(timeFormat), " [ERROR] mission failed! ", err)
  69. fmt.Printf(time.Now().Format(timeFormat)+" [INFO] Retry after the task fails %d seconds! %s \n", (count+1)*5, str)
  70. time.Sleep(time.Duration(count+1) * 5 * time.Second)
  71. count = count + 1
  72. goto LOOP
  73. }
  74. }
  75. // 结束时间
  76. endTime := time.Now()
  77. // 执行时间
  78. latencyTime := endTime.Sub(startTime)
  79. //TODO: 待完善部分
  80. log.Infof("[Job] JobCore %s exec success , spend :%v", h.Name, latencyTime)
  81. return
  82. }
  83. // 初始化
  84. func Setup(dbs map[string]*gorm.DB) {
  85. fmt.Println(time.Now().Format(timeFormat), " [INFO] JobCore Starting...")
  86. for k, db := range dbs {
  87. sdk.Runtime.SetCrontab(k, cronjob.NewWithSeconds())
  88. setup(k, db)
  89. }
  90. }
  91. func setup(key string, db *gorm.DB) {
  92. crontab := sdk.Runtime.GetCrontabKey(key)
  93. sysJob := models2.SysJob{}
  94. jobs := make([]models2.SysJob, 0)
  95. err := sysJob.GetList(db, &jobs)
  96. if err != nil {
  97. fmt.Println(time.Now().Format(timeFormat), " [ERROR] JobCore init error", err)
  98. }
  99. if len(jobs) == 0 {
  100. fmt.Println(time.Now().Format(timeFormat), " [INFO] JobCore total:0")
  101. }
  102. _, err = sysJob.RemoveAllEntryID(db)
  103. if err != nil {
  104. fmt.Println(time.Now().Format(timeFormat), " [ERROR] JobCore remove entry_id error", err)
  105. }
  106. for i := 0; i < len(jobs); i++ {
  107. if !jobs[i].Auto {
  108. continue
  109. }
  110. if jobs[i].JobType == 1 {
  111. j := &HttpJob{}
  112. j.InvokeTarget = jobs[i].InvokeTarget
  113. j.CronExpression = jobs[i].CronExpression
  114. j.JobId = jobs[i].JobId
  115. j.Name = jobs[i].JobName
  116. sysJob.EntryId, err = AddJob(crontab, j)
  117. } else if jobs[i].JobType == 2 {
  118. j := &ExecJob{}
  119. j.InvokeTarget = jobs[i].InvokeTarget
  120. j.CronExpression = jobs[i].CronExpression
  121. j.JobId = jobs[i].JobId
  122. j.Name = jobs[i].JobName
  123. j.Args = jobs[i].Args
  124. sysJob.EntryId, err = AddJob(crontab, j)
  125. }
  126. err = sysJob.Update(db, jobs[i].JobId)
  127. }
  128. // 启动任务
  129. crontab.Start()
  130. fmt.Println(time.Now().Format(timeFormat), " [INFO] JobCore start success.")
  131. // 关闭任务
  132. defer crontab.Stop()
  133. select {}
  134. }
  135. // 添加任务 AddJob(invokeTarget string, jobId int, jobName string, cronExpression string)
  136. func AddJob(c *cron.Cron, job Job) (int, error) {
  137. if job == nil {
  138. fmt.Println("unknown")
  139. return 0, nil
  140. }
  141. return job.addJob(c)
  142. }
  143. func (h *HttpJob) addJob(c *cron.Cron) (int, error) {
  144. id, err := c.AddJob(h.CronExpression, h)
  145. if err != nil {
  146. fmt.Println(time.Now().Format(timeFormat), " [ERROR] JobCore AddJob error", err)
  147. return 0, err
  148. }
  149. EntryId := int(id)
  150. return EntryId, nil
  151. }
  152. func (h *ExecJob) addJob(c *cron.Cron) (int, error) {
  153. id, err := c.AddJob(h.CronExpression, h)
  154. if err != nil {
  155. fmt.Println(time.Now().Format(timeFormat), " [ERROR] JobCore AddJob error", err)
  156. return 0, err
  157. }
  158. EntryId := int(id)
  159. return EntryId, nil
  160. }
  161. // 移除任务
  162. func Remove(c *cron.Cron, entryID int) chan bool {
  163. ch := make(chan bool)
  164. go func() {
  165. c.Remove(cron.EntryID(entryID))
  166. fmt.Println(time.Now().Format(timeFormat), " [INFO] JobCore Remove success ,info entryID :", entryID)
  167. ch <- true
  168. }()
  169. return ch
  170. }
  171. // 任务停止
  172. //func Stop() chan bool {
  173. // ch := make(chan bool)
  174. // go func() {
  175. // global.GADMCron.Stop()
  176. // ch <- true
  177. // }()
  178. // return ch
  179. //}