// Package MqttServer connects to the configured MQTT brokers, stores the
// parameter reports that devices publish under "/sub/<sn>" in the
// coldp_server table, and periodically re-publishes rows that are still
// flagged as pending.
package MqttServer

import (
	"encoding/json"
	"fmt"
	"log"
	"math/rand"
	"os"
	"os/signal"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"syscall"
	"time"

	"ColdP_server/controllers/lib"
	"ColdP_server/logs"
	"ColdP_server/models/Device"

	"github.com/beego/beego/v2/adapter/orm"
	beego "github.com/beego/beego/v2/server/web"
	mqtt "github.com/eclipse/paho.mqtt.golang"
	"github.com/robfig/cron/v3"
	"github.com/tidwall/gjson"
)

// Settings loaded from the beego application config in init, plus the
// long-lived broker connections created by the MqttConnt* functions.
var (
	mqttSuffix       string
	mqttPort         string
	mqttUsername     string
	mqttPassword     string
	Mqtt_UrlMqttjxit string
	Mqtt_UrlMqttlodr string
	MqttjxitCon      mqtt.Client
	MqttlodrCon      mqtt.Client
)

// Credentials and topic filter used by the long-lived subscriber connections.
//
// NOTE(review): these credentials are hard-coded in source while
// mqttUsername/mqttPassword are loaded from config. They are kept as-is
// because it cannot be told from this file whether the two sets are
// interchangeable; consider moving them to configuration.
const (
	subscriberUsername = "coldp"
	subscriberPassword = "EHM5PpXDD579gmp"
	subscriberTopic    = "/sub/#"
)

// clientIDSeq is a process-wide counter mixed into MQTT client IDs so that two
// clients created within the same clock tick still get distinct IDs.
var clientIDSeq int32

// MqttCon maps a broker key (e.g. "mqttjxit") to its connected client.
var MqttCon = NewSafeMap()

// SafeMap is a mutex-guarded map from broker key to MQTT client.
type SafeMap struct {
	mu sync.Mutex
	m  map[string]mqtt.Client
}

// NewSafeMap returns an empty, ready-to-use SafeMap.
func NewSafeMap() *SafeMap {
	return &SafeMap{
		m: make(map[string]mqtt.Client),
	}
}

// Set stores value under key, replacing any previous entry.
func (sm *SafeMap) Set(key string, value mqtt.Client) {
	sm.mu.Lock()
	defer sm.mu.Unlock()
	sm.m[key] = value
}

// Get returns the client stored under key and whether it was present.
func (sm *SafeMap) Get(key string) (mqtt.Client, bool) {
	sm.mu.Lock()
	defer sm.mu.Unlock()
	val, ok := sm.m[key]
	return val, ok
}

// Delete removes key from the map; it is a no-op when key is absent.
func (sm *SafeMap) Delete(key string) {
	sm.mu.Lock()
	defer sm.mu.Unlock()
	delete(sm.m, key)
}

// ClodpServer is the ORM model for one row of the coldp_server table: the last
// known parameters of one sensor (T_id) on one device (Sn).
type ClodpServer struct {
	Id         int       `json:"id"`
	Sn         string    `json:"sn"`          // device serial number
	T_id       int       `json:"t_id"`        // sensor id on the device
	IsTrue     int       `json:"is_true"`     // delivery flag: UpdateDataRepeat re-publishes rows with 1; onMessageReceived stores 2
	T_t        float64   `json:"t_t"`         // temperature
	T_h        float64   `json:"t_h"`         // humidity
	Types      int       `json:"type"`        // record type (8 and 6 are the values used in this file)
	Speed      float64   `json:"speed"`       // sensor sampling rate
	Sense      float64   `json:"sense"`       // sensor sensitivity
	CreateTime time.Time `json:"create_time"` // auto_now_add: set only on first save
	UpdateTime time.Time `json:"update_time"` // auto_now: refreshed on every save
}

