dataCache.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. package cron
  2. import (
  3. "DataShare/dao"
  4. "DataShare/global"
  5. "DataShare/model"
  6. "DataShare/service"
  7. "fmt"
  8. "github.com/jasonlvhit/gocron"
  9. "github.com/sirupsen/logrus"
  10. "strconv"
  11. "sync"
  12. )
  13. type CacheTimer struct{
  14. sc *gocron.Scheduler
  15. q chan bool
  16. Id []int
  17. }
  18. var once sync.Once
  19. var singleInstance *CacheTimer
  20. func GetInstance() *CacheTimer {
  21. if singleInstance == nil {
  22. once.Do(
  23. func() {
  24. singleInstance = &CacheTimer{}
  25. })
  26. }
  27. return singleInstance
  28. }
  29. func (this *CacheTimer)Start(){
  30. //每次开机运行都执行一下daytask
  31. this.q = make(chan bool)
  32. this.Id = make([]int,0)
  33. go this.jobs(this.q)
  34. }
  35. func (this *CacheTimer)Stop(){
  36. this.q <- true
  37. }
  38. func (this *CacheTimer)jobs(quit <-chan bool) {
  39. for {
  40. //cron jobs
  41. this.sc = gocron.NewScheduler()
  42. //每天凌晨00:00 同步配置文件
  43. //this.sc.Every(1).Day().At("00:00:00").Do(this.CacheTask)
  44. fmt.Println("---------------------------timer jobs------------------------")
  45. this.sc.Every(5).Seconds().Do(this.CacheTask)
  46. this.sc.Every(10).Seconds().Do(this.SubscribeTask)
  47. select {
  48. case <-quit:
  49. // here we receive from quit and exit
  50. // if `g` has started, we may want to `g.Clear()`
  51. // or the scheduled jobs will continue, we will just not be waiting for them to finish.
  52. return
  53. case <-this.sc.Start():
  54. // here we know all the jobs are done, and go round the loop again
  55. }
  56. }
  57. }
  58. func (this *CacheTimer)CacheTask(){
  59. //this.sc.Clear()
  60. data_cache_list := dao.LoadDataCacheList()
  61. if len(this.Id) != len(data_cache_list) {
  62. this.sc.Clear()
  63. this.Id = make([]int,0)
  64. this.sc.Every(5).Seconds().Do(this.CacheTask)
  65. }
  66. for i:=0;i<len(data_cache_list);i++{
  67. exist := false
  68. for m:=0;m<len(this.Id);m++ {
  69. if this.Id[m] == data_cache_list[i].Id {
  70. exist = true
  71. break
  72. }
  73. }
  74. if exist == true {
  75. continue
  76. }
  77. global.SystemLogger.Log(logrus.InfoLevel,"-------------add new cache task-----------------")
  78. global.SystemLogger.Log(logrus.InfoLevel,data_cache_list[i])
  79. //fmt.Println(data_cache_list[i])
  80. cacheCycle := data_cache_list[i].CacheCycle
  81. //"unit": "d", // 周期单位,m分钟,h小时,d天,w周,m月,y年
  82. //"value": 1, // 周期值,每间隔多少周期单位执行一次
  83. val,err:= strconv.Atoi(cacheCycle.Value)
  84. if err != nil {
  85. global.SystemLogger.Log(logrus.ErrorLevel,"add cache task failed,err:"+err.Error())
  86. continue
  87. }
  88. this.Id = append(this.Id,data_cache_list[i].Id)
  89. fmt.Println("-----ids--------",this.Id)
  90. fmt.Println("----i-----------",i)
  91. //err = service.Business2_DataCache(&data_cache_list[i])
  92. var cacheTask model.DataCacheOfRedis
  93. cacheTask = data_cache_list[i]
  94. if cacheCycle.Unit == "d"{
  95. this.sc.Every(uint64(val)).Days().Do(func() {
  96. err = service.Business2_DataCache(&cacheTask)
  97. })
  98. }else if cacheCycle.Unit == "h"{
  99. this.sc.Every(uint64(val)).Hours().Do(func() {
  100. err = service.Business2_DataCache(&cacheTask)
  101. })
  102. }else if cacheCycle.Unit == "m"{
  103. this.sc.Every(uint64(val)).Minutes().Do(func() {
  104. err = service.Business2_DataCache(&cacheTask)
  105. })
  106. }else if cacheCycle.Unit == "w"{
  107. this.sc.Every(uint64(val)).Weeks().Do(func() {
  108. err = service.Business2_DataCache(&cacheTask)
  109. })
  110. }else if cacheCycle.Unit == "s"{
  111. this.sc.Every(uint64(val)).Seconds().Do(func() {
  112. err = service.Business2_DataCache(&cacheTask)
  113. })
  114. }
  115. if err != nil {
  116. global.SystemLogger.Log(logrus.ErrorLevel,fmt.Sprintf("serviceUrl:%s ,cache data failed,err msg:%s.",
  117. data_cache_list[i].ServiceConfig.ServiceUrl,err.Error()))
  118. }
  119. }
  120. //this.sc.Every(1).Day().At("00:00:00").Do(this.CacheTask)
  121. }
  122. func (this *CacheTimer)SubscribeTask(){
  123. CallSubscribe()
  124. }