// Package Nats connects this service to the NATS server and registers the
// subscription handlers it serves: the WeChat official-account events
// (QR-code scan and text message) and the Cold_* request-reply endpoints.
package Nats

import (
	"encoding/xml"
	"fmt"
	"strconv"
	"strings"
	"time"

	"Cold_Api/conf"
	"Cold_Api/controllers/lib"
	"Cold_Api/logs"
	"Cold_Api/models/Account"
	"Cold_Api/models/Company"
	"Cold_Api/models/Device"
	"Cold_Api/models/System"

	"github.com/astaxie/beego/cache"
	"github.com/nats-io/nats.go"
	"github.com/vmihailenco/msgpack/v5"
)

const (
	// qrCodeAESKey is the AES-CBC key used to decrypt the notice id carried
	// in a QR-code EventKey.
	// NOTE(review): hard-coded secret; consider moving it to configuration.
	qrCodeAESKey = "0123456789012345"

	// qrCodeMarker must be present in an EventKey for it to be handled as a
	// notice-binding QR code.
	qrCodeMarker = "@宝智达 微信公众号通知"

	// pendingNameSuffix is appended after the WeChat open id while the user
	// has scanned the code but has not yet replied with a name. Entries in
	// T_Notice_wx / T_Notice_wx2 have the form "<openid>/<name>|".
	pendingNameSuffix = "/重令名|"

	// pendingNameTTL is how long a scanned-but-unnamed binding stays in Redis.
	pendingNameTTL = 5 * time.Minute
)

// redisCache_NatsServer maps a WeChat open id to the notice id the user is
// currently binding to (see pendingNameTTL).
var redisCache_NatsServer cache.Cache

// init connects to NATS, creates the Redis cache, and then starts the
// subscriptions. It panics if either the NATS connection or the Redis cache
// cannot be created.
func init() {
	logs.Println("============Nats init============")

	var err error
	// Connect to the NATS server.
	lib.Nats, err = nats.Connect("nats://" + conf.NatsServer_Url)
	if err != nil {
		fmt.Println("nats 连接失败!")
		panic(err)
	}
	logs.Println("nats OK!")

	config := fmt.Sprintf(`{"key":"%s","conn":"%s","dbNum":"%s","password":"%s"}`,
		"redisCache_NatsServer", conf.Redis_address, conf.Redis_dbNum, conf.Redis_password)
	// The full config is deliberately not printed: it contains the password.
	fmt.Println("redis cache:", conf.Redis_address, "dbNum:", conf.Redis_dbNum)
	redisCache_NatsServer, err = cache.NewCache("redis", config)
	if err != nil || redisCache_NatsServer == nil {
		errMsg := "failed to init redis"
		fmt.Println(errMsg, err)
		panic(errMsg)
	}

	// Start the subscriptions only after the cache exists; the handlers read
	// redisCache_NatsServer and must never observe it as nil.
	go NatsInit()
}

// NatsInit registers every NATS subscription served by this package.
// A subscription that fails to register is logged and skipped.
func NatsInit() {
	// WeChat official-account events (request-reply).
	subscribeOrLog("Wx_BasicMessage_Event_QRCode", onWxQRCodeEvent)
	subscribeOrLog("Wx2_BasicMessage_Event_QRCode", onWx2QRCodeEvent)
	subscribeOrLog("Wx2_BasicMessage_Text", onWx2TextMessage)

	// Usage examples kept from the original file:
	//
	// Publish-subscribe, asynchronous subscription to test1:
	//   _, _ = Nats.Subscribe("test1", func(m *nats.Msg) {
	//       fmt.Printf("Received a message: %s\n", string(m.Data))
	//   })
	// Queue mode: test2 is delivered to every queue group, and only one
	// member of the same group receives each message:
	//   _, _ = Nats.QueueSubscribe("test2", "queue", func(msg *nats.Msg) {
	//       fmt.Printf("Queue a message: %s\n", string(msg.Data))
	//   })
	// Request-reply, answering test3:
	//   _, _ = Nats.Subscribe("test3", func(m *nats.Msg) {
	//       fmt.Printf("Reply a message: %s\n", string(m.Data))
	//       _ = Nats.Publish(m.Reply, []byte("I can help!!"))
	//   })

	// Account endpoints (request-reply).
	subscribeOrLog("Cold_User_verification", onUserVerification)
	subscribeOrLog("Cold_User_CompanyListAllByT_name", onCompanyListAllByName)
	subscribeOrLog("Cold_User_UserListAll", onUserListAll)
	subscribeOrLog("Cold_User_CheckUserPermissions", onCheckUserPermissions)

	// Logging endpoints (publish-subscribe, no reply).
	subscribeOrLog("Cold_User_AddSysLogs", onAddSysLogs)
	subscribeOrLog("Cold_User_AddUserLogs", onAddUserLogs)

	// Device endpoints (request-reply).
	subscribeOrLog("Cold_ReadDeviceByT_sn", onReadDeviceBySn)
	subscribeOrLog("Cold_ReadDeviceSensorALLByT_sn", onReadDeviceSensorAllBySn)
	subscribeOrLog("Cold_ReadDeviceDataListBy_T_snid", onReadDeviceDataListBySnid)
}