// TableName returns the database table backing ClodpServer.
func (t *ClodpServer) TableName() string {
	return "coldp_server"
}

// init loads the MQTT settings from the beego config (aborting the process
// when one is missing) and registers the ClodpServer model with the ORM.
func init() {
	settings := []struct {
		key string
		dst *string
	}{
		{"Mqtt_suffix", &mqttSuffix},
		{"Mqtt_port", &mqttPort},
		{"Mqtt_username", &mqttUsername},
		{"Mqtt_password", &mqttPassword},
		{"Mqtt_UrlMqttjxit", &Mqtt_UrlMqttjxit},
		{"Mqtt_UrlMqttlodr", &Mqtt_UrlMqttlodr},
	}
	for _, s := range settings {
		value, err := beego.AppConfig.String(s.key)
		if err != nil {
			// Report the key that actually failed.
			log.Fatalf("Failed to load %s: %v", s.key, err)
		}
		*s.dst = value
	}
	orm.RegisterModel(new(ClodpServer))
}

// subscribeUntilSignal connects to brokerURL, publishes the client through
// *slot and MqttCon[key], subscribes to every "/sub/" topic and then blocks
// until SIGINT/SIGTERM, at which point it disconnects. A connection failure
// terminates the process; a subscription failure is only logged.
func subscribeUntilSignal(key, brokerURL, clientID string, slot *mqtt.Client) {
	fmt.Println("uniqueId:", clientID)

	opts := mqtt.NewClientOptions().AddBroker(brokerURL)
	opts.SetClientID(clientID)
	opts.SetUsername(subscriberUsername)
	opts.SetPassword(subscriberPassword)

	client := mqtt.NewClient(opts)
	*slot = client
	MqttCon.Set(key, client)

	if token := client.Connect(); token.Wait() && token.Error() != nil {
		log.Fatal(token.Error())
	}
	if token := client.Subscribe(subscriberTopic, 0, onMessageReceived); token.Wait() && token.Error() != nil {
		log.Printf("subscribe %q on %s failed: %v", subscriberTopic, key, token.Error())
	}

	ch := make(chan os.Signal, 1)
	signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
	<-ch // wait for a termination signal
	log.Println("Interrupt received, disconnecting...")
	client.Disconnect(250)
}

// MqttConntMqttjxit runs the subscriber for the broker at Mqtt_UrlMqttjxit and
// blocks until the process receives SIGINT/SIGTERM.
func MqttConntMqttjxit() {
	clientID := fmt.Sprintf("%d_%d_%d", time.Now().UnixNano(), os.Getpid(), atomic.AddInt32(&clientIDSeq, 1))
	subscribeUntilSignal("mqttjxit", Mqtt_UrlMqttjxit, clientID, &MqttjxitCon)
}

// MqttConntMqttlodr runs the subscriber for the broker at Mqtt_UrlMqttlodr and
// blocks until the process receives SIGINT/SIGTERM.
func MqttConntMqttlodr() {
	clientID := fmt.Sprintf("%d_%d_%d", time.Now().Unix(), os.Getpid(), atomic.AddInt32(&clientIDSeq, 1))
	subscribeUntilSignal("mqttlodr", Mqtt_UrlMqttlodr, clientID, &MqttlodrCon)
}

// MqttConntMqttyuht runs a subscriber and blocks until SIGINT/SIGTERM.
//
// NOTE(review): despite its name this function uses the "lodr" URL, global and
// map key, exactly like MqttConntMqttlodr. That looks like a copy-paste
// leftover, but no "yuht" setting is visible in this file, so the behavior is
// preserved — confirm the intended broker.
func MqttConntMqttyuht() {
	clientID := fmt.Sprintf("%d_%d_%d", time.Now().Unix(), os.Getpid(), atomic.AddInt32(&clientIDSeq, 1+1))
	subscribeUntilSignal("mqttlodr", Mqtt_UrlMqttlodr, clientID, &MqttlodrCon)
}

