package Nats import ( "Cold_mqtt/MqttServer" "Cold_mqtt/MqttServer/WarningNotice" "Cold_mqtt/conf" "Cold_mqtt/lib" "Cold_mqtt/logs" "Cold_mqtt/models/Device" "Cold_mqtt/models/Product" "Cold_mqtt/models/Warning" "github.com/nats-io/nats.go" "github.com/vmihailenco/msgpack/v5" "time" ) func NatsInit() { time.Sleep(time.Second * 3) var err error // 连接Nats服务器 lib.Nats, err = nats.Connect("nats://"+conf.NatsServer_Url, nats.MaxReconnects(10), nats.ReconnectWait(10*time.Second), nats.DisconnectErrHandler(func(nc *nats.Conn, err error) { // handle disconnect error event logs.PrintlnError("nats.DisconnectErrHandler 断开 ", err) }), nats.ReconnectHandler(func(nc *nats.Conn) { // handle reconnect event logs.PrintlnError("nats.ReconnectHandler, 已经重新连接 ") })) if err != nil { logs.Println("nats 连接失败!") panic(any("nats 连接失败!" + "nats://" + conf.NatsServer_Url)) } if err != nil { logs.Println("nats 连接失败!") panic(any(err)) } logs.Println("nats OK!") // 请求-响应, 向 test3 发布一个 `help me` 请求数据,设置超时间3秒,如果有多个响应,只接收第一个收到的消息 //msg, err := lib.Nats.Request("Wx_GenerateQR", []byte("T_uuid"), 3*time.Second) //if err != nil { // logs.PrintlnError("Wx_GenerateQR :",err) //} else { // logs.Println("Wx_GenerateQR : %s\n", string(msg.Data)) // //} // 发布-订阅 模式,异步订阅 test1 _, _ = lib.Nats.Subscribe("Read_DeviceParameter"+conf.MqttServer_id, func(m *nats.Msg) { sn := string(m.Data) logs.Println("Nats Read_DeviceParameter: %s\n", sn) MqttServer.Read_DeviceParameter(sn) }) // 发布-订阅 模式,异步订阅 test1 _, _ = lib.Nats.Subscribe("Read_DeviceSensorParameter"+conf.MqttServer_id, func(m *nats.Msg) { sn := string(m.Data) logs.Println("Nats Read_DeviceSensorParameter: %s\n", sn) MqttServer.Read_DeviceSensorParameter(sn) }) // 发布-订阅 模式,异步订阅 test1 _, _ = lib.Nats.Subscribe("Get_Device_Realtime"+conf.MqttServer_id, func(m *nats.Msg) { sn := string(m.Data) logs.Println("Nats Get_Device_Realtime: 【%s】 ", sn) if len(sn) <= 6 { return } go MqttServer.Get_Device_Realtime(sn) }) // 发布-订阅 模式,异步订阅 test1 _, _ = lib.Nats.Subscribe("Pu_DeviceParameter"+conf.MqttServer_id, func(m *nats.Msg) { logs.Println("Nats Pu_DeviceParameter: %s\n", string(m.Data)) var item Device.DeviceParameter err = msgpack.Unmarshal(m.Data, &item) if err != nil { logs.Println("Pu_DeviceParameter", err) return } MqttServer.Pu_DeviceParameter(item) }) // 发布-订阅 模式,异步订阅 test1 _, _ = lib.Nats.Subscribe("Pu_DeviceParameter_Sensor"+conf.MqttServer_id, func(m *nats.Msg) { logs.Println("Nats Pu_DeviceParameter_Sensor: %s\n", string(m.Data)) var item Device.DeviceSensorParameter err = msgpack.Unmarshal(m.Data, &item) if err != nil { logs.Println("Pu_DeviceParameter_Sensor", err) return } MqttServer.Pu_DeviceParameter_Sensor(item) }) // 发布-订阅 模式,异步订阅 test1 _, _ = lib.Nats.Subscribe("Set_DeviceTask"+conf.MqttServer_id, func(m *nats.Msg) { logs.Println("Nats Set_DeviceTask: %s\n", string(m.Data)) var item Device.Device_task err = msgpack.Unmarshal(m.Data, &item) if err != nil { logs.Println("Set_DeviceTask", err) return } MqttServer.Set_DeviceTask(item) }) // 重启/关机 命令 //1:重启,0:关机; _, _ = lib.Nats.Subscribe("Set_RestartShutdown"+conf.MqttServer_id, func(m *nats.Msg) { logs.Println("Nats Set_RestartShutdown: %s\n", string(m.Data)) var item Device.Device_task err = msgpack.Unmarshal(m.Data, &item) if err != nil { logs.Println("Set_RestartShutdown", err) return } MqttServer.Set_RestartShutdown(item) }) // 版本升级 _, _ = lib.Nats.Subscribe("Up_ProductUpgrade"+conf.MqttServer_id, func(m *nats.Msg) { logs.Println("Nats Up_ProductUpgrade: %s\n", string(m.Data)) var item Product.ProductUpgrade_T err = msgpack.Unmarshal(m.Data, &item) if err != nil { logs.Println("Up_ProductUpgrade", err) return } MqttServer.Up_ProductUpgrade(item) }) //=========================================================================== // 请求-响应, 响应 test3 消息。 _, _ = lib.Nats.Subscribe("AddWarning"+conf.MqttServer_id, func(m *nats.Msg) { logs.Println("Nats AddWarning: %s\n", string(m.Data)) type T_Warning struct { T_tp int `xml:"T_tp"` // 报警类型 ->WarningList T_sn string `xml:"T_sn"` // 设备序列号 T_D_name string `xml:"T_D_name"` // 设备名称 T_id int `xml:"T_id"` // 传感器 ID T_DS_name string `xml:"T_DS_name"` // 传感器名称 T_Remark string `xml:"T_Remark"` // 采集内容 T_Ut time.Time `xml:"T_Ut"` // 采集时间 T_ToAdmin []int `xml:"T_ToAdmin"` // 发送给谁 Admin.Id T_State int `xml:"T_State"` // 0 删除 1 不处理 2 已处理 3 未处理 } type T_R struct { Code int16 `xml:"Code"` Msg string `xml:"Msg"` } var t_R T_R var T_Warning_r T_Warning err = msgpack.Unmarshal(m.Data, &T_Warning_r) if err != nil { println("AddWarning:", err) return } var Warning_r Warning.Warning Warning_r.T_pid = 0 Warning_r.T_tp = T_Warning_r.T_tp Warning_r.T_sn = T_Warning_r.T_sn Warning_r.T_D_name = T_Warning_r.T_D_name Warning_r.T_id = T_Warning_r.T_id Warning_r.T_DS_name = T_Warning_r.T_DS_name Warning_r.T_Remark = T_Warning_r.T_Remark Warning_r.T_Ut = T_Warning_r.T_Ut Warning_r.T_State = T_Warning_r.T_State if Warning_r.T_tp == 1012 { Warning_r.T_State = 3 } WarningNotice.WarningToAdminId(&Warning_r, T_Warning_r.T_ToAdmin) t_R.Code = 200 t_R.Msg = "ok" // 添加报警 _, err = Warning.Add_Warning(Warning_r) if err != nil { t_R.Code = 202 t_R.Msg = "Err" } b, _ := msgpack.Marshal(&t_R) _ = lib.Nats.Publish(m.Reply, b) }) }