// subscribeOrLog subscribes handler to subject and logs a failure instead of
// silently dropping it.
func subscribeOrLog(subject string, handler nats.MsgHandler) {
	if _, err := lib.Nats.Subscribe(subject, handler); err != nil {
		fmt.Println("nats subscribe failed:", subject, err)
	}
}

// replyBytes publishes data on the message's reply subject. Messages without
// a reply subject are skipped (publishing to an empty subject can only fail).
func replyBytes(m *nats.Msg, data []byte) {
	if m.Reply == "" {
		return
	}
	if err := lib.Nats.Publish(m.Reply, data); err != nil {
		fmt.Println("nats reply failed:", m.Subject, err)
	}
}

// replyText publishes text as the reply to m.
func replyText(m *nats.Msg, text string) {
	replyBytes(m, []byte(text))
}

// replyMsgpack msgpack-encodes v and publishes it as the reply to m. An
// encoding failure is logged; whatever bytes were produced are still sent so
// the requester gets a response.
func replyMsgpack(m *nats.Msg, v interface{}) {
	payload, err := msgpack.Marshal(v)
	if err != nil {
		fmt.Println("msgpack marshal failed:", m.Subject, err)
	}
	replyBytes(m, payload)
}

// wxQRCodeEvent is the subset of the WeChat QR-code scan event XML that the
// handlers read.
type wxQRCodeEvent struct {
	ToUserName   string `xml:"ToUserName"`
	FromUserName string `xml:"FromUserName"`
	CreateTime   string `xml:"CreateTime"`
	EventKey     string `xml:"EventKey"`
}

// decodeQRCodeEvent parses a QR-code scan event and extracts the notice id
// encrypted between "-|" and "|-" in its EventKey. ok is false when the XML
// is invalid, the EventKey is not a notice-binding code, or the decrypted
// payload is not an integer.
func decodeQRCodeEvent(m *nats.Msg) (ev wxQRCodeEvent, noticeID int, ok bool) {
	if err := xml.Unmarshal(m.Data, &ev); err != nil {
		fmt.Println("Unmarshal error")
		return ev, 0, false
	}
	fmt.Println("FromUserName-", ev.FromUserName)
	fmt.Println("EventKey-", ev.EventKey)

	if !strings.Contains(ev.EventKey, qrCodeMarker) {
		return ev, 0, false
	}

	encrypted := lib.GetBetweenStr(ev.EventKey, "-|", "|-")
	decrypted := lib.AesDecryptCBC(encrypted, qrCodeAESKey)
	noticeID, err := strconv.Atoi(decrypted)
	fmt.Println("解密结果:", decrypted, " decryptCode_int", noticeID, " Content_r:", encrypted)
	if err != nil {
		// Previously this error was overwritten and id 0 was looked up.
		return ev, 0, false
	}
	return ev, noticeID, true
}

// rememberPendingName records that openID is waiting to supply a name for
// noticeID, for pendingNameTTL.
func rememberPendingName(openID string, noticeID int) {
	if err := redisCache_NatsServer.Put(openID, noticeID, pendingNameTTL); err != nil {
		fmt.Println("redis put failed:", openID, err)
	}
}