// Type5 is the payload of a "type": 5 message as decoded by onMessageReceived.
type Type5 struct {
	Dut  int `json:"dut"`
	Type int `json:"type"`
	Data []struct {
		Tpreu      float64 `json:"tpreu"`
		Tl         float64 `json:"tl"`
		Hu         float64 `json:"hu"`
		Hprel      float64 `json:"hprel"`
		Hpreu      float64 `json:"hpreu"`
		Sense      float64 `json:"sense"`
		Tu         float64 `json:"tu"`
		Tprel      float64 `json:"tprel"`
		Enprelnote float64 `json:"enprelnote"`
		Enprel     float64 `json:"enprel"`
		Name       string  `json:"name"`
		En         float64 `json:"en"`
		Speed      float64 `json:"speed"`
		Hl         float64 `json:"hl"`
		Free       float64 `json:"free"`
		Id         int     `json:"id"`
	} `json:"data"`
	Mid int `json:"mid"`
}

// onMessageReceived handles a message on "/sub/<sn>". Messages whose JSON
// "type" is 7 carry temperature/humidity values and are stored as record type
// 8; messages of type 5 carry speed/sense values and are stored as record type
// 6. Every other type is ignored.
func onMessageReceived(_ mqtt.Client, msg mqtt.Message) {
	payload := msg.Payload()

	switch gjson.GetBytes(payload, "type").Int() {
	case 7:
		var report Deviation_Sub
		if err := json.Unmarshal(payload, &report); err != nil {
			// Deliberately best-effort: on a type mismatch json.Unmarshal
			// still fills the fields it could decode, so keep going.
			log.Println("json 序列化失败")
			log.Println("失败参数", string(payload))
		}
		sn := strings.TrimPrefix(msg.Topic(), "/sub/")
		log.Println("订阅主题", sn)
		log.Println("接收到参数", report)
		for _, v := range report.Data {
			storeDeviationReport(sn, v.Id, v.T, v.H)
		}
	case 5:
		var report Type5
		if err := json.Unmarshal(payload, &report); err != nil {
			// Best-effort, see case 7.
			log.Println("json 序列化失败")
			log.Println("失败参数", string(payload))
		}
		sn := strings.TrimPrefix(msg.Topic(), "/sub/")
		log.Println("订阅主题", sn)
		log.Println("接收到参数", string(payload))
		for _, v := range report.Data {
			storeSensorReport(sn, v.Id, v.Speed, v.Sense)
		}
	}
}

// storeDeviationReport inserts the type-8 row for (sn, tid) when none exists;
// otherwise it updates temperature, humidity and the delivery flag, but only
// if one of them differs from what is stored.
func storeDeviationReport(sn string, tid int, t, h float64) {
	existing, err := FindClodpServerBySnAndId(sn, tid, 8)
	if err != nil || len(existing.Sn) == 0 {
		now := time.Now()
		// Add_ClodpServer logs its own failure; nothing more to do here.
		_ = Add_ClodpServer(ClodpServer{
			Sn:         sn,
			T_id:       tid,
			IsTrue:     2,
			T_t:        t,
			T_h:        h,
			Types:      8,
			CreateTime: now,
			UpdateTime: now,
		})
		return
	}
	if existing.T_t == t && existing.T_h == h && existing.IsTrue == 2 {
		return
	}
	existing.T_t = t
	existing.T_h = h
	existing.IsTrue = 2
	Update_ClodpServer(existing, "t_t", "t_h", "is_true")
}

