package MqttServer import ( "Cold_mqtt/conf" "Cold_mqtt/logs" "Cold_mqtt/models/Device" "Cold_mqtt/models/Warning" "encoding/json" "fmt" "github.com/astaxie/beego/cache" beego "github.com/beego/beego/v2/server/web" "github.com/yosssi/gmq/mqtt" "github.com/yosssi/gmq/mqtt/client" "math" "strconv" "strings" "sync" "time" ) var ( cli *client.Client DeviceSn map[string]Device.Device /*创建集合 */ test = true Task_Time int64 // 执行时间 ) type DeviceMqttMap_Struct struct { T_sn string T_data string T_num int T_tonum int } var redisCache_Mid cache.Cache var DeviceMqttMap map[string]DeviceMqttMap_Struct /*创建集合 */ var lock sync.Mutex func init() { Task_Time = time.Now().UnixMilli() config := fmt.Sprintf(`{"key":"%s","conn":"%s","dbNum":"%s","password":"%s"}`, "redisCache_Mid", conf.Redis_address, conf.Redis_dbNum, conf.Redis_password) logs.Println("", config) var err error redisCache_Mid, err = cache.NewCache("redis", config) if err != nil || redisCache_Mid == nil { errMsg := "failed to init redis" logs.Println(errMsg, err) panic(any(errMsg)) } //Task_Etection_Time //Task_Etection_x DeviceMqttMap = make(map[string]DeviceMqttMap_Struct) } // 缓存数据发送-确保设备在休眠后 能收到数据 func DeviceMqttMap_go() { logs.Println("=====================DeviceMqttMap_go GO===============") time.Sleep(time.Second * 10) for true { for k, v := range DeviceMqttMap { data, ok := DeviceMqttMapGet(k) if !ok { logs.Println("DeviceMqttMapGet:", k, " !!不存在,跳过!! ") continue } v = data logs.Println("DeviceMqttMap:", k, " T_sn:", v.T_sn, " T_data:", v.T_data, " T_num:", v.T_num, " T_tonum:", v.T_tonum) if v.T_tonum >= 1 { Mqtt_publish(v.T_sn, v.T_data) } v.T_tonum += 1 lock.Lock() DeviceMqttMap[k] = v if v.T_num <= v.T_tonum { delete(DeviceMqttMap, k) } lock.Unlock() time.Sleep(time.Millisecond * 100) } time.Sleep(time.Second * 3) } } func DeviceMqttMapGet(key string) (DeviceMqttMap_Struct, bool) { lock.Lock() defer lock.Unlock() data, ok := DeviceMqttMap[key] return data, ok } func DeviceMqttMapAdd(key string, data DeviceMqttMap_Struct) { lock.Lock() defer lock.Unlock() DeviceMqttMap[key] = data // 缓存数据 logs.Println("DeviceMqttMapAdd:", key, " T_sn:", data.T_sn, " T_data:", data.T_data, " T_num:", data.T_num, " T_tonum:", data.T_tonum) } // 获取GetMid缓存 func GetMid(key string, Type int) bool { if !redisCache_Mid.IsExist(key) { //redisCache_Mid.Put(key, "", time.Minute*4) // 24-3-26 = 1 if Type == 2 { redisCache_Mid.Put(key, "", time.Minute*10) } else { redisCache_Mid.Put(key, "", time.Minute*1) } return false } return true } func Run_MqttServer() { time.Sleep(3 * time.Second) logs.Println("============Run_MqttServer=============", "") HTTPPort, _ := beego.AppConfig.String("HTTPPort") // Create an MQTT Client. cli = client.New(&client.Options{ // Define the processing of the error handler. ErrorHandler: func(err error) { logs.PrintlnError("err!!!!!! Run_MqttServer:", err.Error()) time.Sleep(3 * time.Second) go Run_MqttServer() // MQTT 通讯 return }, }) // Terminate the Client. defer cli.Terminate() c := client.ConnectOptions{ Network: "tcp", Address: conf.MqttServer_Url, ClientID: []byte(conf.MqttServer_ClientID + HTTPPort), UserName: []byte(conf.MqttServer_Username), Password: []byte(conf.MqttServer_Password), } logs.Println("Address:", c.Address) logs.Println("ClientID:", string(c.ClientID)) // Connect to the MQTT Server. err := cli.Connect(&c) if err != nil { logs.Println("MqttServer", "连接MQTT失败 [cli.Connect]", "") logs.Println("err!!!!!! Run_MqttServer:", "连接MQTT失败:", err) fmt.Println("err!!!!!! 连接MQTT失败:", err) cli.Terminate() time.Sleep(3 * time.Second) go Run_MqttServer() // MQTT 通讯 return } // Subscribe to topics. err = cli.Subscribe(&client.SubscribeOptions{ SubReqs: []*client.SubReq{ &client.SubReq{ TopicFilter: []byte("/sub/#"), QoS: mqtt.QoS0, // Define the processing of the message handler. Handler: func(topicName, message []byte) { logs.PrintlnMqtt("<-" + string(topicName) + " " + string(message)) messagePubHandlerV3(string(topicName), message) }, }, &client.SubReq{ // 设备断开 TopicFilter: []byte("$SYS/brokers/+/clients/+/disconnected"), QoS: mqtt.QoS0, // Define the processing of the message handler. Handler: func(topicName, message []byte) { logs.PrintlnMqtt("<-" + string(topicName) + " " + string(message)) MessageDisconnected(string(topicName), message) }, }, &client.SubReq{ // 设备上线 TopicFilter: []byte("$SYS/brokers/+/clients/+/connected"), QoS: mqtt.QoS0, // Define the processing of the message handler. Handler: func(topicName, message []byte) { logs.PrintlnMqtt("<-" + string(topicName) + " " + string(message)) MessageConnected(string(topicName), message) }, }, }, }) if err != nil { logs.Println("MqttServer", "订阅消息 [cli.Subscribe]", "") logs.Println("err!!!!!! Run_MqttServer:", "连接MQTT失败:", err) } fmt.Println("MQTT ok!") } // 冷链 3.0 协议 func messagePubHandlerV3(topicName string, message []byte) { // 本地测试 logs.Println("=", "============= Mqtt2 JSON =============") logs.Println("<-"+topicName, string(message)) start := time.Now() var Ms2_project Ms2m_Project err := json.Unmarshal(message, &Ms2_project) if err != nil { logs.Println("MqttServer", "JSON反序列化失败[Ms_Project]", string(message), err.Error()) return } // 过滤 topicNameS := strings.Split(topicName, "/") if len(topicNameS) != 3 { logs.Println("MqttServer", "订阅地址错误 len(topicNameS) != 3", strconv.Itoa(len(topicNameS))) return } Sn := topicNameS[2] if len(Sn) < 4 { logs.Println("MqttServer", "订阅地址错误 len(Sn) < 4", Sn) return } // 过滤重复数据 SnMid := fmt.Sprintf("%s|%d", Sn, Ms2_project.Msid) if GetMid(SnMid, Ms2_project.Type) { logs.Println("MqttServer", "SnMid 重复发送!!", SnMid) ms2_Return := Ms2_Return{ Type: Ms2_project.Type, Msid: Ms2_project.Msid, Status: 1, } // 回复 jsonStu, err := json.Marshal(ms2_Return) if err != nil { logs.Println("回复失败 [Ms_project_0],err=", err) return } Mqtt_publish(Sn, string(jsonStu)) return } r_Device, err := Device.Read_Device_ByT_sn(Sn) if err != nil { Device.Add_DeviceSnOld(Sn) logs.Println("没有该设备:", Sn) return } //误差 10秒 if math.Abs(float64(Ms2_project.Dut-time.Now().Unix())) > 10 && Ms2_project.Dut != 0 { logs.Println(fmt.Sprintf("%s 时间相差:%d", r_Device.T_sn, int(math.Abs(float64(Ms2_project.Dut-time.Now().Unix()))))) Warning.Add_DeviceLogs(103, r_Device, fmt.Sprintf("时间相差:%d", int(math.Abs(float64(Ms2_project.Dut-time.Now().Unix()))))) RTC_TimeStr(r_Device.T_sn, time.Now().Unix(), 99) } logs.Println("1r_Device:", r_Device.T_sn, "T_devName:", r_Device.T_devName, "T_Dattery:", r_Device.T_Dattery, "T_Site:", r_Device.T_Site, "T_monitor:", r_Device.T_monitor, "T_online:", r_Device.T_online, "T_online_s:", r_Device.T_online_s) //// 更新 状态 为了防止中间有变化,重新获取最新数据 //r_Device_new, err := Device.Read_Device_ByT_sn(r_Device.T_sn) if r_Device.T_online_s == 0 { // 如果 备用网络没有启动 r_Device.T_online = 1 Device.Update_Device(r_Device, "T_online", "T_online_s") } if r_Device.T_online == 0 { // 如果 备用网络没有启动 r_Device.T_online_s = 1 Device.Update_Device(r_Device, "T_online", "T_online_s") } if r_Device.T_online != 1 && r_Device.T_online_s != 1 { r_Device.T_online = 1 Device.Update_Device(r_Device, "T_online", "T_online_s") } // 开始出来 AsyncFuncV3(&r_Device, Ms2_project, message) // 开始处理 logs.Println("========= END Mqtt2 JSON========== ", " mid:", Ms2_project.Msid, " RunTime :", time.Since(start)) go func() { time.Sleep(time.Second) // 清除数据缓存 数据 logs.Println("清除数据缓存:", r_Device.T_sn+"-"+strconv.FormatInt(Ms2_project.Msid, 10)) lock.Lock() delete(DeviceMqttMap, r_Device.T_sn+"-"+strconv.FormatInt(Ms2_project.Msid, 10)) lock.Unlock() }() } // 发送数据 func Mqtt_publish(topic string, text string) { // Publish a message. err := cli.Publish(&client.PublishOptions{ QoS: mqtt.QoS0, TopicName: []byte("/pub/" + topic), Message: []byte(text), }) logs.PrintlnMqtt("-> /pub/" + topic + " " + text) if err != nil { logs.PrintlnError("MqttServer", "发送消息失败 [cli.Publish]", text) } }