123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211 |
- package Nats
- import (
- "Cold_mqtt/MqttServer"
- "Cold_mqtt/MqttServer/WarningNotice"
- "Cold_mqtt/conf"
- "Cold_mqtt/lib"
- "Cold_mqtt/logs"
- "Cold_mqtt/models/Device"
- "Cold_mqtt/models/Product"
- "Cold_mqtt/models/Warning"
- "github.com/nats-io/nats.go"
- "github.com/vmihailenco/msgpack/v5"
- "time"
- )
- func NatsInit() {
- time.Sleep(time.Second * 3)
- var err error
- // 连接Nats服务器
- lib.Nats, err = nats.Connect("nats://"+conf.NatsServer_Url, nats.MaxReconnects(10), nats.ReconnectWait(10*time.Second),
- nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
- // handle disconnect error event
- logs.PrintlnError("nats.DisconnectErrHandler 断开 ", err)
- }),
- nats.ReconnectHandler(func(nc *nats.Conn) {
- // handle reconnect event
- logs.PrintlnError("nats.ReconnectHandler, 已经重新连接 ")
- }))
- if err != nil {
- logs.Println("nats 连接失败!")
- panic(any("nats 连接失败!" + "nats://" + conf.NatsServer_Url))
- }
- if err != nil {
- logs.Println("nats 连接失败!")
- panic(any(err))
- }
- logs.Println("nats OK!")
- // 请求-响应, 向 test3 发布一个 `help me` 请求数据,设置超时间3秒,如果有多个响应,只接收第一个收到的消息
- //msg, err := lib.Nats.Request("Wx_GenerateQR", []byte("T_uuid"), 3*time.Second)
- //if err != nil {
- // logs.PrintlnError("Wx_GenerateQR :",err)
- //} else {
- // logs.Println("Wx_GenerateQR : %s\n", string(msg.Data))
- //
- //}
- // 发布-订阅 模式,异步订阅 test1
- _, _ = lib.Nats.Subscribe("Read_DeviceParameter"+conf.MqttServer_id, func(m *nats.Msg) {
- sn := string(m.Data)
- logs.Println("Nats Read_DeviceParameter: %s\n", sn)
- MqttServer.Read_DeviceParameter(sn)
- })
- // 发布-订阅 模式,异步订阅 test1
- _, _ = lib.Nats.Subscribe("Read_DeviceSensorParameter"+conf.MqttServer_id, func(m *nats.Msg) {
- sn := string(m.Data)
- logs.Println("Nats Read_DeviceSensorParameter: %s\n", sn)
- MqttServer.Read_DeviceSensorParameter(sn)
- })
- // 发布-订阅 模式,异步订阅 test1
- _, _ = lib.Nats.Subscribe("Get_Device_Realtime"+conf.MqttServer_id, func(m *nats.Msg) {
- sn := string(m.Data)
- logs.Println("Nats Get_Device_Realtime: 【%s】 ", sn)
- if len(sn) <= 6 {
- return
- }
- go MqttServer.Get_Device_Realtime(sn)
- })
- // 发布-订阅 模式,异步订阅 test1
- _, _ = lib.Nats.Subscribe("Pu_DeviceParameter"+conf.MqttServer_id, func(m *nats.Msg) {
- logs.Println("Nats Pu_DeviceParameter: %s\n", string(m.Data))
- var item Device.DeviceParameter
- err = msgpack.Unmarshal(m.Data, &item)
- if err != nil {
- logs.Println("Pu_DeviceParameter", err)
- return
- }
- MqttServer.Pu_DeviceParameter(item)
- })
- // 发布-订阅 模式,异步订阅 test1
- _, _ = lib.Nats.Subscribe("Pu_DeviceParameter_Sensor"+conf.MqttServer_id, func(m *nats.Msg) {
- logs.Println("Nats Pu_DeviceParameter_Sensor: %s\n", string(m.Data))
- var item Device.DeviceSensorParameter
- err = msgpack.Unmarshal(m.Data, &item)
- if err != nil {
- logs.Println("Pu_DeviceParameter_Sensor", err)
- return
- }
- MqttServer.Pu_DeviceParameter_Sensor(item)
- })
- // 发布-订阅 模式,异步订阅 test1
- _, _ = lib.Nats.Subscribe("Set_DeviceTask"+conf.MqttServer_id, func(m *nats.Msg) {
- logs.Println("Nats Set_DeviceTask: %s\n", string(m.Data))
- var item Device.Device_task
- err = msgpack.Unmarshal(m.Data, &item)
- if err != nil {
- logs.Println("Set_DeviceTask", err)
- return
- }
- MqttServer.Set_DeviceTask(item)
- })
- // 重启/关机 命令 //1:重启,0:关机;
- _, _ = lib.Nats.Subscribe("Set_RestartShutdown"+conf.MqttServer_id, func(m *nats.Msg) {
- logs.Println("Nats Set_RestartShutdown: %s\n", string(m.Data))
- var item Device.Device_task
- err = msgpack.Unmarshal(m.Data, &item)
- if err != nil {
- logs.Println("Set_RestartShutdown", err)
- return
- }
- MqttServer.Set_RestartShutdown(item)
- })
- // 版本升级
- _, _ = lib.Nats.Subscribe("Up_ProductUpgrade"+conf.MqttServer_id, func(m *nats.Msg) {
- logs.Println("Nats Up_ProductUpgrade: %s\n", string(m.Data))
- var item Product.ProductUpgrade_T
- err = msgpack.Unmarshal(m.Data, &item)
- if err != nil {
- logs.Println("Up_ProductUpgrade", err)
- return
- }
- MqttServer.Up_ProductUpgrade(item)
- })
- //===========================================================================
- // 请求-响应, 响应 test3 消息。
- _, _ = lib.Nats.Subscribe("AddWarning"+conf.MqttServer_id, func(m *nats.Msg) {
- logs.Println("Nats AddWarning: %s\n", string(m.Data))
- type T_Warning struct {
- T_tp int `xml:"T_tp"` // 报警类型 ->WarningList
- T_sn string `xml:"T_sn"` // 设备序列号
- T_D_name string `xml:"T_D_name"` // 设备名称
- T_id int `xml:"T_id"` // 传感器 ID
- T_DS_name string `xml:"T_DS_name"` // 传感器名称
- T_Remark string `xml:"T_Remark"` // 采集内容
- T_Ut time.Time `xml:"T_Ut"` // 采集时间
- T_ToAdmin []int `xml:"T_ToAdmin"` // 发送给谁 Admin.Id
- T_State int `xml:"T_State"` // 0 删除 1 不处理 2 已处理 3 未处理
- }
- type T_R struct {
- Code int16 `xml:"Code"`
- Msg string `xml:"Msg"`
- }
- var t_R T_R
- var T_Warning_r T_Warning
- err = msgpack.Unmarshal(m.Data, &T_Warning_r)
- if err != nil {
- println("AddWarning:", err)
- return
- }
- var Warning_r Warning.Warning
- Warning_r.T_pid = 0
- Warning_r.T_tp = T_Warning_r.T_tp
- Warning_r.T_sn = T_Warning_r.T_sn
- Warning_r.T_D_name = T_Warning_r.T_D_name
- Warning_r.T_id = T_Warning_r.T_id
- Warning_r.T_DS_name = T_Warning_r.T_DS_name
- Warning_r.T_Remark = T_Warning_r.T_Remark
- Warning_r.T_Ut = T_Warning_r.T_Ut
- Warning_r.T_State = T_Warning_r.T_State
- if Warning_r.T_tp == 1012 {
- Warning_r.T_State = 3
- }
- WarningNotice.WarningToAdminId(&Warning_r, T_Warning_r.T_ToAdmin)
- t_R.Code = 200
- t_R.Msg = "ok"
- // 添加报警
- _, err = Warning.Add_Warning(Warning_r)
- if err != nil {
- t_R.Code = 202
- t_R.Msg = "Err"
- }
- b, _ := msgpack.Marshal(&t_R)
- _ = lib.Nats.Publish(m.Reply, b)
- })
- }
|