// storeSensorReport inserts the type-6 row for (sn, tid) when none exists;
// otherwise it updates speed, sense and the delivery flag, but only if one of
// them differs from what is stored.
func storeSensorReport(sn string, tid int, speed, sense float64) {
	existing, err := FindClodpServerBySnAndId(sn, tid, 6)
	if err != nil || len(existing.Sn) == 0 {
		now := time.Now()
		// Add_ClodpServer logs its own failure; nothing more to do here.
		_ = Add_ClodpServer(ClodpServer{
			Sn:         sn,
			T_id:       tid,
			IsTrue:     2,
			Sense:      sense,
			Speed:      speed,
			Types:      6,
			CreateTime: now,
			UpdateTime: now,
		})
		return
	}
	if existing.Sense == sense && existing.Speed == speed && existing.IsTrue == 2 {
		return
	}
	existing.Sense = sense
	existing.Speed = speed
	existing.IsTrue = 2
	Update_ClodpServer(existing, "sense", "speed", "is_true")
}

// MqttPublish publishes pubData to publishTopic at QoS 0 (not retained) and
// waits for the publish token to complete. It returns a wrapped error when the
// token reports a failure.
func MqttPublish(client mqtt.Client, publishTopic string, pubData []byte) error {
	log.Println("发送主题", publishTopic, "发送消息", string(pubData))
	token := client.Publish(publishTopic, 0, false, pubData)
	token.Wait()
	if err := token.Error(); err != nil {
		return fmt.Errorf("发送消息失败 %w", err)
	}
	return nil
}

// ackHandler returns a message handler that forwards the first received
// message whose "mid" equals the "mid" of pubData to ackCh. The send never
// blocks: once ackCh is full, further matching messages are dropped so a
// duplicate or late ack cannot stall the MQTT client's callback goroutine.
func ackHandler(pubData []byte, ackCh chan<- Deviation_Sub) mqtt.MessageHandler {
	var sent Deviation_Pub
	if err := json.Unmarshal(pubData, &sent); err != nil {
		log.Println("json序列化失败")
	}
	return func(_ mqtt.Client, msg mqtt.Message) {
		log.Println("初始化接收值", string(msg.Payload()))
		var ack Deviation_Sub
		if err := json.Unmarshal(msg.Payload(), &ack); err != nil {
			return
		}
		if sent.Mid != ack.Mid {
			return
		}
		select {
		case ackCh <- ack:
			log.Println("接收到参数", string(msg.Payload()))
		default:
			// An ack was already delivered (or the waiter is gone).
		}
	}
}

// SendAndWaitForAck opens a dedicated connection to
// tcp://<broker>.<Mqtt_suffix>:<Mqtt_port>, subscribes to subfirmTopic,
// publishes pubData to publishTopic and waits up to timeout for a message
// whose "mid" matches the one in pubData.
//
// It returns (true, ack, nil) on a matching ack and (false, zero, err) when
// connecting, subscribing or publishing fails or the timeout elapses. The
// connection is always closed before returning.
func SendAndWaitForAck(broker, publishTopic, subfirmTopic string, pubData []byte, timeout time.Duration) (bool, Deviation_Sub, error) {
	uniqueId := strconv.FormatInt(time.Now().Unix(), 10) + "_" + strconv.Itoa(rand.Intn(1000)) +
		"_" + strconv.Itoa(int(atomic.AddInt32(&clientIDSeq, 1)))
	bro := fmt.Sprintf("tcp://%s", broker+"."+mqttSuffix+":"+mqttPort)
	fmt.Println(bro)

	ackCh := make(chan Deviation_Sub, 1)

	opts := mqtt.NewClientOptions().AddBroker(bro)
	opts.SetClientID(uniqueId)
	opts.SetUsername(mqttUsername)
	opts.SetPassword(mqttPassword)
	opts.SetCleanSession(true) // clean (non-persistent) session
	// The subscription below has no handler of its own, so acks arrive here.
	opts.SetDefaultPublishHandler(ackHandler(pubData, ackCh))

	client := mqtt.NewClient(opts)
	if token := client.Connect(); token.Wait() && token.Error() != nil {
		return false, Deviation_Sub{}, fmt.Errorf("连接失败: %w", token.Error())
	}
	defer client.Disconnect(250)

	// Subscribe to the acknowledgement topic.
	token := client.Subscribe(subfirmTopic, 0, nil)
	token.Wait()
	if err := token.Error(); err != nil {
		return false, Deviation_Sub{}, fmt.Errorf("订阅主题失败: %w", err)
	}
	log.Printf("订阅主题成功 '%s'", subfirmTopic)

	// Publish the request.
	token = client.Publish(publishTopic, 0, false, pubData)
	token.Wait()
	if err := token.Error(); err != nil {
		return false, Deviation_Sub{}, fmt.Errorf("发送消息失败 %w", err)
	}
	log.Printf("发送消息成功: %s", pubData)

	// Wait for the acknowledgement or the timeout.
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	select {
	case ack := <-ackCh:
		return true, ack, nil
	case <-timer.C:
		return false, Deviation_Sub{}, fmt.Errorf("连接超时")
	}
}

