subscribe.go 7.3 KB


  1. package cron
  2. import (
  3. "DataShare/dao"
  4. "DataShare/global"
  5. "DataShare/model"
  6. "DataShare/util"
  7. "encoding/json"
  8. "errors"
  9. "fmt"
  10. "github.com/sirupsen/logrus"
  11. "strings"
  12. )
  13. //完成订阅功能
  14. func ReadSubscribeDesc()([]model.Subscribe){
  15. res,_:= global.RedisClient.Get("subscribe").Result()
  16. var arraySub []model.Subscribe
  17. if err := json.Unmarshal([]byte(res),&arraySub);err ==nil {
  18. return arraySub
  19. }
  20. return nil
  21. }
  22. //获取最近缓存的最大值
  23. func ReadLatestMaxVal(accessId string)(map[string]interface{},error) {
  24. res,err := global.RedisClient.Get(global.RedisDataShare+":"+"subscribe"+":"+accessId).Result()
  25. if err != nil {
  26. return nil,err
  27. }
  28. var mapData map[string]interface{}
  29. err = json.Unmarshal([]byte(res),&mapData)
  30. return mapData,err
  31. }
  32. func getDataBaseInfoById(id int)(*global.DbInfo){
  33. for i:=0;i<len(global.Databases);i++{
  34. if global.Databases[i].Id == id {
  35. return &global.Databases[i]
  36. }
  37. }
  38. return nil
  39. }
  40. //获取max
  41. func DatabaseGetMaxFieldValue(req *model.Access_Data_Struct,fieldName string)(map[string]interface{},error){
  42. dbInfo := getDataBaseInfoById(req.ResData.DbId)
  43. if dbInfo==nil{
  44. return nil,errors.New("数据库不存在!")
  45. }
  46. if dbInfo.Status == false {
  47. return nil,errors.New(fmt.Sprintf("database:%s not connect",dbInfo.DbName))
  48. }
  49. if dbInfo.SqlType == "mysql" || dbInfo.SqlType == "postgres" || dbInfo.SqlType == "kingbase" || dbInfo.SqlType=="dm8" {
  50. args := make([]interface{},0)
  51. sql := "select %s"
  52. sql += " from %s"
  53. args = append(args,fieldName)
  54. args = append(args,req.ResData.TbName)
  55. where := 0
  56. //拼接默认参数
  57. cols := req.ResData.Colnums
  58. for zz:=0;zz<len(cols);zz++{
  59. //if cols[zz].IsSearchable != nil && *cols[zz].IsSearchable == 1 {
  60. if cols[zz].FilterOperator != nil && len(*cols[zz].FilterOperator) > 0 && cols[zz].FilterValue != nil && len(*cols[zz].FilterValue) > 0 {
  61. if where == 0 {
  62. sql += " where"
  63. where = 1
  64. }else{
  65. sql += " and"
  66. }
  67. //0-等于 1-大于 2-小于
  68. if *cols[zz].FilterOperator == "0" {
  69. sql += " %s='%s'"
  70. }else if *cols[zz].FilterOperator == "1" {
  71. sql += " %s>'%s'"
  72. }else if *cols[zz].FilterOperator == "2" {
  73. sql += " %s<'%s'"
  74. }
  75. args = append(args,cols[zz].Name)
  76. args = append(args,*cols[zz].FilterValue)
  77. }
  78. //}
  79. }
  80. sql += " order by %s desc limit 1"
  81. args = append(args,fieldName)
  82. sqlStr := fmt.Sprintf(sql,args...)
  83. //fmt.Println("----------AAAAAAAAAAAA-----------")
  84. //fmt.Println(sqlStr)
  85. //查询
  86. return dao.GetOneData(dbInfo.DbHandle,sqlStr)
  87. }
  88. return nil,errors.New("not support database type")
  89. }
  90. //根据key过滤
  91. func DatabaseUpdateCheck(req *model.Access_Data_Struct,fieldName string,fieldValue interface{})(bool,error){
  92. //第一步:去出要查询的字段
  93. dbInfo := getDataBaseInfoById(req.ResData.DbId)
  94. if dbInfo==nil{
  95. return false,errors.New("数据库不存在!")
  96. }
  97. if dbInfo.Status == false {
  98. return false,errors.New(fmt.Sprintf("database:%s not connect",dbInfo.DbName))
  99. }
  100. if dbInfo.SqlType == "mysql" || dbInfo.SqlType == "postgres" || dbInfo.SqlType == "kingbase" || dbInfo.SqlType=="dm8" {
  101. args := make([]interface{},0)
  102. sql := "select count(*)"
  103. sql += " from %s"
  104. args = append(args,req.ResData.TbName)
  105. where := 0
  106. sql += " where %s>'%v'"
  107. args = append(args,fieldName,fieldValue)
  108. where = 1
  109. //拼接默认参数
  110. cols := req.ResData.Colnums
  111. for zz:=0;zz<len(cols);zz++{
  112. //if cols[zz].IsSearchable != nil && *cols[zz].IsSearchable == 1 {
  113. if cols[zz].FilterOperator != nil && len(*cols[zz].FilterOperator) > 0 && cols[zz].FilterValue != nil && len(*cols[zz].FilterValue) > 0 {
  114. if where == 0 {
  115. sql += " where"
  116. where = 1
  117. }else{
  118. sql += " and"
  119. }
  120. //0-等于 1-大于 2-小于
  121. if *cols[zz].FilterOperator == "0" {
  122. sql += " %s='%s'"
  123. }else if *cols[zz].FilterOperator == "1" {
  124. sql += " %s>'%s'"
  125. }else if *cols[zz].FilterOperator == "2" {
  126. sql += " %s<'%s'"
  127. }
  128. args = append(args,cols[zz].Name)
  129. args = append(args,*cols[zz].FilterValue)
  130. }
  131. //}
  132. }
  133. sqlStr := fmt.Sprintf(sql,args...)
  134. //fmt.Println("----------BBBBBBBBBBBBBBBBBBBBBBBB-----------")
  135. //fmt.Println(sqlStr)
  136. //查询
  137. cnt,_ := dao.GetCount(dbInfo.DbHandle,sqlStr)
  138. if cnt>0{
  139. return true,nil
  140. }
  141. return false,nil
  142. }
  143. return false,nil
  144. }
  145. func ServiceUpdateCheck(req *model.Access_Service_Struct, fieldId interface{}){
  146. }
  147. func getAccessInfo(accessId string) (string,*model.Access_Service_Struct,*model.Access_Data_Struct,error) {
  148. key := fmt.Sprintf("%s", accessId)
  149. body, err := global.RedisClient.Get(key).Result()
  150. if err != nil {
  151. return "", nil, nil, err
  152. }
  153. var tempMap map[string]interface{}
  154. err = json.Unmarshal([]byte(body), &tempMap)
  155. if err != nil {
  156. //panic(err)
  157. return "", nil, nil, err
  158. }
  159. //for k, values := range tempMap {
  160. // fmt.Printf("%s: %s\n", k, values) // ... and this line
  161. //}
  162. if tempMap["resType"] == "service" {
  163. //接口服务
  164. var accessValue model.Access_Service_Struct
  165. err = json.NewDecoder(strings.NewReader(body)).Decode(&accessValue)
  166. if err != nil {
  167. global.SystemLogger.Log(logrus.ErrorLevel, fmt.Sprintf("json转结构体出错, err:%s", err.Error()))
  168. return "", nil, nil, err
  169. }
  170. return "Service", &accessValue, nil, nil
  171. } else {
  172. //数据资源
  173. var accessValue model.Access_Data_Struct
  174. err = json.NewDecoder(strings.NewReader(body)).Decode(&accessValue)
  175. if err != nil {
  176. global.SystemLogger.Log(logrus.ErrorLevel, fmt.Sprintf("json转结构体出错, err:%s", err.Error()))
  177. return "", nil, nil, err
  178. }
  179. return "Data", nil, &accessValue, nil
  180. }
  181. }
  182. func CallSubscribe() error{
  183. subArray := ReadSubscribeDesc()
  184. //fmt.Printf("----------------------------sub size:%d--------------------------",len(subArray))
  185. //fmt.Println()
  186. for i:=0;i<len(subArray);i++{
  187. //更加addField 去 获取上一次的最大值
  188. mapData,err := ReadLatestMaxVal(subArray[i].AccessId)
  189. if err != nil {
  190. //可能 key 不存在
  191. //return err
  192. }
  193. if subArray[i].ResType == "data" {
  194. _,_,dataInfo, err := getAccessInfo(subArray[i].AccessId)
  195. if err != nil {
  196. global.SystemLogger.Log(logrus.ErrorLevel,"get access id failed,err:",err.Error())
  197. return err
  198. }
  199. //检测是否存在更新
  200. filedVal := interface{}(0)
  201. if mapData != nil && mapData[subArray[i].AddField] != nil {
  202. filedVal = mapData[subArray[i].AddField]
  203. }
  204. //fmt.Printf("------------------filedVal:%v-------------------",filedVal)
  205. //fmt.Println()
  206. ret,err := DatabaseUpdateCheck(dataInfo,subArray[i].AddField,filedVal)
  207. if ret == true{
  208. //获取最大值
  209. maxVal,err := DatabaseGetMaxFieldValue(dataInfo,subArray[i].AddField)
  210. //将最大值保存到缓存里面
  211. if err != nil {
  212. global.SystemLogger.Log(logrus.ErrorLevel,"get max field value failed,err:",err.Error())
  213. }else{
  214. if maxVal != nil && maxVal[subArray[i].AddField] != nil {
  215. jsData, _ := json.Marshal(maxVal)
  216. global.RedisClient.Set(global.RedisDataShare+":"+"subscribe"+":"+subArray[i].AccessId,string(jsData),0)
  217. //通知客户来取数据
  218. byteData := []byte("new data arrived")
  219. global.SystemLogger.Log(logrus.InfoLevel,fmt.Sprintf("--------------------------post subscribe to url:%s-----------------------",dataInfo.SubscribeUrl))
  220. util.HttpPost(dataInfo.SubscribeUrl,byteData,"aaa")
  221. }
  222. }
  223. }
  224. }else if subArray[i].ResType == "service" {
  225. }
  226. }
  227. return nil
  228. }