123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305 |
- package MqttServer
- import (
- "Cold_mqtt/conf"
- "Cold_mqtt/logs"
- "Cold_mqtt/models/Device"
- "Cold_mqtt/models/Warning"
- "encoding/json"
- "fmt"
- "github.com/astaxie/beego/cache"
- beego "github.com/beego/beego/v2/server/web"
- "github.com/yosssi/gmq/mqtt"
- "github.com/yosssi/gmq/mqtt/client"
- "math"
- "strconv"
- "strings"
- "sync"
- "time"
- )
- var (
- cli *client.Client
- DeviceSn map[string]Device.Device /*创建集合 */
- test = true
- Task_Time int64 // 执行时间
- )
- type DeviceMqttMap_Struct struct {
- T_sn string
- T_data string
- T_num int
- T_tonum int
- }
- var redisCache_Mid cache.Cache
- var DeviceMqttMap map[string]DeviceMqttMap_Struct /*创建集合 */
- var lock sync.Mutex
- func init() {
- Task_Time = time.Now().UnixMilli()
- config := fmt.Sprintf(`{"key":"%s","conn":"%s","dbNum":"%s","password":"%s"}`,
- "redisCache_Mid", conf.Redis_address, conf.Redis_dbNum, conf.Redis_password)
- logs.Println("", config)
- var err error
- redisCache_Mid, err = cache.NewCache("redis", config)
- if err != nil || redisCache_Mid == nil {
- errMsg := "failed to init redis"
- logs.Println(errMsg, err)
- panic(any(errMsg))
- }
- //Task_Etection_Time
- //Task_Etection_x
- DeviceMqttMap = make(map[string]DeviceMqttMap_Struct)
- }
- // 缓存数据发送-确保设备在休眠后 能收到数据
- func DeviceMqttMap_go() {
- logs.Println("=====================DeviceMqttMap_go GO===============")
- time.Sleep(time.Second * 10)
- for true {
- for k, v := range DeviceMqttMap {
- data, ok := DeviceMqttMapGet(k)
- if !ok {
- logs.Println("DeviceMqttMapGet:", k, " !!不存在,跳过!! ")
- continue
- }
- v = data
- logs.Println("DeviceMqttMap:", k, " T_sn:", v.T_sn, " T_data:", v.T_data, " T_num:", v.T_num, " T_tonum:", v.T_tonum)
- if v.T_tonum >= 1 {
- Mqtt_publish(v.T_sn, v.T_data)
- }
- v.T_tonum += 1
- lock.Lock()
- DeviceMqttMap[k] = v
- if v.T_num <= v.T_tonum {
- delete(DeviceMqttMap, k)
- }
- lock.Unlock()
- time.Sleep(time.Millisecond * 100)
- }
- time.Sleep(time.Second * 3)
- }
- }
- func DeviceMqttMapGet(key string) (DeviceMqttMap_Struct, bool) {
- lock.Lock()
- defer lock.Unlock()
- data, ok := DeviceMqttMap[key]
- return data, ok
- }
- func DeviceMqttMapAdd(key string, data DeviceMqttMap_Struct) {
- lock.Lock()
- defer lock.Unlock()
- DeviceMqttMap[key] = data // 缓存数据
- logs.Println("DeviceMqttMapAdd:", key, " T_sn:", data.T_sn, " T_data:", data.T_data, " T_num:", data.T_num, " T_tonum:", data.T_tonum)
- }
- // 获取GetMid缓存
- func GetMid(key string, Type int) bool {
- if !redisCache_Mid.IsExist(key) {
- //redisCache_Mid.Put(key, "", time.Minute*4) // 24-3-26 = 1
- if Type == 2 {
- redisCache_Mid.Put(key, "", time.Minute*10)
- } else {
- redisCache_Mid.Put(key, "", time.Minute*1)
- }
- return false
- }
- return true
- }
- func Run_MqttServer() {
- time.Sleep(3 * time.Second)
- logs.Println("============Run_MqttServer=============", "")
- HTTPPort, _ := beego.AppConfig.String("HTTPPort")
- // Create an MQTT Client.
- cli = client.New(&client.Options{
- // Define the processing of the error handler.
- ErrorHandler: func(err error) {
- logs.PrintlnError("err!!!!!! Run_MqttServer:", err.Error())
- time.Sleep(3 * time.Second)
- go Run_MqttServer() // MQTT 通讯
- return
- },
- })
- // Terminate the Client.
- defer cli.Terminate()
- c := client.ConnectOptions{
- Network: "tcp",
- Address: conf.MqttServer_Url,
- ClientID: []byte(conf.MqttServer_ClientID + HTTPPort),
- UserName: []byte(conf.MqttServer_Username),
- Password: []byte(conf.MqttServer_Password),
- }
- logs.Println("Address:", c.Address)
- logs.Println("ClientID:", string(c.ClientID))
- // Connect to the MQTT Server.
- err := cli.Connect(&c)
- if err != nil {
- logs.Println("MqttServer", "连接MQTT失败 [cli.Connect]", "")
- logs.Println("err!!!!!! Run_MqttServer:", "连接MQTT失败:", err)
- fmt.Println("err!!!!!! 连接MQTT失败:", err)
- cli.Terminate()
- time.Sleep(3 * time.Second)
- go Run_MqttServer() // MQTT 通讯
- return
- }
- // Subscribe to topics.
- err = cli.Subscribe(&client.SubscribeOptions{
- SubReqs: []*client.SubReq{
- &client.SubReq{
- TopicFilter: []byte("/sub/#"),
- QoS: mqtt.QoS0,
- // Define the processing of the message handler.
- Handler: func(topicName, message []byte) {
- logs.PrintlnMqtt("<-" + string(topicName) + " " + string(message))
- messagePubHandlerV3(string(topicName), message)
- },
- },
- &client.SubReq{ // 设备断开
- TopicFilter: []byte("$SYS/brokers/+/clients/+/disconnected"),
- QoS: mqtt.QoS0,
- // Define the processing of the message handler.
- Handler: func(topicName, message []byte) {
- logs.PrintlnMqtt("<-" + string(topicName) + " " + string(message))
- MessageDisconnected(string(topicName), message)
- },
- },
- &client.SubReq{ // 设备上线
- TopicFilter: []byte("$SYS/brokers/+/clients/+/connected"),
- QoS: mqtt.QoS0,
- // Define the processing of the message handler.
- Handler: func(topicName, message []byte) {
- logs.PrintlnMqtt("<-" + string(topicName) + " " + string(message))
- MessageConnected(string(topicName), message)
- },
- },
- },
- })
- if err != nil {
- logs.Println("MqttServer", "订阅消息 [cli.Subscribe]", "")
- logs.Println("err!!!!!! Run_MqttServer:", "连接MQTT失败:", err)
- }
- fmt.Println("MQTT ok!")
- }
- // 冷链 3.0 协议
- func messagePubHandlerV3(topicName string, message []byte) {
- // 本地测试
- logs.Println("=", "============= Mqtt2 JSON =============")
- logs.Println("<-"+topicName, string(message))
- start := time.Now()
- var Ms2_project Ms2m_Project
- err := json.Unmarshal(message, &Ms2_project)
- if err != nil {
- logs.Println("MqttServer", "JSON反序列化失败[Ms_Project]", string(message), err.Error())
- return
- }
- // 过滤
- topicNameS := strings.Split(topicName, "/")
- if len(topicNameS) != 3 {
- logs.Println("MqttServer", "订阅地址错误 len(topicNameS) != 3", strconv.Itoa(len(topicNameS)))
- return
- }
- Sn := topicNameS[2]
- if len(Sn) < 4 {
- logs.Println("MqttServer", "订阅地址错误 len(Sn) < 4", Sn)
- return
- }
- // 过滤重复数据
- SnMid := fmt.Sprintf("%s|%d", Sn, Ms2_project.Msid)
- if GetMid(SnMid, Ms2_project.Type) {
- logs.Println("MqttServer", "SnMid 重复发送!!", SnMid)
- ms2_Return := Ms2_Return{
- Type: Ms2_project.Type,
- Msid: Ms2_project.Msid,
- Status: 1,
- }
- // 回复
- jsonStu, err := json.Marshal(ms2_Return)
- if err != nil {
- logs.Println("回复失败 [Ms_project_0],err=", err)
- return
- }
- Mqtt_publish(Sn, string(jsonStu))
- return
- }
- r_Device, err := Device.Read_Device_ByT_sn(Sn)
- if err != nil {
- Device.Add_DeviceSnOld(Sn)
- logs.Println("没有该设备:", Sn)
- return
- }
- //误差 10秒
- if math.Abs(float64(Ms2_project.Dut-time.Now().Unix())) > 10 && Ms2_project.Dut != 0 {
- logs.Println(fmt.Sprintf("%s 时间相差:%d", r_Device.T_sn, int(math.Abs(float64(Ms2_project.Dut-time.Now().Unix())))))
- Warning.Add_DeviceLogs(103, r_Device, fmt.Sprintf("时间相差:%d", int(math.Abs(float64(Ms2_project.Dut-time.Now().Unix())))))
- RTC_TimeStr(r_Device.T_sn, time.Now().Unix(), 99)
- }
- logs.Println("1r_Device:", r_Device.T_sn, "T_devName:", r_Device.T_devName, "T_Dattery:", r_Device.T_Dattery, "T_Site:", r_Device.T_Site, "T_monitor:", r_Device.T_monitor, "T_online:", r_Device.T_online, "T_online_s:", r_Device.T_online_s)
- //// 更新 状态 为了防止中间有变化,重新获取最新数据
- //r_Device_new, err := Device.Read_Device_ByT_sn(r_Device.T_sn)
- if r_Device.T_online_s == 0 { // 如果 备用网络没有启动
- r_Device.T_online = 1
- Device.Update_Device(r_Device, "T_online", "T_online_s")
- }
- if r_Device.T_online == 0 { // 如果 备用网络没有启动
- r_Device.T_online_s = 1
- Device.Update_Device(r_Device, "T_online", "T_online_s")
- }
- if r_Device.T_online != 1 && r_Device.T_online_s != 1 {
- r_Device.T_online = 1
- Device.Update_Device(r_Device, "T_online", "T_online_s")
- }
- // 开始出来
- AsyncFuncV3(&r_Device, Ms2_project, message) // 开始处理
- logs.Println("========= END Mqtt2 JSON========== ", " mid:", Ms2_project.Msid, " RunTime :", time.Since(start))
- go func() {
- time.Sleep(time.Second)
- // 清除数据缓存 数据
- logs.Println("清除数据缓存:", r_Device.T_sn+"-"+strconv.FormatInt(Ms2_project.Msid, 10))
- lock.Lock()
- delete(DeviceMqttMap, r_Device.T_sn+"-"+strconv.FormatInt(Ms2_project.Msid, 10))
- lock.Unlock()
- }()
- }
- // 发送数据
- func Mqtt_publish(topic string, text string) {
- // Publish a message.
- err := cli.Publish(&client.PublishOptions{
- QoS: mqtt.QoS0,
- TopicName: []byte("/pub/" + topic),
- Message: []byte(text),
- })
- logs.PrintlnMqtt("-> /pub/" + topic + " " + text)
- if err != nil {
- logs.PrintlnError("MqttServer", "发送消息失败 [cli.Publish]", text)
- }
- }
|