// SendAndWait does the same request/acknowledge exchange as SendAndWaitForAck
// but on an existing, shared client: it subscribes to subfirmTopic, publishes
// pubData to publishTopic and waits up to timeout for a message whose "mid"
// matches the one in pubData.
//
// The client is neither disconnected nor unsubscribed afterwards, because it
// is shared with other users; the handler left behind never blocks.
func SendAndWait(client mqtt.Client, publishTopic, subfirmTopic string, pubData []byte, timeout time.Duration) (bool, Deviation_Sub, error) {
	ackCh := make(chan Deviation_Sub, 1)

	if token := client.Connect(); token.Wait() && token.Error() != nil {
		return false, Deviation_Sub{}, fmt.Errorf("连接失败: %w", token.Error())
	}

	// Subscribe to the acknowledgement topic.
	subToken := client.Subscribe(subfirmTopic, 0, ackHandler(pubData, ackCh))
	subToken.Wait()
	if err := subToken.Error(); err != nil {
		return false, Deviation_Sub{}, fmt.Errorf("订阅主题失败: %w", err)
	}
	log.Printf("订阅主题成功 '%s'", subfirmTopic)

	// Publish the request.
	pubToken := client.Publish(publishTopic, 0, false, pubData)
	pubToken.Wait()
	if err := pubToken.Error(); err != nil {
		// Report the publish error, not the (nil) subscribe error.
		return false, Deviation_Sub{}, fmt.Errorf("发送消息失败 %w", err)
	}
	log.Printf("发送消息成功: %s", pubData)

	// Wait for the acknowledgement or the timeout. A non-blocking check here
	// would almost always fail, since the ack cannot have arrived yet.
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	select {
	case ack := <-ackCh:
		return true, ack, nil
	case <-timer.C:
		return false, Deviation_Sub{}, fmt.Errorf("等待确认消息超时")
	}
}

// Cron schedules UpdateDataRepeat with the spec from the "Cron" config key
// (seconds-resolution format) and then blocks forever so the scheduler keeps
// running. Configuration or scheduling errors are logged; the function still
// blocks afterwards so callers that rely on it never returning are unaffected.
func Cron() {
	c := cron.New(cron.WithSeconds())

	spec, err := beego.AppConfig.String("Cron")
	if err != nil {
		logs.Error("Cron: load spec from config:", err)
	}
	if _, err := c.AddFunc(spec, UpdateDataRepeat); err != nil {
		logs.Error("Cron: schedule UpdateDataRepeat:", err)
	}

	c.Start()
	defer c.Stop()

	// Block forever.
	select {}
}

