package Nats import ( "Cold_Api/conf" "Cold_Api/controllers/lib" "Cold_Api/models/Account" "Cold_Api/models/Device" "Cold_Api/models/System" "encoding/xml" "fmt" "github.com/astaxie/beego/cache" "github.com/beego/beego/v2/core/logs" "github.com/nats-io/nats.go" "github.com/vmihailenco/msgpack/v5" "strconv" "strings" ) var redisCache_NatsServer cache.Cache func init() { logs.Info("============Nats init============") var err error // 连接Nats服务器 lib.Nats, err = nats.Connect("nats://" + conf.NatsServer_Url) if err != nil { logs.Error("nats 连接失败!") panic(err) } logs.Info("nats OK!") if conf.RunMode == "prod" { go NatsInit() } config := fmt.Sprintf(`{"key":"%s","conn":"%s","dbNum":"%s","password":"%s"}`, "redisCache_NatsServer", conf.Redis_address, conf.Redis_dbNum, conf.Redis_password) logs.Info(config) redisCache_NatsServer, err = cache.NewCache("redis", config) if err != nil || redisCache_NatsServer == nil { errMsg := "failed to init redis" logs.Error(errMsg, err) panic(errMsg) } } func NatsInit() { // 获取微信二维码 返回结果 _, _ = lib.Nats.QueueSubscribe("Wx_BasicMessage_Event_QRCode", "Wx_BasicMessage_Event_QRCode", func(m *nats.Msg) { //logs.Debug(fmt.Sprintf(" => Nats Wx_BasicMessage_Event_QRCode message: %s\n", string(m.Data))) type Person_QRCode struct { ToUserName string `xml:"ToUserName"` //注意这里有个反引号 FromUserName string `xml:"FromUserName"` //注意这里有个反引号 CreateTime string `xml:"CreateTime"` //注意这里有个反引号 EventKey string `xml:"EventKey"` } var person_QRCode Person_QRCode err1 := xml.Unmarshal(m.Data, &person_QRCode) if err1 != nil { logs.Error("Unmarshal error") _ = lib.Nats.Publish(m.Reply, []byte("")) return } // 进入 二维码配对 //logs.Info("FromUserName-", person_QRCode.FromUserName) //logs.Info("EventKey-", person_QRCode.EventKey) // 开始 处理消息 if strings.Contains(person_QRCode.EventKey, "@宝智达冷链 微信公众号通知") { //"请将本内容发送到 深圳市宝智达科技有限公司 微信公众号-|"+lib.AesEncryptCBC(T_calss_id, "0123456789012345")+"|-", Content_r := lib.GetBetweenStr(person_QRCode.EventKey, "-|", "|-") decryptCode := lib.AesDecryptCBC(Content_r, "0123456789012345") T_uuid_name := strings.Split(decryptCode, "/") var uuid, name string if len(T_uuid_name) == 2 { uuid = strings.Split(decryptCode, "/")[0] name = strings.Split(decryptCode, "/")[1] } logs.Info("解密结果 UUID name:", decryptCode, " Content_r:", Content_r) Admin_r, err := Account.Read_Admin_ByUuid(uuid) if err != nil { _ = lib.Nats.Publish(m.Reply, []byte("")) return } T_wx := fmt.Sprintf("%s/%s", person_QRCode.FromUserName, name) // 已绑定 name相同 提示已绑定 // 已绑定 name不同 在数据库修改name if strings.Contains(Admin_r.T_wx, person_QRCode.FromUserName) { list := strings.Split(Admin_r.T_wx, "|") for _, v := range list { if strings.Split(v, "/")[0] == person_QRCode.FromUserName { if v == T_wx { _ = lib.Nats.Publish(m.Reply, []byte(name+" 已绑定!")) return } Admin_r.T_wx = strings.Replace(Admin_r.T_wx, v, T_wx, 1) } } } else { Admin_r.T_wx += T_wx + "|" } Account.Update_Admin(Admin_r, "T_wx") _ = lib.Nats.Publish(m.Reply, []byte(name+" 绑定成功!")) return } _ = lib.Nats.Publish(m.Reply, []byte("")) }) // 请求-响应 验证登录 _, _ = lib.Nats.QueueSubscribe("Cold_User_verification", "Cold_User_verification", func(m *nats.Msg) { logs.Debug(fmt.Sprintf("Cold_User_verification message: %s\n", string(m.Data))) type T_R struct { Code int16 `xml:"Code"` Msg string `xml:"Msg"` Pid int `xml:"Pid"` // 公司id Data Account.Admin_R `xml:"Data"` // 泛型 } var t_R T_R // 验证登录 b_, admin_r := Account.Verification(string(m.Data), "") if !b_ { logs.Error("Unmarshal error") t_R.Code = 201 t_R.Msg = "User_tokey Err!" b, _ := msgpack.Marshal(&t_R) _ = lib.Nats.Publish(m.Reply, b) return } T_pid, is := Account.Redis_Tokey_T_pid_Get(string(m.Data)) if !is { t_R.Code = 201 t_R.Msg = "User_tokey Err!" b, _ := msgpack.Marshal(&t_R) _ = lib.Nats.Publish(m.Reply, b) return } t_R.Pid = T_pid t_R.Code = 200 t_R.Msg = "ok" t_R.Data = Account.AdminToAdmin_R(admin_r) b, _ := msgpack.Marshal(&t_R) _ = lib.Nats.Publish(m.Reply, b) }) // 请求-响应 通过公司名获取所有公司信息 _, _ = lib.Nats.QueueSubscribe("Cold_User_CompanyListAllByT_name", "Cold_User_CompanyListAllByT_name", func(m *nats.Msg) { logs.Debug(fmt.Sprintf("CompanyListAll message: %s\n", string(m.Data))) type T_R struct { Code int16 `xml:"Code"` Msg string `xml:"Msg"` Data []Account.Company `xml:"Data"` // 泛型 } var t_R T_R t_R.Code = 200 t_R.Msg = "ok" t_R.Data = Account.Read_Company_List_All_ByT_name(string(m.Data)) b, _ := msgpack.Marshal(&t_R) _ = lib.Nats.Publish(m.Reply, b) }) // 请求-响应 获取公司 _, _ = lib.Nats.QueueSubscribe("Cold_ReadCompanyByT_id", "Cold_ReadCompanyByT_id", func(m *nats.Msg) { logs.Debug(fmt.Sprintf("Cold_ReadCompanyByT_id message: %s\n", string(m.Data))) type T_R struct { Code int16 `xml:"Code"` Msg string `xml:"Msg"` Data Account.Company `xml:"Data"` // 泛型 } var t_R T_R id, _ := strconv.Atoi(string(m.Data)) company, err := Account.Read_Company_ById(id) if err != nil { t_R.Code = 202 t_R.Msg = err.Error() b, _ := msgpack.Marshal(&t_R) _ = lib.Nats.Publish(m.Reply, b) } t_R.Code = 200 t_R.Msg = "ok" t_R.Data = company b, _ := msgpack.Marshal(&t_R) _ = lib.Nats.Publish(m.Reply, b) }) // 请求-响应 获取公司 通过秘钥 _, _ = lib.Nats.QueueSubscribe("Cold_ReadCompanyByT_key", "Cold_ReadCompanyByT_key", func(m *nats.Msg) { logs.Debug(fmt.Sprintf("Cold_ReadCompanyByT_key message: %s\n", string(m.Data))) type T_R struct { Code int16 `xml:"Code"` Msg string `xml:"Msg"` Data Account.Company `xml:"Data"` // 泛型 } var t_R T_R company, err := Account.Read_Company_ByKey(string(m.Data)) if err != nil { t_R.Code = 202 t_R.Msg = err.Error() b, _ := msgpack.Marshal(&t_R) _ = lib.Nats.Publish(m.Reply, b) } t_R.Code = 200 t_R.Msg = "ok" t_R.Data = company b, _ := msgpack.Marshal(&t_R) _ = lib.Nats.Publish(m.Reply, b) }) // 请求-响应 获取所有用户列表 _, _ = lib.Nats.QueueSubscribe("Cold_User_UserListAll", "Cold_User_UserListAll", func(m *nats.Msg) { type T_R struct { Code int16 `xml:"Code"` Msg string `xml:"Msg"` Data []Account.Admin_R `xml:"Data"` // 泛型 } var t_R T_R t_R.Code = 200 t_R.Msg = "ok" t_R.Data = Account.Read_User_List_All() b, _ := msgpack.Marshal(&t_R) _ = lib.Nats.Publish(m.Reply, b) }) // 请求-响应 检查用户权限 _, _ = lib.Nats.QueueSubscribe("Cold_User_CheckUserPermissions", "Cold_User_CheckUserPermissions", func(m *nats.Msg) { logs.Debug(fmt.Sprintf("Cold_User_CheckUserPermissions message: %s\n", string(m.Data))) type T_Req struct { Power_Id int `xml:"Power_Id"` // 权限id Req_Url string `xml:"Req_Url"` // 请求url } type T_R struct { Code int16 `xml:"Code"` Msg string `xml:"Msg"` Pass bool `xml:"Pass"` // 泛型 } var t_Req T_Req var t_R T_R err := msgpack.Unmarshal(m.Data, &t_Req) if err != nil { t_R.Code = 202 t_R.Msg = "Unmarshal error" b, _ := msgpack.Marshal(&t_R) _ = lib.Nats.Publish(m.Reply, b) return } power, err := Account.Read_Power_ById(t_Req.Power_Id) if err != nil { t_R.Code = 202 t_R.Msg = err.Error() b, _ := msgpack.Marshal(&t_R) _ = lib.Nats.Publish(m.Reply, b) } flag := false if power.T_menu == "*" { flag = true } if !flag { api := Account.Read_API_List_ByPower_Id(power.Id, power.T_menu) for _, v := range api { if strings.Contains(conf.Version+v.T_uri, t_Req.Req_Url) { flag = true break } } } t_R.Code = 200 t_R.Msg = "ok" t_R.Pass = flag b, _ := msgpack.Marshal(&t_R) _ = lib.Nats.Publish(m.Reply, b) }) // 发布-订阅 模式,异步订阅 AddSysLogs _, _ = lib.Nats.QueueSubscribe("Cold_User_AddSysLogs", "Cold_User_AddSysLogs", func(m *nats.Msg) { logs.Debug(fmt.Sprintf("AddSysLogs message: %s\n", string(m.Data))) type T_S struct { T_class string T_title string T_txt interface{} } var t_S T_S err := msgpack.Unmarshal(m.Data, &t_S) if err != nil { System.Add_Logs("Nats", "Nats AddSysLogs 解析失败", string(m.Data)) return } System.Add_Logs_T(t_S.T_class, t_S.T_title, t_S.T_txt) }) // 发布-订阅 模式,异步订阅 AddUserLogs _, _ = lib.Nats.QueueSubscribe("Cold_User_AddUserLogs", "Cold_User_AddUserLogs", func(m *nats.Msg) { logs.Debug(fmt.Sprintf("AddUserLogs message: %s\n", string(m.Data))) type T_S struct { T_uuid string T_class string T_title string T_txt interface{} } var t_S T_S err := msgpack.Unmarshal(m.Data, &t_S) if err != nil { System.Add_Logs("Nats", "Nats AddUserLogs 解析失败", string(m.Data)) return } System.Add_UserLogs_T(t_S.T_uuid, t_S.T_class, t_S.T_title, t_S.T_txt) }) // 请求-响应 获取设备 _, _ = lib.Nats.QueueSubscribe("Cold_ReadDeviceByT_sn", "Cold_ReadDeviceByT_sn", func(m *nats.Msg) { logs.Debug(fmt.Sprintf("CheckUserPermissions message: %s\n", string(m.Data))) type T_R struct { Code int16 `xml:"Code"` Msg string `xml:"Msg"` Data Device.Device `xml:"Data"` // 泛型 } var t_R T_R device, err := Device.Read_Device_ByT_sn(string(m.Data)) if err != nil { t_R.Code = 202 t_R.Msg = "查询失败" b, _ := msgpack.Marshal(&t_R) _ = lib.Nats.Publish(m.Reply, b) } t_R.Code = 200 t_R.Msg = "ok" t_R.Data = device b, _ := msgpack.Marshal(&t_R) _ = lib.Nats.Publish(m.Reply, b) }) // 请求-响应 获取传感器列表 _, _ = lib.Nats.QueueSubscribe("Cold_ReadDeviceSensorALLByT_sn", "Cold_ReadDeviceSensorALLByT_sn", func(m *nats.Msg) { logs.Debug(fmt.Sprintf("CheckUserPermissions message: %s\n", string(m.Data))) type T_R struct { Code int16 `xml:"Code"` Msg string `xml:"Msg"` Data []Device.DeviceSensor `xml:"Data"` // 泛型 } var t_R T_R device := Device.Read_DeviceSensor_ALL_List_T_sn(string(m.Data)) t_R.Code = 200 t_R.Msg = "ok" t_R.Data = device b, _ := msgpack.Marshal(&t_R) _ = lib.Nats.Publish(m.Reply, b) }) // 请求-响应 获取设备数据列表 _, _ = lib.Nats.QueueSubscribe("Cold_ReadDeviceDataListBy_T_snid", "Cold_ReadDeviceDataListBy_T_snid", func(m *nats.Msg) { logs.Debug(fmt.Sprintf("Cold_ReadDeviceDataListBy_T_snid message: %s\n", string(m.Data))) type T_Req struct { T_snid string `xml:"T_snid"` Time_start string `xml:"Time_start"` Time_end string `xml:"Time_end"` Page int `xml:"Page"` Page_z int `xml:"Page_z"` } type T_R struct { Code int16 `xml:"Code"` Msg string `xml:"Msg"` Count int64 `xml:"Count"` Data []Device.DeviceData_R `xml:"Data"` // 泛型 } var t_Req T_Req var t_R T_R err := msgpack.Unmarshal(m.Data, &t_Req) if err != nil { t_R.Code = 202 t_R.Msg = "Unmarshal error" b, _ := msgpack.Marshal(&t_R) _ = lib.Nats.Publish(m.Reply, b) return } deviceData, cnt := Device.Read_DeviceData_By_T_snid_List(t_Req.T_snid, t_Req.Time_start, t_Req.Time_end, t_Req.Page, t_Req.Page_z) t_R.Code = 200 t_R.Msg = "ok" t_R.Count = cnt t_R.Data = deviceData b, _ := msgpack.Marshal(&t_R) _ = lib.Nats.Publish(m.Reply, b) }) // 请求-响应 通过key获取传感器列表 _, _ = lib.Nats.QueueSubscribe("Cold_CompanyDeviceSensor_List_ByKey", "Cold_CompanyDeviceSensor_List_ByKey", func(m *nats.Msg) { logs.Debug(fmt.Sprintf("Cold_CompanyDeviceSensor_List_ByKey message: %s\n", string(m.Data))) type T_Req struct { T_sn string `xml:"T_sn"` T_key string `xml:"T_key"` } type T_R struct { Code int16 `xml:"Code"` Msg string `xml:"Msg"` Count int64 `xml:"Count"` Data []Device.DeviceSensor_R `xml:"Data"` // 泛型 } var t_Req T_Req var t_R T_R err := msgpack.Unmarshal(m.Data, &t_Req) if err != nil { t_R.Code = 202 t_R.Msg = "Unmarshal error" b, _ := msgpack.Marshal(&t_R) _ = lib.Nats.Publish(m.Reply, b) return } // 查询公司 Company_r, err := Account.Read_Company_ByKey(t_Req.T_key) if err != nil { t_R.Code = 202 t_R.Msg = "T_key error" b, _ := msgpack.Marshal(&t_R) _ = lib.Nats.Publish(m.Reply, b) return } // 查询公司下面所有子公司id T_pids := Account.ReadCompanyIds_T_path(Company_r.T_path) Account.Read_Company_All_Maps() deviceData, cnt := Device.Read_CompanyDeviceSensor_List_For_Data_ByKey(T_pids, t_Req.T_sn) t_R.Code = 200 t_R.Msg = "ok" t_R.Count = cnt t_R.Data = deviceData b, _ := msgpack.Marshal(&t_R) _ = lib.Nats.Publish(m.Reply, b) }) // 请求-响应 根据时间获取设备数据(单条)最新数据 _, _ = lib.Nats.QueueSubscribe("Cold_ReadDeviceDataBy_T_snid_T_time", "Cold_ReadDeviceDataBy_T_snid_T_time", func(m *nats.Msg) { logs.Debug(fmt.Sprintf("Cold_ReadDeviceDataBy_T_snid_T_time message: %s\n", string(m.Data))) type T_Req struct { T_sn string `xml:"T_sn"` T_id int `xml:"T_id"` Time string `xml:"Time"` } type T_R struct { Code int16 `xml:"Code"` Msg string `xml:"Msg"` Data Device.DeviceData_ `xml:"Data"` // 泛型 } var t_Req T_Req var t_R T_R err := msgpack.Unmarshal(m.Data, &t_Req) if err != nil { t_R.Code = 202 t_R.Msg = "Unmarshal error" b, _ := msgpack.Marshal(&t_R) _ = lib.Nats.Publish(m.Reply, b) return } deviceData := Device.Read_DeviceData_By_Time(t_Req.T_sn, t_Req.T_id, t_Req.Time) t_R.Code = 200 t_R.Msg = "ok" t_R.Data = deviceData b, _ := msgpack.Marshal(&t_R) _ = lib.Nats.Publish(m.Reply, b) }) // 请求-响应 获取设备 _, _ = lib.Nats.QueueSubscribe("Cold_ReadDevice_List", "Cold_ReadDevice_List", func(m *nats.Msg) { logs.Debug(fmt.Sprintf("Cold_ReadDevice_List message: %s\n", string(m.Data))) type T_Req struct { Key string `xml:"Key"` Name string `xml:"Name"` Page int `xml:"Page"` Page_z int `xml:"Page_z"` } var t_Req T_Req type T_R struct { Code int16 `xml:"Code"` Msg string `xml:"Msg"` Count int64 `xml:"Count"` Data []Device.Device_R `xml:"Data"` // 泛型 } var t_R T_R err := msgpack.Unmarshal(m.Data, &t_Req) if err != nil { t_R.Code = 202 t_R.Msg = "Unmarshal error" b, _ := msgpack.Marshal(&t_R) _ = lib.Nats.Publish(m.Reply, b) return } // 查询公司 Company_r, err := Account.Read_Company_ByKey(t_Req.Key) if err != nil { t_R.Code = 202 t_R.Msg = "T_key error" b, _ := msgpack.Marshal(&t_R) _ = lib.Nats.Publish(m.Reply, b) return } deviceList, count := Device.Read_Device_List(&Account.Admin{T_pid: Company_r.Id}, []string{}, Company_r.Id, t_Req.Name, "", "", 0, t_Req.Page, t_Req.Page_z) t_R.Code = 200 t_R.Msg = "ok" t_R.Count = count t_R.Data = deviceList b, _ := msgpack.Marshal(&t_R) _ = lib.Nats.Publish(m.Reply, b) }) // 发布-订阅 修改设备校准时间 _, _ = lib.Nats.QueueSubscribe("Cold_UpdateDevice_CalibrationTime", "Cold_UpdateDevice_CalibrationTime", func(m *nats.Msg) { logs.Debug(fmt.Sprintf("Cold_UpdateDevice_CalibrationTime message: %s\n", string(m.Data))) type T_Req struct { T_sn string T_CalibrationTime string } type T_R struct { Code int16 `xml:"Code"` Msg string `xml:"Msg"` Data interface{} `xml:"Data"` // 泛型 } var t_R T_R var t_req T_Req err := msgpack.Unmarshal(m.Data, &t_req) if err != nil { t_R.Code = 202 t_R.Msg = "Unmarshal error" b, _ := msgpack.Marshal(&t_R) _ = lib.Nats.Publish(m.Reply, b) return } device, err := Device.Read_Device_ByT_sn(t_req.T_sn) if err != nil { if err.Error() == "record not found" { t_R.Code = 202 t_R.Msg = "SN不存在" b, _ := msgpack.Marshal(&t_R) _ = lib.Nats.Publish(m.Reply, b) return } t_R.Code = 202 t_R.Msg = "查询失败" b, _ := msgpack.Marshal(&t_R) _ = lib.Nats.Publish(m.Reply, b) return } // 校准时间 CalibrationTime, CalibrationTime_is := lib.DateStrToDate(t_req.T_CalibrationTime) if CalibrationTime_is { device.T_CalibrationTime = CalibrationTime } if is := Device.Update_Device(device, "T_CalibrationTime"); !is { t_R.Code = 202 t_R.Msg = "修改校准时间失败" b, _ := msgpack.Marshal(&t_R) _ = lib.Nats.Publish(m.Reply, b) return } t_R.Code = 200 t_R.Msg = "ok" b, _ := msgpack.Marshal(&t_R) _ = lib.Nats.Publish(m.Reply, b) return }) }