123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639 |
- package Nats
- import (
- "Cold_Api/conf"
- "Cold_Api/controllers/lib"
- "Cold_Api/models/Account"
- "Cold_Api/models/Device"
- "Cold_Api/models/System"
- "encoding/xml"
- "fmt"
- "github.com/astaxie/beego/cache"
- "github.com/beego/beego/v2/core/logs"
- "github.com/nats-io/nats.go"
- "github.com/vmihailenco/msgpack/v5"
- "strconv"
- "strings"
- )
- var redisCache_NatsServer cache.Cache
- func init() {
- logs.Info("============Nats init============")
- var err error
- // 连接Nats服务器
- lib.Nats, err = nats.Connect("nats://" + conf.NatsServer_Url)
- if err != nil {
- logs.Error("nats 连接失败!")
- panic(err)
- }
- logs.Info("nats OK!")
- if conf.RunMode == "prod" {
- go NatsInit()
- }
- config := fmt.Sprintf(`{"key":"%s","conn":"%s","dbNum":"%s","password":"%s"}`,
- "redisCache_NatsServer", conf.Redis_address, conf.Redis_dbNum, conf.Redis_password)
- logs.Info(config)
- redisCache_NatsServer, err = cache.NewCache("redis", config)
- if err != nil || redisCache_NatsServer == nil {
- errMsg := "failed to init redis"
- logs.Error(errMsg, err)
- panic(errMsg)
- }
- }
- func NatsInit() {
- // 获取微信二维码 返回结果
- _, _ = lib.Nats.QueueSubscribe("Wx_BasicMessage_Event_QRCode", "Wx_BasicMessage_Event_QRCode", func(m *nats.Msg) {
- //logs.Debug(fmt.Sprintf(" => Nats Wx_BasicMessage_Event_QRCode message: %s\n", string(m.Data)))
- type Person_QRCode struct {
- ToUserName string `xml:"ToUserName"` //注意这里有个反引号
- FromUserName string `xml:"FromUserName"` //注意这里有个反引号
- CreateTime string `xml:"CreateTime"` //注意这里有个反引号
- EventKey string `xml:"EventKey"`
- }
- var person_QRCode Person_QRCode
- err1 := xml.Unmarshal(m.Data, &person_QRCode)
- if err1 != nil {
- logs.Error("Unmarshal error")
- _ = lib.Nats.Publish(m.Reply, []byte(""))
- return
- }
- // 进入 二维码配对
- //logs.Info("FromUserName-", person_QRCode.FromUserName)
- //logs.Info("EventKey-", person_QRCode.EventKey)
- // 开始 处理消息
- if strings.Contains(person_QRCode.EventKey, "@宝智达冷链 微信公众号通知") {
- //"请将本内容发送到 深圳市宝智达科技有限公司 微信公众号-|"+lib.AesEncryptCBC(T_calss_id, "0123456789012345")+"|-",
- Content_r := lib.GetBetweenStr(person_QRCode.EventKey, "-|", "|-")
- decryptCode := lib.AesDecryptCBC(Content_r, "0123456789012345")
- T_uuid_name := strings.Split(decryptCode, "/")
- var uuid, name string
- if len(T_uuid_name) == 2 {
- uuid = strings.Split(decryptCode, "/")[0]
- name = strings.Split(decryptCode, "/")[1]
- }
- logs.Info("解密结果 UUID name:", decryptCode, " Content_r:", Content_r)
- Admin_r, err := Account.Read_Admin_ByUuid(uuid)
- if err != nil {
- _ = lib.Nats.Publish(m.Reply, []byte(""))
- return
- }
- T_wx := fmt.Sprintf("%s/%s", person_QRCode.FromUserName, name)
- // 已绑定 name相同 提示已绑定
- // 已绑定 name不同 在数据库修改name
- if strings.Contains(Admin_r.T_wx, person_QRCode.FromUserName) {
- list := strings.Split(Admin_r.T_wx, "|")
- for _, v := range list {
- if strings.Split(v, "/")[0] == person_QRCode.FromUserName {
- if v == T_wx {
- _ = lib.Nats.Publish(m.Reply, []byte(name+" 已绑定!"))
- return
- }
- Admin_r.T_wx = strings.Replace(Admin_r.T_wx, v, T_wx, 1)
- }
- }
- } else {
- Admin_r.T_wx += T_wx + "|"
- }
- Account.Update_Admin(Admin_r, "T_wx")
- _ = lib.Nats.Publish(m.Reply, []byte(name+" 绑定成功!"))
- return
- }
- _ = lib.Nats.Publish(m.Reply, []byte(""))
- })
- // 请求-响应 验证登录
- _, _ = lib.Nats.QueueSubscribe("Cold_User_verification", "Cold_User_verification", func(m *nats.Msg) {
- logs.Debug(fmt.Sprintf("Cold_User_verification message: %s\n", string(m.Data)))
- type T_R struct {
- Code int16 `xml:"Code"`
- Msg string `xml:"Msg"`
- Pid int `xml:"Pid"` // 公司id
- Data Account.Admin_R `xml:"Data"` // 泛型
- }
- var t_R T_R
- // 验证登录
- b_, admin_r := Account.Verification(string(m.Data), "")
- if !b_ {
- logs.Error("Unmarshal error")
- t_R.Code = 201
- t_R.Msg = "User_tokey Err!"
- b, _ := msgpack.Marshal(&t_R)
- _ = lib.Nats.Publish(m.Reply, b)
- return
- }
- T_pid, is := Account.Redis_Tokey_T_pid_Get(string(m.Data))
- if !is {
- t_R.Code = 201
- t_R.Msg = "User_tokey Err!"
- b, _ := msgpack.Marshal(&t_R)
- _ = lib.Nats.Publish(m.Reply, b)
- return
- }
- t_R.Pid = T_pid
- t_R.Code = 200
- t_R.Msg = "ok"
- t_R.Data = Account.AdminToAdmin_R(admin_r)
- b, _ := msgpack.Marshal(&t_R)
- _ = lib.Nats.Publish(m.Reply, b)
- })
- // 请求-响应 通过公司名获取所有公司信息
- _, _ = lib.Nats.QueueSubscribe("Cold_User_CompanyListAllByT_name", "Cold_User_CompanyListAllByT_name", func(m *nats.Msg) {
- logs.Debug(fmt.Sprintf("CompanyListAll message: %s\n", string(m.Data)))
- type T_R struct {
- Code int16 `xml:"Code"`
- Msg string `xml:"Msg"`
- Data []Account.Company `xml:"Data"` // 泛型
- }
- var t_R T_R
- t_R.Code = 200
- t_R.Msg = "ok"
- t_R.Data = Account.Read_Company_List_All_ByT_name(string(m.Data))
- b, _ := msgpack.Marshal(&t_R)
- _ = lib.Nats.Publish(m.Reply, b)
- })
- // 请求-响应 获取公司
- _, _ = lib.Nats.QueueSubscribe("Cold_ReadCompanyByT_id", "Cold_ReadCompanyByT_id", func(m *nats.Msg) {
- logs.Debug(fmt.Sprintf("Cold_ReadCompanyByT_id message: %s\n", string(m.Data)))
- type T_R struct {
- Code int16 `xml:"Code"`
- Msg string `xml:"Msg"`
- Data Account.Company `xml:"Data"` // 泛型
- }
- var t_R T_R
- id, _ := strconv.Atoi(string(m.Data))
- company, err := Account.Read_Company_ById(id)
- if err != nil {
- t_R.Code = 202
- t_R.Msg = err.Error()
- b, _ := msgpack.Marshal(&t_R)
- _ = lib.Nats.Publish(m.Reply, b)
- }
- t_R.Code = 200
- t_R.Msg = "ok"
- t_R.Data = company
- b, _ := msgpack.Marshal(&t_R)
- _ = lib.Nats.Publish(m.Reply, b)
- })
- // 请求-响应 获取公司 通过秘钥
- _, _ = lib.Nats.QueueSubscribe("Cold_ReadCompanyByT_key", "Cold_ReadCompanyByT_key", func(m *nats.Msg) {
- logs.Debug(fmt.Sprintf("Cold_ReadCompanyByT_key message: %s\n", string(m.Data)))
- type T_R struct {
- Code int16 `xml:"Code"`
- Msg string `xml:"Msg"`
- Data Account.Company `xml:"Data"` // 泛型
- }
- var t_R T_R
- company, err := Account.Read_Company_ByKey(string(m.Data))
- if err != nil {
- t_R.Code = 202
- t_R.Msg = err.Error()
- b, _ := msgpack.Marshal(&t_R)
- _ = lib.Nats.Publish(m.Reply, b)
- }
- t_R.Code = 200
- t_R.Msg = "ok"
- t_R.Data = company
- b, _ := msgpack.Marshal(&t_R)
- _ = lib.Nats.Publish(m.Reply, b)
- })
- // 请求-响应 获取所有用户列表
- _, _ = lib.Nats.QueueSubscribe("Cold_User_UserListAll", "Cold_User_UserListAll", func(m *nats.Msg) {
- type T_R struct {
- Code int16 `xml:"Code"`
- Msg string `xml:"Msg"`
- Data []Account.Admin_R `xml:"Data"` // 泛型
- }
- var t_R T_R
- t_R.Code = 200
- t_R.Msg = "ok"
- t_R.Data = Account.Read_User_List_All()
- b, _ := msgpack.Marshal(&t_R)
- _ = lib.Nats.Publish(m.Reply, b)
- })
- // 请求-响应 检查用户权限
- _, _ = lib.Nats.QueueSubscribe("Cold_User_CheckUserPermissions", "Cold_User_CheckUserPermissions", func(m *nats.Msg) {
- logs.Debug(fmt.Sprintf("Cold_User_CheckUserPermissions message: %s\n", string(m.Data)))
- type T_Req struct {
- Power_Id int `xml:"Power_Id"` // 权限id
- Req_Url string `xml:"Req_Url"` // 请求url
- }
- type T_R struct {
- Code int16 `xml:"Code"`
- Msg string `xml:"Msg"`
- Pass bool `xml:"Pass"` // 泛型
- }
- var t_Req T_Req
- var t_R T_R
- err := msgpack.Unmarshal(m.Data, &t_Req)
- if err != nil {
- t_R.Code = 202
- t_R.Msg = "Unmarshal error"
- b, _ := msgpack.Marshal(&t_R)
- _ = lib.Nats.Publish(m.Reply, b)
- return
- }
- power, err := Account.Read_Power_ById(t_Req.Power_Id)
- if err != nil {
- t_R.Code = 202
- t_R.Msg = err.Error()
- b, _ := msgpack.Marshal(&t_R)
- _ = lib.Nats.Publish(m.Reply, b)
- }
- flag := false
- if power.T_menu == "*" {
- flag = true
- }
- if !flag {
- api := Account.Read_API_List_ByPower_Id(power.Id, power.T_menu)
- for _, v := range api {
- if strings.Contains(conf.Version+v.T_uri, t_Req.Req_Url) {
- flag = true
- break
- }
- }
- }
- t_R.Code = 200
- t_R.Msg = "ok"
- t_R.Pass = flag
- b, _ := msgpack.Marshal(&t_R)
- _ = lib.Nats.Publish(m.Reply, b)
- })
- // 发布-订阅 模式,异步订阅 AddSysLogs
- _, _ = lib.Nats.QueueSubscribe("Cold_User_AddSysLogs", "Cold_User_AddSysLogs", func(m *nats.Msg) {
- logs.Debug(fmt.Sprintf("AddSysLogs message: %s\n", string(m.Data)))
- type T_S struct {
- T_class string
- T_title string
- T_txt interface{}
- }
- var t_S T_S
- err := msgpack.Unmarshal(m.Data, &t_S)
- if err != nil {
- System.Add_Logs("Nats", "Nats AddSysLogs 解析失败", string(m.Data))
- return
- }
- System.Add_Logs_T(t_S.T_class, t_S.T_title, t_S.T_txt)
- })
- // 发布-订阅 模式,异步订阅 AddUserLogs
- _, _ = lib.Nats.QueueSubscribe("Cold_User_AddUserLogs", "Cold_User_AddUserLogs", func(m *nats.Msg) {
- logs.Debug(fmt.Sprintf("AddUserLogs message: %s\n", string(m.Data)))
- type T_S struct {
- T_uuid string
- T_class string
- T_title string
- T_txt interface{}
- }
- var t_S T_S
- err := msgpack.Unmarshal(m.Data, &t_S)
- if err != nil {
- System.Add_Logs("Nats", "Nats AddUserLogs 解析失败", string(m.Data))
- return
- }
- System.Add_UserLogs_T(t_S.T_uuid, t_S.T_class, t_S.T_title, t_S.T_txt)
- })
- // 请求-响应 获取设备
- _, _ = lib.Nats.QueueSubscribe("Cold_ReadDeviceByT_sn", "Cold_ReadDeviceByT_sn", func(m *nats.Msg) {
- logs.Debug(fmt.Sprintf("CheckUserPermissions message: %s\n", string(m.Data)))
- type T_R struct {
- Code int16 `xml:"Code"`
- Msg string `xml:"Msg"`
- Data Device.Device `xml:"Data"` // 泛型
- }
- var t_R T_R
- device, err := Device.Read_Device_ByT_sn(string(m.Data))
- if err != nil {
- t_R.Code = 202
- t_R.Msg = "查询失败"
- b, _ := msgpack.Marshal(&t_R)
- _ = lib.Nats.Publish(m.Reply, b)
- }
- t_R.Code = 200
- t_R.Msg = "ok"
- t_R.Data = device
- b, _ := msgpack.Marshal(&t_R)
- _ = lib.Nats.Publish(m.Reply, b)
- })
- // 请求-响应 获取传感器列表
- _, _ = lib.Nats.QueueSubscribe("Cold_ReadDeviceSensorALLByT_sn", "Cold_ReadDeviceSensorALLByT_sn", func(m *nats.Msg) {
- logs.Debug(fmt.Sprintf("CheckUserPermissions message: %s\n", string(m.Data)))
- type T_R struct {
- Code int16 `xml:"Code"`
- Msg string `xml:"Msg"`
- Data []Device.DeviceSensor `xml:"Data"` // 泛型
- }
- var t_R T_R
- device := Device.Read_DeviceSensor_ALL_List_T_sn(string(m.Data))
- t_R.Code = 200
- t_R.Msg = "ok"
- t_R.Data = device
- b, _ := msgpack.Marshal(&t_R)
- _ = lib.Nats.Publish(m.Reply, b)
- })
- // 请求-响应 获取设备数据列表
- _, _ = lib.Nats.QueueSubscribe("Cold_ReadDeviceDataListBy_T_snid", "Cold_ReadDeviceDataListBy_T_snid", func(m *nats.Msg) {
- logs.Debug(fmt.Sprintf("Cold_ReadDeviceDataListBy_T_snid message: %s\n", string(m.Data)))
- type T_Req struct {
- T_snid string `xml:"T_snid"`
- Time_start string `xml:"Time_start"`
- Time_end string `xml:"Time_end"`
- Page int `xml:"Page"`
- Page_z int `xml:"Page_z"`
- }
- type T_R struct {
- Code int16 `xml:"Code"`
- Msg string `xml:"Msg"`
- Count int64 `xml:"Count"`
- Data []Device.DeviceData_R `xml:"Data"` // 泛型
- }
- var t_Req T_Req
- var t_R T_R
- err := msgpack.Unmarshal(m.Data, &t_Req)
- if err != nil {
- t_R.Code = 202
- t_R.Msg = "Unmarshal error"
- b, _ := msgpack.Marshal(&t_R)
- _ = lib.Nats.Publish(m.Reply, b)
- return
- }
- deviceData, cnt := Device.Read_DeviceData_By_T_snid_List(t_Req.T_snid, t_Req.Time_start, t_Req.Time_end, t_Req.Page, t_Req.Page_z)
- t_R.Code = 200
- t_R.Msg = "ok"
- t_R.Count = cnt
- t_R.Data = deviceData
- b, _ := msgpack.Marshal(&t_R)
- _ = lib.Nats.Publish(m.Reply, b)
- })
- // 请求-响应 通过key获取传感器列表
- _, _ = lib.Nats.QueueSubscribe("Cold_CompanyDeviceSensor_List_ByKey", "Cold_CompanyDeviceSensor_List_ByKey", func(m *nats.Msg) {
- logs.Debug(fmt.Sprintf("Cold_CompanyDeviceSensor_List_ByKey message: %s\n", string(m.Data)))
- type T_Req struct {
- T_sn string `xml:"T_sn"`
- T_key string `xml:"T_key"`
- }
- type T_R struct {
- Code int16 `xml:"Code"`
- Msg string `xml:"Msg"`
- Count int64 `xml:"Count"`
- Data []Device.DeviceSensor_R `xml:"Data"` // 泛型
- }
- var t_Req T_Req
- var t_R T_R
- err := msgpack.Unmarshal(m.Data, &t_Req)
- if err != nil {
- t_R.Code = 202
- t_R.Msg = "Unmarshal error"
- b, _ := msgpack.Marshal(&t_R)
- _ = lib.Nats.Publish(m.Reply, b)
- return
- }
- // 查询公司
- Company_r, err := Account.Read_Company_ByKey(t_Req.T_key)
- if err != nil {
- t_R.Code = 202
- t_R.Msg = "T_key error"
- b, _ := msgpack.Marshal(&t_R)
- _ = lib.Nats.Publish(m.Reply, b)
- return
- }
- // 查询公司下面所有子公司id
- T_pids := Account.ReadCompanyIds_T_path(Company_r.T_path)
- Account.Read_Company_All_Maps()
- deviceData, cnt := Device.Read_CompanyDeviceSensor_List_For_Data_ByKey(T_pids, t_Req.T_sn)
- t_R.Code = 200
- t_R.Msg = "ok"
- t_R.Count = cnt
- t_R.Data = deviceData
- b, _ := msgpack.Marshal(&t_R)
- _ = lib.Nats.Publish(m.Reply, b)
- })
- // 请求-响应 根据时间获取设备数据(单条)最新数据
- _, _ = lib.Nats.QueueSubscribe("Cold_ReadDeviceDataBy_T_snid_T_time", "Cold_ReadDeviceDataBy_T_snid_T_time", func(m *nats.Msg) {
- logs.Debug(fmt.Sprintf("Cold_ReadDeviceDataBy_T_snid_T_time message: %s\n", string(m.Data)))
- type T_Req struct {
- T_sn string `xml:"T_sn"`
- T_id int `xml:"T_id"`
- Time string `xml:"Time"`
- }
- type T_R struct {
- Code int16 `xml:"Code"`
- Msg string `xml:"Msg"`
- Data Device.DeviceData_ `xml:"Data"` // 泛型
- }
- var t_Req T_Req
- var t_R T_R
- err := msgpack.Unmarshal(m.Data, &t_Req)
- if err != nil {
- t_R.Code = 202
- t_R.Msg = "Unmarshal error"
- b, _ := msgpack.Marshal(&t_R)
- _ = lib.Nats.Publish(m.Reply, b)
- return
- }
- deviceData := Device.Read_DeviceData_By_Time(t_Req.T_sn, t_Req.T_id, t_Req.Time)
- t_R.Code = 200
- t_R.Msg = "ok"
- t_R.Data = deviceData
- b, _ := msgpack.Marshal(&t_R)
- _ = lib.Nats.Publish(m.Reply, b)
- })
- // 请求-响应 获取设备
- _, _ = lib.Nats.QueueSubscribe("Cold_ReadDevice_List", "Cold_ReadDevice_List", func(m *nats.Msg) {
- logs.Debug(fmt.Sprintf("Cold_ReadDevice_List message: %s\n", string(m.Data)))
- type T_Req struct {
- Key string `xml:"Key"`
- Name string `xml:"Name"`
- Page int `xml:"Page"`
- Page_z int `xml:"Page_z"`
- }
- var t_Req T_Req
- type T_R struct {
- Code int16 `xml:"Code"`
- Msg string `xml:"Msg"`
- Count int64 `xml:"Count"`
- Data []Device.Device_R `xml:"Data"` // 泛型
- }
- var t_R T_R
- err := msgpack.Unmarshal(m.Data, &t_Req)
- if err != nil {
- t_R.Code = 202
- t_R.Msg = "Unmarshal error"
- b, _ := msgpack.Marshal(&t_R)
- _ = lib.Nats.Publish(m.Reply, b)
- return
- }
- // 查询公司
- Company_r, err := Account.Read_Company_ByKey(t_Req.Key)
- if err != nil {
- t_R.Code = 202
- t_R.Msg = "T_key error"
- b, _ := msgpack.Marshal(&t_R)
- _ = lib.Nats.Publish(m.Reply, b)
- return
- }
- deviceList, count := Device.Read_Device_List(&Account.Admin{T_pid: Company_r.Id}, []string{}, Company_r.Id, t_Req.Name, "", "", 0, t_Req.Page, t_Req.Page_z)
- t_R.Code = 200
- t_R.Msg = "ok"
- t_R.Count = count
- t_R.Data = deviceList
- b, _ := msgpack.Marshal(&t_R)
- _ = lib.Nats.Publish(m.Reply, b)
- })
- // 发布-订阅 修改设备校准时间
- _, _ = lib.Nats.QueueSubscribe("Cold_UpdateDevice_CalibrationTime", "Cold_UpdateDevice_CalibrationTime", func(m *nats.Msg) {
- logs.Debug(fmt.Sprintf("Cold_UpdateDevice_CalibrationTime message: %s\n", string(m.Data)))
- type T_Req struct {
- T_sn string
- T_CalibrationTime string
- }
- type T_R struct {
- Code int16 `xml:"Code"`
- Msg string `xml:"Msg"`
- Data interface{} `xml:"Data"` // 泛型
- }
- var t_R T_R
- var t_req T_Req
- err := msgpack.Unmarshal(m.Data, &t_req)
- if err != nil {
- t_R.Code = 202
- t_R.Msg = "Unmarshal error"
- b, _ := msgpack.Marshal(&t_R)
- _ = lib.Nats.Publish(m.Reply, b)
- return
- }
- device, err := Device.Read_Device_ByT_sn(t_req.T_sn)
- if err != nil {
- if err.Error() == "record not found" {
- t_R.Code = 202
- t_R.Msg = "SN不存在"
- b, _ := msgpack.Marshal(&t_R)
- _ = lib.Nats.Publish(m.Reply, b)
- return
- }
- t_R.Code = 202
- t_R.Msg = "查询失败"
- b, _ := msgpack.Marshal(&t_R)
- _ = lib.Nats.Publish(m.Reply, b)
- return
- }
- // 校准时间
- CalibrationTime, CalibrationTime_is := lib.DateStrToDate(t_req.T_CalibrationTime)
- if CalibrationTime_is {
- device.T_CalibrationTime = CalibrationTime
- }
- if is := Device.Update_Device(device, "T_CalibrationTime"); !is {
- t_R.Code = 202
- t_R.Msg = "修改校准时间失败"
- b, _ := msgpack.Marshal(&t_R)
- _ = lib.Nats.Publish(m.Reply, b)
- return
- }
- t_R.Code = 200
- t_R.Msg = "ok"
- b, _ := msgpack.Marshal(&t_R)
- _ = lib.Nats.Publish(m.Reply, b)
- return
- })
- }
|