// onWxQRCodeEvent binds the scanning WeChat user to the notice encoded in the
// QR code (T_Notice_wx list) and replies with the outcome. Every failure
// path replies with an empty body.
func onWxQRCodeEvent(m *nats.Msg) {
	fmt.Printf(" => Nats Wx_BasicMessage_Event_QRCode message: %s\n", string(m.Data))

	ev, noticeID, ok := decodeQRCodeEvent(m)
	if !ok {
		replyText(m, "")
		return
	}

	notice, err := Company.Read_CompanyNotice_ById(noticeID)
	if err != nil {
		replyText(m, "")
		return
	}

	if strings.Contains(notice.T_Notice_wx, ev.FromUserName) {
		replyText(m, notice.T_name+" 已绑定,无需重复绑定!")
		return
	}

	notice.T_Notice_wx = notice.T_Notice_wx + ev.FromUserName + pendingNameSuffix
	Company.Update_CompanyNotice(notice, "T_Notice_wx")
	replyText(m, notice.T_name+" 绑定成功!")
}

// onWx2QRCodeEvent starts the two-step binding for the second official
// account: it appends the user to T_Notice_wx2 with a placeholder name,
// remembers the pending binding in Redis, and asks the user to reply with a
// name. Every failure path replies with an empty body.
func onWx2QRCodeEvent(m *nats.Msg) {
	fmt.Printf(" => Nats Wx2_BasicMessage_Event_QRCode message: %s\n", string(m.Data))

	ev, noticeID, ok := decodeQRCodeEvent(m)
	if !ok {
		replyText(m, "")
		return
	}

	notice, err := Company.Read_CompanyNotice_ById(noticeID)
	if err != nil {
		replyText(m, "")
		return
	}

	if strings.Contains(notice.T_Notice_wx2, ev.FromUserName+pendingNameSuffix) {
		// The user is told to answer within five minutes, so the pending
		// entry must exist again even if the earlier one has expired.
		rememberPendingName(ev.FromUserName, noticeID)
		replyText(m, notice.T_name+" 重复扫码!请在 5分钟内 回复您的名字,否则将 无法收到报警消息!如回复:张三")
		return
	}
	if strings.Contains(notice.T_Notice_wx2, ev.FromUserName) {
		replyText(m, notice.T_name+" 已绑定,无需重复绑定!")
		return
	}

	notice.T_Notice_wx2 = notice.T_Notice_wx2 + ev.FromUserName + pendingNameSuffix
	Company.Update_CompanyNotice(notice, "T_Notice_wx2")
	rememberPendingName(ev.FromUserName, noticeID)
	replyText(m, notice.T_name+" 扫码成功!请在 5分钟内 回复您的名字,否则将 无法收到报警消息!如回复:张三")
}

// onWx2TextMessage completes the two-step binding: when the sender has a
// pending binding in Redis, the text is stored as the sender's name in
// T_Notice_wx2 and the pending entry is removed.
func onWx2TextMessage(m *nats.Msg) {
	// NOTE(review): this passes a %s verb to logs.Println; confirm whether
	// that function formats its arguments.
	logs.Println(" => Nats Wx2_BasicMessage_Text message: %s\n", string(m.Data))

	type textMessage struct {
		ToUserName   string `xml:"ToUserName"`
		FromUserName string `xml:"FromUserName"`
		CreateTime   string `xml:"CreateTime"`
		MsgType      string `xml:"MsgType"`
		Content      string `xml:"Content"`
	}
	var msg textMessage
	if err := xml.Unmarshal(m.Data, &msg); err != nil {
		fmt.Println("Unmarshal error")
		replyText(m, "")
		return
	}
	fmt.Println("FromUserName-", msg.FromUserName)
	fmt.Println("Content-", msg.Content)

	// The limits are in bytes: more than 3 and at most 24.
	// NOTE(review): the 8*3 / 1*3 form suggests 3-byte characters were
	// assumed; short ASCII names such as "Tom" are rejected — confirm intent.
	if len(msg.Content) > 8*3 || len(msg.Content) <= 1*3 {
		replyText(m, "请正确输入您的名字!")
		return
	}

	if !redisCache_NatsServer.IsExist(msg.FromUserName) {
		// NOTE(review): no reply is sent here, unlike the other exits;
		// kept as-is because it may be deliberate for unrelated messages.
		return
	}

	// "/" and "|" delimit entries in T_Notice_wx2; a name containing either
	// would corrupt the stored list.
	if strings.ContainsAny(msg.Content, "/|") {
		replyText(m, "请正确输入您的名字!")
		return
	}

	noticeID := lib.To_int(redisCache_NatsServer.Get(msg.FromUserName))
	notice, err := Company.Read_CompanyNotice_ById(noticeID)
	if err != nil {
		replyText(m, "")
		return
	}

	notice.T_Notice_wx2 = strings.Replace(notice.T_Notice_wx2,
		msg.FromUserName+pendingNameSuffix,
		msg.FromUserName+"/"+msg.Content+"|", -1)
	Company.Update_CompanyNotice(notice, "T_Notice_wx2")

	// Remove the pending entry now that the name is stored.
	if err := redisCache_NatsServer.Delete(msg.FromUserName); err != nil {
		fmt.Println("redis delete failed:", msg.FromUserName, err)
	}
	replyText(m, "尊敬的 "+msg.Content+",您以成功绑定 "+notice.T_name)
}