// UpdateDataRepeat re-publishes every coldp_server row whose is_true is 1 to
// "/pub/<sn>" on the device's broker, using at most 10 concurrent publishers,
// and returns once all of them have finished.
func UpdateDataRepeat() {
	o := orm.NewOrm()
	var clodps []ClodpServer
	if _, err := o.QueryTable(new(ClodpServer)).Filter("is_true", 1).All(&clodps); err != nil {
		logs.Error("查询参数失败", err)
		return
	}
	if len(clodps) == 0 {
		return
	}

	const maxConcurrency = 10
	sem := make(chan struct{}, maxConcurrency)
	var wg sync.WaitGroup
	for _, clodp := range clodps {
		wg.Add(1)
		sem <- struct{}{} // acquire a slot before starting the goroutine
		go func(clodp ClodpServer) {
			defer wg.Done()
			defer func() { <-sem }()
			republishClodp(clodp)
		}(clodp)
	}
	wg.Wait()
}

// republishClodp publishes one pending row to its device. Rows whose broker is
// not connected, or whose record type has no payload, are skipped silently.
func republishClodp(clodp ClodpServer) {
	client, ok := MqttCon.Get(Device.ReadDeviceMqttId(clodp.Sn))
	if !ok {
		return
	}
	pubData, err := repeatPayload(clodp)
	if err != nil {
		logs.Error("下发参数失败", clodp.Sn, "错误信息:", err)
		return
	}
	if pubData == nil {
		return
	}
	if err := MqttPublish(client, fmt.Sprintf("/pub/%s", clodp.Sn), pubData); err != nil {
		logs.Error("下发参数失败", clodp.Sn, "错误信息:", err)
	}
}

// repeatPayload builds the JSON message for a pending row: a Deviation_Sub for
// record type 8, a Sensor_Sub for record type 6. It returns (nil, nil) for any
// other record type.
func repeatPayload(clodp ClodpServer) ([]byte, error) {
	mid := time.Now().Unix() + int64(rand.Intn(10))
	switch clodp.Types {
	case 8:
		return json.Marshal(Deviation_Sub{
			Sn:   clodp.Sn,
			Type: 8,
			Mid:  mid,
			Data: []Deviation_Sub_Data{
				{
					Id: clodp.T_id,
					T:  clodp.T_t,
					H:  clodp.T_h,
				},
			},
		})
	case 6:
		return json.Marshal(Sensor_Sub{
			Type: 6,
			Sn:   clodp.Sn,
			Mid:  int(mid),
			Data: []Sensor_Sub_Data{
				{
					Id:    clodp.T_id,
					Speed: clodp.Speed,
					Sense: clodp.Sense,
				},
			},
		})
	}
	return nil, nil
}

// Add_ClodpServer inserts m into coldp_server. A failure is logged and
// returned.
func Add_ClodpServer(m ClodpServer) (err error) {
	o := orm.NewOrm()
	if _, err = o.Insert(&m); err != nil {
		logs.Error(lib.FuncName(), err)
		return err
	}
	return nil
}

// FindClodpServerBySnAndId looks up the row matching sn, tid and types.
//
// When the query fails for any reason, including "no row found", the error is
// logged and a zero ClodpServer is returned with a nil error; callers detect a
// miss by checking for an empty Sn.
//
// NOTE(review): the returned error is therefore always nil. This is kept
// because callers outside this file may depend on it — confirm before
// changing.
func FindClodpServerBySnAndId(sn string, tid, types int) (coldp ClodpServer, err error) {
	o := orm.NewOrm()
	query := o.QueryTable(new(ClodpServer)).
		Filter("sn", sn).
		Filter("t_id", tid).
		Filter("types", types)
	if queryErr := query.One(&coldp); queryErr != nil {
		logs.Error(lib.FuncName(), queryErr)
		return ClodpServer{}, nil
	}
	return coldp, nil
}

// Update_ClodpServer writes the given columns of r (all columns when cols is
// empty) to the row identified by r's primary key. It reports whether the
// update succeeded; a failure is logged.
func Update_ClodpServer(r ClodpServer, cols ...string) bool {
	o := orm.NewOrm()
	num, err := o.Update(&r, cols...)
	if err != nil {
		logs.Error(lib.FuncName(), err)
		return false
	}
	fmt.Println("Number of records updated in database:", num)
	return true
}