123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550 |
- package MqttServer
- import (
- "ColdP_server/controllers/lib"
- "ColdP_server/logs"
- "ColdP_server/models/Device"
- "encoding/json"
- "fmt"
- "log"
- "math/rand"
- "os"
- "os/signal"
- "strconv"
- "strings"
- "sync"
- "sync/atomic"
- "syscall"
- "time"
- "github.com/beego/beego/v2/adapter/orm"
- beego "github.com/beego/beego/v2/server/web"
- mqtt "github.com/eclipse/paho.mqtt.golang"
- "github.com/robfig/cron/v3"
- "github.com/tidwall/gjson"
- )
- var (
- mqttSuffix string
- mqttPort string
- mqttUsername string
- mqttPassword string
- Mqtt_UrlMqttjxit string
- Mqtt_UrlMqttlodr string
- MqttjxitCon mqtt.Client
- MqttlodrCon mqtt.Client
- )
- // var MqttCon = make(map[string]mqtt.Client)
- var MqttCon = NewSafeMap()
- type SafeMap struct {
- mu sync.Mutex
- m map[string]mqtt.Client
- }
- func NewSafeMap() *SafeMap {
- return &SafeMap{
- m: make(map[string]mqtt.Client),
- }
- }
- func (sm *SafeMap) Set(key string, value mqtt.Client) {
- sm.mu.Lock()
- defer sm.mu.Unlock()
- sm.m[key] = value
- }
- func (sm *SafeMap) Get(key string) (mqtt.Client, bool) {
- sm.mu.Lock()
- defer sm.mu.Unlock()
- val, ok := sm.m[key]
- return val, ok
- }
- func (sm *SafeMap) Delete(key string) {
- sm.mu.Lock()
- defer sm.mu.Unlock()
- delete(sm.m, key)
- }
- type ClodpServer struct {
- Id int `json:"id"`
- Sn string `json:"sn"` // 设备序列号
- T_id int `json:"t_id"` // 设备id
- IsTrue int `json:"is_true"` //是否发送成功
- T_t float64 `json:"t_t"` //温度
- T_h float64 `json:"t_h"` //湿度
- Types int `json:"type"` //类型
- Speed float64 `json:"speed"` // 传感器采样率
- Sense float64 `json:"sense"` // 传感器灵敏度
- CreateTime time.Time `json:"create_time"` //auto_now_add 第一次保存时才设置时间
- UpdateTime time.Time `json:"update_time"` //auto_now 每次 model 保存时都会对时间自动更新
- }
- func (t *ClodpServer) TableName() string {
- return "coldp_server"
- }
- func init() {
- var err error
- mqttSuffix, err = beego.AppConfig.String("Mqtt_suffix")
- if err != nil {
- log.Fatalf("Failed to load Mqtt_suffix: %v", err)
- }
- mqttPort, err = beego.AppConfig.String("Mqtt_port")
- if err != nil {
- log.Fatalf("Failed to load Mqtt_port: %v", err)
- }
- mqttUsername, err = beego.AppConfig.String("Mqtt_username")
- if err != nil {
- log.Fatalf("Failed to load Mqtt_username: %v", err)
- }
- mqttPassword, err = beego.AppConfig.String("Mqtt_password")
- if err != nil {
- log.Fatalf("Failed to load Mqtt_password: %v", err)
- }
- Mqtt_UrlMqttjxit, err = beego.AppConfig.String("Mqtt_UrlMqttjxit")
- if err != nil {
- log.Fatalf("Failed to load Mqtt_password: %v", err)
- }
- Mqtt_UrlMqttlodr, err = beego.AppConfig.String("Mqtt_UrlMqttlodr")
- if err != nil {
- log.Fatalf("Failed to load Mqtt_password: %v", err)
- }
- orm.RegisterModel(new(ClodpServer))
- }
- func MqttConntMqttjxit() {
- pid := os.Getpid()
- var clientIdCounter int32
- uniqueId := fmt.Sprintf("%d_%d_%d", time.Now().UnixNano(), pid, atomic.AddInt32(&clientIdCounter, 1))
- fmt.Println("uniqueId:", uniqueId)
- opts := mqtt.NewClientOptions().AddBroker(Mqtt_UrlMqttjxit)
- opts.SetClientID(uniqueId)
- opts.SetUsername("coldp")
- opts.SetPassword("EHM5PpXDD579gmp")
- MqttjxitCon = mqtt.NewClient(opts)
- MqttCon.Set("mqttjxit", MqttjxitCon)
- if token := MqttjxitCon.Connect(); token.Wait() && token.Error() != nil {
- log.Fatal(token.Error())
- }
- MqttjxitCon.Subscribe("/sub/#", 0, onMessageReceived)
- ch := make(chan os.Signal, 1)
- signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
- <-ch // 等待信号
- log.Println("Interrupt received, disconnecting...")
- MqttjxitCon.Disconnect(250)
- }
- func MqttConntMqttlodr() {
- pid := os.Getpid()
- var clientIdCounter int32
- uniqueId := fmt.Sprintf("%d_%d_%d", time.Now().Unix(), pid, atomic.AddInt32(&clientIdCounter, 1))
- fmt.Println("uniqueId:", uniqueId)
- opts := mqtt.NewClientOptions().AddBroker(Mqtt_UrlMqttlodr)
- opts.SetClientID(uniqueId)
- opts.SetUsername("coldp")
- opts.SetPassword("EHM5PpXDD579gmp")
- MqttlodrCon = mqtt.NewClient(opts)
- MqttCon.Set("mqttlodr", MqttlodrCon)
- if token := MqttlodrCon.Connect(); token.Wait() && token.Error() != nil {
- log.Fatal(token.Error())
- }
- MqttlodrCon.Subscribe("/sub/#", 0, onMessageReceived)
- ch := make(chan os.Signal, 1)
- signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
- <-ch // 等待信号
- log.Println("Interrupt received, disconnecting...")
- MqttlodrCon.Disconnect(250)
- }
- func MqttConntMqttyuht() {
- pid := os.Getpid()
- var clientIdCounter int32
- uniqueId := fmt.Sprintf("%d_%d_%d", time.Now().Unix(), pid, atomic.AddInt32(&clientIdCounter, 1+1))
- fmt.Println("uniqueId:", uniqueId)
- opts := mqtt.NewClientOptions().AddBroker(Mqtt_UrlMqttlodr)
- opts.SetClientID(uniqueId)
- opts.SetUsername("coldp")
- opts.SetPassword("EHM5PpXDD579gmp")
- MqttlodrCon = mqtt.NewClient(opts)
- MqttCon.Set("mqttlodr", MqttlodrCon)
- if token := MqttlodrCon.Connect(); token.Wait() && token.Error() != nil {
- log.Fatal(token.Error())
- }
- MqttlodrCon.Subscribe("/sub/#", 0, onMessageReceived)
- ch := make(chan os.Signal, 1)
- signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
- <-ch // 等待信号
- log.Println("Interrupt received, disconnecting...")
- MqttlodrCon.Disconnect(250)
- }
- type Type5 struct {
- Dut int `json:"dut"`
- Type int `json:"type"`
- Data []struct {
- Tpreu float64 `json:"tpreu"`
- Tl float64 `json:"tl"`
- Hu float64 `json:"hu"`
- Hprel float64 `json:"hprel"`
- Hpreu float64 `json:"hpreu"`
- Sense float64 `json:"sense"`
- Tu float64 `json:"tu"`
- Tprel float64 `json:"tprel"`
- Enprelnote float64 `json:"enprelnote"`
- Enprel float64 `json:"enprel"`
- Name string `json:"name"`
- En float64 `json:"en"`
- Speed float64 `json:"speed"`
- Hl float64 `json:"hl"`
- Free float64 `json:"free"`
- Id int `json:"id"`
- } `json:"data"`
- Mid int `json:"mid"`
- }
- func onMessageReceived(client mqtt.Client, msg mqtt.Message) {
- s := gjson.Get(string(msg.Payload()), "type").Int()
- //log.Println("MessageID", s)
- clos := make([]string, 0)
- switch s {
- case 7:
- var t Deviation_Sub
- err := json.Unmarshal(msg.Payload(), &t)
- if err != nil {
- log.Println("json 序列化失败")
- log.Println("失败参数", string(msg.Payload()))
- }
- sn := strings.TrimPrefix(msg.Topic(), "/sub/")
- log.Println("订阅主题", sn)
- log.Println("接收到参数", t)
- if len(t.Data) > 0 {
- for _, v := range t.Data {
- coldpServer := ClodpServer{
- Sn: sn,
- T_id: v.Id,
- IsTrue: 2,
- T_t: v.T,
- T_h: v.H,
- Types: 8,
- CreateTime: time.Now(),
- UpdateTime: time.Now(),
- }
- coldp, err := FindClodpServerBySnAndId(sn, v.Id, 8)
- if err != nil {
- Add_ClodpServer(coldpServer)
- }
- if len(coldp.Sn) == 0 {
- Add_ClodpServer(coldpServer)
- }
- if coldp.T_h != v.H || coldp.T_t != v.T || coldp.IsTrue != 2 {
- coldp.T_t = v.T
- coldp.T_h = v.H
- coldp.IsTrue = 2
- clos = append(clos, "t_t")
- clos = append(clos, "t_h")
- clos = append(clos, "is_true")
- Update_ClodpServer(coldp, clos...)
- }
- }
- }
- case 5:
- var type5 Type5
- err := json.Unmarshal(msg.Payload(), &type5)
- if err != nil {
- log.Println("json 序列化失败")
- log.Println("失败参数", string(msg.Payload()))
- }
- sn := strings.TrimPrefix(msg.Topic(), "/sub/")
- log.Println("订阅主题", sn)
- log.Println("接收到参数", string(msg.Payload()))
- if len(type5.Data) > 0 {
- for _, v := range type5.Data {
- coldpServer := ClodpServer{
- Sn: sn,
- T_id: v.Id,
- IsTrue: 2,
- Sense: v.Sense,
- Speed: v.Speed,
- Types: 6,
- CreateTime: time.Now(),
- UpdateTime: time.Now(),
- }
- coldp, err := FindClodpServerBySnAndId(sn, v.Id, 6)
- if err != nil {
- Add_ClodpServer(coldpServer)
- }
- if len(coldp.Sn) == 0 {
- Add_ClodpServer(coldpServer)
- }
- if coldp.Sense != v.Sense || coldp.Speed != v.Speed || coldp.IsTrue != 2 {
- coldp.Sense = v.Sense
- coldp.Speed = v.Speed
- coldp.IsTrue = 2
- clos = append(clos, "sense")
- clos = append(clos, "speed")
- clos = append(clos, "is_true")
- Update_ClodpServer(coldp, clos...)
- }
- }
- }
- }
- }
- func MqttPublish(client mqtt.Client, publishTopic string, pubData []byte) error {
- log.Println("发送主题", publishTopic, "发送消息", string(pubData))
- publish := client.Publish(publishTopic, 0, false, pubData)
- publish.Wait()
- if publish.Error() != nil {
- return fmt.Errorf("发送消息失败 %v", publish.Error())
- }
- return nil
- }
- // SendAndWaitForAck 发送消息并等待确认
- func SendAndWaitForAck(broker, publishTopic, subfirmTopic string, pubData []byte, timeout time.Duration) (bool, Deviation_Sub, error) {
- uniqueId := strconv.FormatInt(time.Now().Unix(), 10) + "_" + strconv.Itoa(rand.Intn(1000))
- bro := fmt.Sprintf("tcp://%s", broker+"."+mqttSuffix+":"+mqttPort)
- fmt.Println(bro)
- opts := mqtt.NewClientOptions().AddBroker(bro)
- opts.SetClientID(uniqueId)
- opts.SetUsername(mqttUsername)
- opts.SetPassword(mqttPassword)
- opts.SetCleanSession(true) // 启用持久会话
- // 创建通道用于接收确认消息
- ackChan := make(chan bool, 1)
- msgChan := make(chan Deviation_Sub, 1)
- // 设置消息接收回调
- opts.SetDefaultPublishHandler(func(client mqtt.Client, msg mqtt.Message) {
- var deviation_Pub Deviation_Pub
- err := json.Unmarshal(pubData, &deviation_Pub)
- if err != nil {
- log.Println("json序列化失败")
- }
- var ackMessage Deviation_Sub
- log.Println("初始化接收值", string(msg.Payload()))
- if err := json.Unmarshal(msg.Payload(), &ackMessage); err == nil {
- if deviation_Pub.Mid == ackMessage.Mid {
- ackChan <- true
- msgChan <- ackMessage
- log.Println("接收到参数", string(msg.Payload()))
- }
- }
- })
- client := mqtt.NewClient(opts)
- if token := client.Connect(); token.Wait() && token.Error() != nil {
- return false, Deviation_Sub{}, fmt.Errorf("连接失败: %v", token.Error())
- }
- defer client.Disconnect(250)
- // 订阅确认消息的主题
- token := client.Subscribe(subfirmTopic, 0, nil)
- token.Wait()
- if token.Error() != nil {
- return false, Deviation_Sub{}, fmt.Errorf("订阅主题失败: %v", token.Error())
- }
- log.Printf("订阅主题成功 '%s'", subfirmTopic)
- // 发布消息到指定主题
- token = client.Publish(publishTopic, 0, false, pubData)
- token.Wait()
- if token.Error() != nil {
- return false, Deviation_Sub{}, fmt.Errorf("发送消息失败 %v", token.Error())
- } else {
- log.Printf("发送消息成功: %s", pubData)
- }
- // 等待确认消息或超时
- select {
- case <-ackChan:
- receivedMsg := <-msgChan
- client.Disconnect(250)
- return true, receivedMsg, nil
- case <-time.After(timeout):
- client.Disconnect(250)
- return false, Deviation_Sub{}, fmt.Errorf("连接超时")
- }
- }
- func SendAndWait(client mqtt.Client, publishTopic, subfirmTopic string, pubData []byte, timeout time.Duration) (bool, Deviation_Sub, error) {
- // 创建通道用于接收确认消息
- ackChan := make(chan bool, 1)
- msgChan := make(chan Deviation_Sub, 1)
- if token := client.Connect(); token.Wait() && token.Error() != nil {
- return false, Deviation_Sub{}, fmt.Errorf("连接失败: %v", token.Error())
- }
- // 订阅确认消息的主题
- token := client.Subscribe(subfirmTopic, 0, func(client mqtt.Client, msg mqtt.Message) {
- var deviation_Pub Deviation_Pub
- err := json.Unmarshal(pubData, &deviation_Pub)
- if err != nil {
- log.Println("json序列化失败")
- }
- var ackMessage Deviation_Sub
- log.Println("初始化接收值", string(msg.Payload()))
- if err := json.Unmarshal(msg.Payload(), &ackMessage); err == nil {
- if deviation_Pub.Mid == ackMessage.Mid {
- ackChan <- true
- msgChan <- ackMessage
- log.Println("接收到参数", string(msg.Payload()))
- }
- }
- })
- token.Wait()
- if token.Error() != nil {
- return false, Deviation_Sub{}, fmt.Errorf("订阅主题失败: %v", token.Error())
- }
- log.Printf("订阅主题成功 '%s'", subfirmTopic)
- // 发布消息到指定主题
- Publishtoken := client.Publish(publishTopic, 0, false, pubData)
- Publishtoken.Wait()
- if Publishtoken.Error() != nil {
- return false, Deviation_Sub{}, fmt.Errorf("发送消息失败 %v", token.Error())
- } else {
- log.Printf("发送消息成功: %s", pubData)
- }
- //// 使用 token.Done() 来创建一个 goroutine 监听发布操作的完成
- //go func() {
- // <-Publishtoken.Done() // 等待发布操作完成
- // if err := Publishtoken.Error(); err != nil {
- // log.Printf("发送消息失败: %v", err)
- // return false, Deviation_Sub{}, fmt.Errorf("发送消息失败 %v", Publishtoken.Error())
- // } else {
- // log.Printf("发送消息成功: %s", pubData)
- // }
- //}()
- // 等待确认消息或超时
- select {
- case <-ackChan:
- receivedMsg := <-msgChan
- return true, receivedMsg, nil
- default:
- return false, Deviation_Sub{}, fmt.Errorf("订阅失败")
- }
- }
- func Cron() {
- // 创建一个定时任务对象
- c := cron.New(cron.WithSeconds())
- // 给对象增加定时任务
- cron, _ := beego.AppConfig.String("Cron")
- c.AddFunc(cron, UpdateDataRepeat)
- // 启动定时任务
- c.Start()
- defer c.Stop()
- // 查询语句,阻塞,让main函数不退出,保持程序运行
- select {}
- }
- func UpdateDataRepeat() {
- o := orm.NewOrm()
- table := o.QueryTable(new(ClodpServer))
- var clodps []ClodpServer
- all, err := table.Filter("is_true", 1).All(&clodps)
- if err != nil {
- logs.Error("查询参数失败")
- return // 确保函数在此处结束,因为下面的代码依赖于成功的查询结果
- }
- var wg sync.WaitGroup
- maxConcurrency := 10 // 设置最大并发数
- sem := make(chan struct{}, maxConcurrency)
- if all > 0 {
- for _, clodp := range clodps {
- wg.Add(1)
- sem <- struct{}{}
- go func(clodp ClodpServer) {
- defer wg.Done()
- defer func() { <-sem }()
- mqttId := Device.ReadDeviceMqttId(clodp.Sn)
- if client, ok := MqttCon.Get(mqttId); ok {
- topicPub := fmt.Sprintf("/pub/%s", clodp.Sn)
- switch clodp.Types {
- case 8:
- pubData, err := json.Marshal(Deviation_Sub{
- Sn: clodp.Sn,
- Type: 8,
- Mid: time.Now().Unix() + int64(rand.Intn(10)),
- Data: []Deviation_Sub_Data{
- {
- Id: clodp.T_id,
- T: clodp.T_t,
- H: clodp.T_h,
- },
- },
- })
- if err != nil {
- logs.Error("下发参数失败", clodp.Sn, "错误信息:", err)
- return
- }
- err = MqttPublish(client, topicPub, pubData)
- if err != nil {
- logs.Error("下发参数失败", clodp.Sn, "错误信息:", err)
- }
- case 6:
- pubData, err := json.Marshal(Sensor_Sub{
- Type: 6,
- Sn: clodp.Sn,
- Mid: int(time.Now().Unix() + int64(rand.Intn(10))),
- Data: []Sensor_Sub_Data{
- {
- Id: clodp.T_id,
- Speed: clodp.Speed,
- Sense: clodp.Sense,
- },
- },
- })
- if err != nil {
- logs.Error("下发参数失败", clodp.Sn, "错误信息:", err)
- return
- }
- err = MqttPublish(client, topicPub, pubData)
- if err != nil {
- logs.Error("下发参数失败", clodp.Sn, "错误信息:", err)
- }
- }
- }
- }(clodp)
- }
- wg.Wait()
- }
- }
- func Add_ClodpServer(m ClodpServer) (err error) {
- o := orm.NewOrm()
- _, err = o.Insert(&m)
- if err != nil {
- logs.Error(lib.FuncName(), err)
- return err
- }
- return nil
- }
- func FindClodpServerBySnAndId(sn string, tid, types int) (coldp ClodpServer, err error) {
- o := orm.NewOrm()
- table := o.QueryTable(new(ClodpServer))
- err = table.Filter("sn", sn).Filter("t_id", tid).Filter("types", types).One(&coldp)
- if err != nil {
- logs.Error(lib.FuncName(), err)
- return ClodpServer{}, nil
- }
- return coldp, nil
- }
- func Update_ClodpServer(r ClodpServer, cols ...string) bool {
- o := orm.NewOrm()
- num, err := o.Update(&r, cols...)
- if err != nil {
- logs.Error(lib.FuncName(), err)
- return false
- }
- fmt.Println("Number of records updated in database:", num)
- return true
- }
|