// NOTE(review): the response structs below carry xml tags but are encoded
// with msgpack, so the tags appear unused — confirm before removing them.

// onUserVerification validates the token in the message body and replies
// with the account data (Code 200) or Code 201 when verification fails.
func onUserVerification(m *nats.Msg) {
	fmt.Printf("Cold_User_verification message: %s\n", string(m.Data))

	type response struct {
		Code int16           `xml:"Code"`
		Msg  string          `xml:"Msg"`
		Data Account.Admin_R `xml:"Data"`
	}
	var resp response

	verified, admin := lib.Verification(string(m.Data), "")
	if !verified {
		fmt.Println("Unmarshal error")
		resp.Code = 201
		resp.Msg = "User_tokey Err!"
		replyMsgpack(m, &resp)
		return
	}

	resp.Code = 200
	resp.Msg = "ok"
	resp.Data = Account.AdminToAdmin_R(admin)
	replyMsgpack(m, &resp)
}

// onCompanyListAllByName replies with the companies returned by
// Account.Read_Company_List_All_ByT_name for the name in the message body.
func onCompanyListAllByName(m *nats.Msg) {
	fmt.Printf("CompanyListAll message: %s\n", string(m.Data))

	type response struct {
		Code int16             `xml:"Code"`
		Msg  string            `xml:"Msg"`
		Data []Account.Company `xml:"Data"`
	}
	resp := response{
		Code: 200,
		Msg:  "ok",
		Data: Account.Read_Company_List_All_ByT_name(string(m.Data)),
	}
	replyMsgpack(m, &resp)
}

// onUserListAll replies with the full user list.
func onUserListAll(m *nats.Msg) {
	type response struct {
		Code int16             `xml:"Code"`
		Msg  string            `xml:"Msg"`
		Data []Account.Admin_R `xml:"Data"`
	}
	resp := response{
		Code: 200,
		Msg:  "ok",
		Data: Account.Read_Admin_List_All(),
	}
	replyMsgpack(m, &resp)
}

// onCheckUserPermissions replies whether the given power (role) may call the
// requested URL. A menu binding of "*" grants everything; otherwise the URL
// must equal conf.Version plus one of the power's API URIs.
func onCheckUserPermissions(m *nats.Msg) {
	fmt.Printf("Cold_User_CheckUserPermissions message: %s\n", string(m.Data))

	type request struct {
		Power_Id int    `xml:"Power_Id"` // power (role) id
		Req_Url  string `xml:"Req_Url"`  // requested URL
	}
	type response struct {
		Code int16  `xml:"Code"`
		Msg  string `xml:"Msg"`
		Pass bool   `xml:"Pass"`
	}
	var req request
	var resp response

	if err := msgpack.Unmarshal(m.Data, &req); err != nil {
		resp.Code = 201
		resp.Msg = "Unmarshal error"
		replyMsgpack(m, &resp)
		return
	}

	// NOTE(review): the second return value is ignored, as in the original;
	// a failed lookup yields a zero-value power and is checked like any other.
	power, _ := Account.Read_Power_ById(req.Power_Id)

	allowed := power.T_Menu_Bind == "*"
	if !allowed {
		for _, api := range Account.Read_API_List_ByPower_Id(power.Id, power.T_Menu_Bind) {
			if conf.Version+api.T_uri == req.Req_Url {
				allowed = true
				break
			}
		}
	}

	resp.Code = 200
	resp.Msg = "ok"
	resp.Pass = allowed
	replyMsgpack(m, &resp)
}

