123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257 |
- package cron
- import (
- "DataShare/dao"
- "DataShare/global"
- "DataShare/model"
- "DataShare/util"
- "encoding/json"
- "errors"
- "fmt"
- "github.com/sirupsen/logrus"
- "strings"
- )
- //完成订阅功能
- func ReadSubscribeDesc()([]model.Subscribe){
- res,_:= global.RedisClient.Get("subscribe").Result()
- var arraySub []model.Subscribe
- if err := json.Unmarshal([]byte(res),&arraySub);err ==nil {
- return arraySub
- }
- return nil
- }
- //获取最近缓存的最大值
- func ReadLatestMaxVal(accessId string)(map[string]interface{},error) {
- res,err := global.RedisClient.Get(global.RedisDataShare+":"+"subscribe"+":"+accessId).Result()
- if err != nil {
- return nil,err
- }
- var mapData map[string]interface{}
- err = json.Unmarshal([]byte(res),&mapData)
- return mapData,err
- }
- func getDataBaseInfoById(id int)(*global.DbInfo){
- for i:=0;i<len(global.Databases);i++{
- if global.Databases[i].Id == id {
- return &global.Databases[i]
- }
- }
- return nil
- }
- //获取max
- func DatabaseGetMaxFieldValue(req *model.Access_Data_Struct,fieldName string)(map[string]interface{},error){
- dbInfo := getDataBaseInfoById(req.ResData.DbId)
- if dbInfo==nil{
- return nil,errors.New("数据库不存在!")
- }
- if dbInfo.Status == false {
- return nil,errors.New(fmt.Sprintf("database:%s not connect",dbInfo.DbName))
- }
- if dbInfo.SqlType == "mysql" || dbInfo.SqlType == "postgres" || dbInfo.SqlType == "kingbase" || dbInfo.SqlType=="dm8" {
- args := make([]interface{},0)
- sql := "select %s"
- sql += " from %s"
- args = append(args,fieldName)
- args = append(args,req.ResData.TbName)
- where := 0
- //拼接默认参数
- cols := req.ResData.Colnums
- for zz:=0;zz<len(cols);zz++{
- //if cols[zz].IsSearchable != nil && *cols[zz].IsSearchable == 1 {
- if cols[zz].FilterOperator != nil && len(*cols[zz].FilterOperator) > 0 && cols[zz].FilterValue != nil && len(*cols[zz].FilterValue) > 0 {
- if where == 0 {
- sql += " where"
- where = 1
- }else{
- sql += " and"
- }
- //0-等于 1-大于 2-小于
- if *cols[zz].FilterOperator == "0" {
- sql += " %s='%s'"
- }else if *cols[zz].FilterOperator == "1" {
- sql += " %s>'%s'"
- }else if *cols[zz].FilterOperator == "2" {
- sql += " %s<'%s'"
- }
- args = append(args,cols[zz].Name)
- args = append(args,*cols[zz].FilterValue)
- }
- //}
- }
- sql += " order by %s desc limit 1"
- args = append(args,fieldName)
- sqlStr := fmt.Sprintf(sql,args...)
- //fmt.Println("----------AAAAAAAAAAAA-----------")
- //fmt.Println(sqlStr)
- //查询
- return dao.GetOneData(dbInfo.DbHandle,sqlStr)
- }
- return nil,errors.New("not support database type")
- }
- //根据key过滤
- func DatabaseUpdateCheck(req *model.Access_Data_Struct,fieldName string,fieldValue interface{})(bool,error){
- //第一步:去出要查询的字段
- dbInfo := getDataBaseInfoById(req.ResData.DbId)
- if dbInfo==nil{
- return false,errors.New("数据库不存在!")
- }
- if dbInfo.Status == false {
- return false,errors.New(fmt.Sprintf("database:%s not connect",dbInfo.DbName))
- }
- if dbInfo.SqlType == "mysql" || dbInfo.SqlType == "postgres" || dbInfo.SqlType == "kingbase" || dbInfo.SqlType=="dm8" {
- args := make([]interface{},0)
- sql := "select count(*)"
- sql += " from %s"
- args = append(args,req.ResData.TbName)
- where := 0
- sql += " where %s>'%v'"
- args = append(args,fieldName,fieldValue)
- where = 1
- //拼接默认参数
- cols := req.ResData.Colnums
- for zz:=0;zz<len(cols);zz++{
- //if cols[zz].IsSearchable != nil && *cols[zz].IsSearchable == 1 {
- if cols[zz].FilterOperator != nil && len(*cols[zz].FilterOperator) > 0 && cols[zz].FilterValue != nil && len(*cols[zz].FilterValue) > 0 {
- if where == 0 {
- sql += " where"
- where = 1
- }else{
- sql += " and"
- }
- //0-等于 1-大于 2-小于
- if *cols[zz].FilterOperator == "0" {
- sql += " %s='%s'"
- }else if *cols[zz].FilterOperator == "1" {
- sql += " %s>'%s'"
- }else if *cols[zz].FilterOperator == "2" {
- sql += " %s<'%s'"
- }
- args = append(args,cols[zz].Name)
- args = append(args,*cols[zz].FilterValue)
- }
- //}
- }
- sqlStr := fmt.Sprintf(sql,args...)
- //fmt.Println("----------BBBBBBBBBBBBBBBBBBBBBBBB-----------")
- //fmt.Println(sqlStr)
- //查询
- cnt,_ := dao.GetCount(dbInfo.DbHandle,sqlStr)
- if cnt>0{
- return true,nil
- }
- return false,nil
- }
- return false,nil
- }
- func ServiceUpdateCheck(req *model.Access_Service_Struct, fieldId interface{}){
- }
- func getAccessInfo(accessId string) (string,*model.Access_Service_Struct,*model.Access_Data_Struct,error) {
- key := fmt.Sprintf("%s", accessId)
- body, err := global.RedisClient.Get(key).Result()
- if err != nil {
- return "", nil, nil, err
- }
- var tempMap map[string]interface{}
- err = json.Unmarshal([]byte(body), &tempMap)
- if err != nil {
- //panic(err)
- return "", nil, nil, err
- }
- //for k, values := range tempMap {
- // fmt.Printf("%s: %s\n", k, values) // ... and this line
- //}
- if tempMap["resType"] == "service" {
- //接口服务
- var accessValue model.Access_Service_Struct
- err = json.NewDecoder(strings.NewReader(body)).Decode(&accessValue)
- if err != nil {
- global.SystemLogger.Log(logrus.ErrorLevel, fmt.Sprintf("json转结构体出错, err:%s", err.Error()))
- return "", nil, nil, err
- }
- return "Service", &accessValue, nil, nil
- } else {
- //数据资源
- var accessValue model.Access_Data_Struct
- err = json.NewDecoder(strings.NewReader(body)).Decode(&accessValue)
- if err != nil {
- global.SystemLogger.Log(logrus.ErrorLevel, fmt.Sprintf("json转结构体出错, err:%s", err.Error()))
- return "", nil, nil, err
- }
- return "Data", nil, &accessValue, nil
- }
- }
- func CallSubscribe() error{
- subArray := ReadSubscribeDesc()
- //fmt.Printf("----------------------------sub size:%d--------------------------",len(subArray))
- //fmt.Println()
- for i:=0;i<len(subArray);i++{
- //更加addField 去 获取上一次的最大值
- mapData,err := ReadLatestMaxVal(subArray[i].AccessId)
- if err != nil {
- //可能 key 不存在
- //return err
- }
- if subArray[i].ResType == "data" {
- _,_,dataInfo, err := getAccessInfo(subArray[i].AccessId)
- if err != nil {
- global.SystemLogger.Log(logrus.ErrorLevel,"get access id failed,err:",err.Error())
- return err
- }
- //检测是否存在更新
- filedVal := interface{}(0)
- if mapData != nil && mapData[subArray[i].AddField] != nil {
- filedVal = mapData[subArray[i].AddField]
- }
- //fmt.Printf("------------------filedVal:%v-------------------",filedVal)
- //fmt.Println()
- ret,err := DatabaseUpdateCheck(dataInfo,subArray[i].AddField,filedVal)
- if ret == true{
- //获取最大值
- maxVal,err := DatabaseGetMaxFieldValue(dataInfo,subArray[i].AddField)
- //将最大值保存到缓存里面
- if err != nil {
- global.SystemLogger.Log(logrus.ErrorLevel,"get max field value failed,err:",err.Error())
- }else{
- if maxVal != nil && maxVal[subArray[i].AddField] != nil {
- jsData, _ := json.Marshal(maxVal)
- global.RedisClient.Set(global.RedisDataShare+":"+"subscribe"+":"+subArray[i].AccessId,string(jsData),0)
- //通知客户来取数据
- byteData := []byte("new data arrived")
- global.SystemLogger.Log(logrus.InfoLevel,fmt.Sprintf("--------------------------post subscribe to url:%s-----------------------",dataInfo.SubscribeUrl))
- util.HttpPost(dataInfo.SubscribeUrl,byteData,"aaa")
- }
- }
- }
- }else if subArray[i].ResType == "service" {
- }
- }
- return nil
- }
|