123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214 |
- package Nats
- import (
- "Cold_Api/conf"
- "Cold_Api/controllers/lib"
- "Cold_Api/logs"
- "Cold_Api/models/Device"
- "encoding/xml"
- "fmt"
- "github.com/astaxie/beego/cache"
- "github.com/nats-io/nats.go"
- "strconv"
- "strings"
- "time"
- )
- var redisCache_NatsServer cache.Cache
- func init() {
- logs.Println("============Nats init============")
- var err error
- // 连接Nats服务器
- lib.Nats, err = nats.Connect("nats://" + conf.NatsServer_Url)
- if err != nil {
- fmt.Println("nats 连接失败!")
- panic(err)
- }
- logs.Println("nats OK!")
- go NatsInit()
- config := fmt.Sprintf(`{"key":"%s","conn":"%s","dbNum":"%s","password":"%s"}`,
- "redisCache_NatsServer", conf.Redis_address, conf.Redis_dbNum, conf.Redis_password)
- fmt.Println(config)
- redisCache_NatsServer, err = cache.NewCache("redis", config)
- if err != nil || redisCache_NatsServer == nil {
- errMsg := "failed to init redis"
- fmt.Println(errMsg, err)
- panic(errMsg)
- }
- }
- func NatsInit() {
- // 请求-响应, 响应 test3 消息。
- _, _ = lib.Nats.Subscribe("Wx_BasicMessage_Event_QRCode", func(m *nats.Msg) {
- fmt.Printf(" => Nats Wx_BasicMessage_Event_QRCode message: %s\n", string(m.Data))
- type Person_QRCode struct {
- ToUserName string `xml:"ToUserName"` //注意这里有个反引号
- FromUserName string `xml:"FromUserName"` //注意这里有个反引号
- CreateTime string `xml:"CreateTime"` //注意这里有个反引号
- EventKey string `xml:"EventKey"`
- }
- var person_QRCode Person_QRCode
- err1 := xml.Unmarshal(m.Data, &person_QRCode)
- if err1 != nil {
- fmt.Println("Unmarshal error")
- _ = lib.Nats.Publish(m.Reply, []byte(""))
- return
- }
- // 进入 二维码配对
- fmt.Println("FromUserName-", person_QRCode.FromUserName)
- fmt.Println("EventKey-", person_QRCode.EventKey)
- // 开始 处理消息
- if strings.Contains(person_QRCode.EventKey, "@宝智达 微信公众号通知") {
- //"请将本内容发送到 深圳市宝智达科技有限公司 微信公众号-|"+lib.AesEncryptCBC(T_calss_id, "0123456789012345")+"|-",
- Content_r := lib.GetBetweenStr(person_QRCode.EventKey, "-|", "|-")
- decryptCode := lib.AesDecryptCBC(Content_r, "0123456789012345")
- decryptCode_int, err := strconv.Atoi(decryptCode)
- fmt.Println("解密结果:", decryptCode, " decryptCode_int", decryptCode_int, " Content_r:", Content_r)
- R_DeviceClass, err := Device.Read_Class_ById(decryptCode_int)
- if err != nil {
- _ = lib.Nats.Publish(m.Reply, []byte(""))
- return
- }
- if strings.Contains(R_DeviceClass.T_Notice_wx, person_QRCode.FromUserName) {
- _ = lib.Nats.Publish(m.Reply, []byte(R_DeviceClass.T_name+" 已绑定,无需重复绑定!"))
- return
- }
- R_DeviceClass.T_Notice_wx = R_DeviceClass.T_Notice_wx + person_QRCode.FromUserName + "/重令名|"
- Device.Update_Class_ById(R_DeviceClass)
- _ = lib.Nats.Publish(m.Reply, []byte(R_DeviceClass.T_name+" 绑定成功!"))
- return
- }
- _ = lib.Nats.Publish(m.Reply, []byte(""))
- })
- // 请求-响应, 响应 test3 消息。
- _, _ = lib.Nats.Subscribe("Wx2_BasicMessage_Event_QRCode", func(m *nats.Msg) {
- fmt.Printf(" => Nats Wx2_BasicMessage_Event_QRCode message: %s\n", string(m.Data))
- type Person_QRCode struct {
- ToUserName string `xml:"ToUserName"` //注意这里有个反引号
- FromUserName string `xml:"FromUserName"` //注意这里有个反引号
- CreateTime string `xml:"CreateTime"` //注意这里有个反引号
- EventKey string `xml:"EventKey"`
- }
- var person_QRCode Person_QRCode
- err1 := xml.Unmarshal(m.Data, &person_QRCode)
- if err1 != nil {
- fmt.Println("Unmarshal error")
- _ = lib.Nats.Publish(m.Reply, []byte(""))
- return
- }
- // 进入 二维码配对
- fmt.Println("FromUserName-", person_QRCode.FromUserName)
- fmt.Println("EventKey-", person_QRCode.EventKey)
- // 开始 处理消息
- if strings.Contains(person_QRCode.EventKey, "@宝智达 微信公众号通知") {
- //"请将本内容发送到 深圳市宝智达科技有限公司 微信公众号-|"+lib.AesEncryptCBC(T_calss_id, "0123456789012345")+"|-",
- Content_r := lib.GetBetweenStr(person_QRCode.EventKey, "-|", "|-")
- decryptCode := lib.AesDecryptCBC(Content_r, "0123456789012345")
- decryptCode_int, err := strconv.Atoi(decryptCode)
- fmt.Println("解密结果:", decryptCode, " decryptCode_int", decryptCode_int, " Content_r:", Content_r)
- R_DeviceClass, err := Device.Read_Class_ById(decryptCode_int)
- if err != nil {
- _ = lib.Nats.Publish(m.Reply, []byte(""))
- return
- }
- if strings.Contains(R_DeviceClass.T_Notice_wx2, person_QRCode.FromUserName+"/重令名|") {
- _ = lib.Nats.Publish(m.Reply, []byte(R_DeviceClass.T_name+" 重复扫码!请在 5分钟内 回复您的名字,否则将 无法收到报警消息!如回复:张三"))
- return
- }
- if strings.Contains(R_DeviceClass.T_Notice_wx2, person_QRCode.FromUserName) {
- _ = lib.Nats.Publish(m.Reply, []byte(R_DeviceClass.T_name+" 已绑定,无需重复绑定!"))
- return
- }
- R_DeviceClass.T_Notice_wx2 = R_DeviceClass.T_Notice_wx2 + person_QRCode.FromUserName + "/重令名|"
- Device.Update_Class_ById(R_DeviceClass)
- redisCache_NatsServer.Put(person_QRCode.FromUserName, decryptCode_int, 5*time.Minute)
- _ = lib.Nats.Publish(m.Reply, []byte(R_DeviceClass.T_name+" 扫码成功!请在 5分钟内 回复您的名字,否则将 无法收到报警消息!如回复:张三"))
- return
- }
- _ = lib.Nats.Publish(m.Reply, []byte(""))
- })
- // 请求-响应, 响应 test3 消息。
- _, _ = lib.Nats.Subscribe("Wx2_BasicMessage_Text", func(m *nats.Msg) {
- logs.Println(" => Nats Wx2_BasicMessage_Text message: %s\n", string(m.Data))
- type Person_Text struct {
- ToUserName string `xml:"ToUserName"` //注意这里有个反引号
- FromUserName string `xml:"FromUserName"` //注意这里有个反引号
- CreateTime string `xml:"CreateTime"` //注意这里有个反引号
- MsgType string `xml:"MsgType"`
- Content string `xml:"Content"`
- }
- var person_Text Person_Text
- err1 := xml.Unmarshal(m.Data, &person_Text)
- if err1 != nil {
- fmt.Println("Unmarshal error")
- _ = lib.Nats.Publish(m.Reply, []byte(""))
- return
- }
- // 进入 二维码配对
- fmt.Println("FromUserName-", person_Text.FromUserName)
- fmt.Println("Content-", person_Text.Content)
- if len(person_Text.Content) > 8*3 || len(person_Text.Content) <= 1*3 {
- _ = lib.Nats.Publish(m.Reply, []byte("请正确输入您的名字!"))
- return
- }
- if !redisCache_NatsServer.IsExist(person_Text.FromUserName) {
- return
- }
- Class_ById := lib.To_int(redisCache_NatsServer.Get(person_Text.FromUserName))
- R_DeviceClass, err := Device.Read_Class_ById(Class_ById)
- if err != nil {
- _ = lib.Nats.Publish(m.Reply, []byte(""))
- return
- }
- R_DeviceClass.T_Notice_wx2 = strings.Replace(R_DeviceClass.T_Notice_wx2,
- person_Text.FromUserName+"/重令名|",
- person_Text.FromUserName+"/"+person_Text.Content+"|",
- -1)
- Device.Update_Class_ById(R_DeviceClass)
- // 删除 缓存
- redisCache_NatsServer.Delete(person_Text.FromUserName)
- _ = lib.Nats.Publish(m.Reply, []byte("尊敬的 "+person_Text.Content+",您以成功绑定 "+R_DeviceClass.T_name))
- })
- //// 发布-订阅 模式,异步订阅 test1
- //_, _ = Nats.Subscribe("test1", func(m *nats.Msg) {
- // fmt.Printf("Received a message: %s\n", string(m.Data))
- //})
- //// 队列 模式,订阅 test2, 队列为queue, test2 发向所有队列,同一队列只有一个能收到消息
- //_, _ = Nats.QueueSubscribe("test2", "queue", func(msg *nats.Msg) {
- // fmt.Printf("Queue a message: %s\n", string(msg.Data))
- //})
- //// 请求-响应, 响应 test3 消息。
- //_, _ = Nats.Subscribe("test3", func(m *nats.Msg) {
- // fmt.Printf("Reply a message: %s\n", string(m.Data))
- // _ = Nats.Publish(m.Reply, []byte("I can help!!"))
- //})
- }
|