// onAddSysLogs stores a system log entry received over NATS. A payload that
// cannot be decoded is itself logged.
func onAddSysLogs(m *nats.Msg) {
	fmt.Printf("AddSysLogs message: %s\n", string(m.Data))

	type payload struct {
		T_class string
		T_title string
		T_txt   interface{}
	}
	var entry payload
	if err := msgpack.Unmarshal(m.Data, &entry); err != nil {
		System.Add_Logs("Nats", "Nats AddSysLogs 解析失败", string(m.Data))
		return
	}
	System.Add_Logs_T(entry.T_class, entry.T_title, entry.T_txt)
}

// onAddUserLogs stores a user log entry received over NATS. A payload that
// cannot be decoded is logged as a system log entry.
func onAddUserLogs(m *nats.Msg) {
	fmt.Printf("AddUserLogs message: %s\n", string(m.Data))

	type payload struct {
		T_uuid  string
		T_class string
		T_title string
		T_txt   interface{}
	}
	var entry payload
	if err := msgpack.Unmarshal(m.Data, &entry); err != nil {
		System.Add_Logs("Nats", "Nats AddUserLogs 解析失败", string(m.Data))
		return
	}
	System.Add_UserLogs_T(entry.T_uuid, entry.T_class, entry.T_title, entry.T_txt)
}

// onReadDeviceBySn replies with the device whose serial number is the
// message body, or Code 202 when the lookup fails.
func onReadDeviceBySn(m *nats.Msg) {
	fmt.Printf("Cold_ReadDeviceByT_sn message: %s\n", string(m.Data))

	type response struct {
		Code int16         `xml:"Code"`
		Msg  string        `xml:"Msg"`
		Data Device.Device `xml:"Data"`
	}
	var resp response

	device, err := Device.Read_Device_ByT_sn(string(m.Data))
	if err != nil {
		resp.Code = 202
		resp.Msg = "查询失败"
		replyMsgpack(m, &resp)
		// Stop here: falling through would send a second, "ok" reply.
		return
	}

	resp.Code = 200
	resp.Msg = "ok"
	resp.Data = device
	replyMsgpack(m, &resp)
}

// onReadDeviceSensorAllBySn replies with the sensors returned by
// Device.Read_DeviceSensor_ALL_List_T_sn for the serial number in the body.
func onReadDeviceSensorAllBySn(m *nats.Msg) {
	fmt.Printf("Cold_ReadDeviceSensorALLByT_sn message: %s\n", string(m.Data))

	type response struct {
		Code int16                 `xml:"Code"`
		Msg  string                `xml:"Msg"`
		Data []Device.DeviceSensor `xml:"Data"`
	}
	resp := response{
		Code: 200,
		Msg:  "ok",
		Data: Device.Read_DeviceSensor_ALL_List_T_sn(string(m.Data)),
	}
	replyMsgpack(m, &resp)
}

// onReadDeviceDataListBySnid replies with one page of device data and the
// count returned by Device.Read_DeviceData_By_T_snid_List, or Code 201 when
// the request cannot be decoded.
func onReadDeviceDataListBySnid(m *nats.Msg) {
	fmt.Printf("Cold_ReadDeviceDataListBy_T_snid message: %s\n", string(m.Data))

	type request struct {
		T_snid     string `xml:"T_snid"`
		Time_start string `xml:"Time_start"`
		Time_end   string `xml:"Time_end"`
		Page       int    `xml:"Page"`
		Page_z     int    `xml:"Page_z"`
	}
	type response struct {
		Code  int16                 `xml:"Code"`
		Msg   string                `xml:"Msg"`
		Count int64                 `xml:"Count"`
		Data  []Device.DeviceData_R `xml:"Data"`
	}
	var req request
	var resp response

	if err := msgpack.Unmarshal(m.Data, &req); err != nil {
		resp.Code = 201
		resp.Msg = "Unmarshal error"
		replyMsgpack(m, &resp)
		return
	}

	rows, total := Device.Read_DeviceData_By_T_snid_List(
		req.T_snid, req.Time_start, req.Time_end, req.Page, req.Page_z)

	resp.Code = 200
	resp.Msg = "ok"
	resp.Count = total
	resp.Data = rows
	replyMsgpack(m, &resp)
}