Mqtt.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559
  1. package MqttServer
  2. import (
  3. "ColdP_server/controllers/lib"
  4. "ColdP_server/logs"
  5. "ColdP_server/models/Device"
  6. "encoding/json"
  7. "fmt"
  8. "github.com/aliyun/credentials-go/credentials/utils"
  9. "log"
  10. "math/rand"
  11. "os"
  12. "os/signal"
  13. "strconv"
  14. "strings"
  15. "sync"
  16. "sync/atomic"
  17. "syscall"
  18. "time"
  19. "github.com/beego/beego/v2/adapter/orm"
  20. beego "github.com/beego/beego/v2/server/web"
  21. mqtt "github.com/eclipse/paho.mqtt.golang"
  22. "github.com/robfig/cron/v3"
  23. "github.com/tidwall/gjson"
  24. )
  25. var (
  26. mqttSuffix string
  27. mqttPort string
  28. mqttUsername string
  29. mqttPassword string
  30. Mqtt_UrlMqttjxit string
  31. Mqtt_UrlMqttlodr string
  32. MqttjxitCon mqtt.Client
  33. MqttlodrCon mqtt.Client
  34. )
  35. // var MqttCon = make(map[string]mqtt.Client)
  36. var MqttCon = NewSafeMap()
  37. type SafeMap struct {
  38. mu sync.Mutex
  39. m map[string]mqtt.Client
  40. }
  41. func NewSafeMap() *SafeMap {
  42. return &SafeMap{
  43. m: make(map[string]mqtt.Client),
  44. }
  45. }
  46. func (sm *SafeMap) Set(key string, value mqtt.Client) {
  47. sm.mu.Lock()
  48. defer sm.mu.Unlock()
  49. sm.m[key] = value
  50. }
  51. func (sm *SafeMap) Get(key string) (mqtt.Client, bool) {
  52. sm.mu.Lock()
  53. defer sm.mu.Unlock()
  54. val, ok := sm.m[key]
  55. return val, ok
  56. }
  57. func (sm *SafeMap) Delete(key string) {
  58. sm.mu.Lock()
  59. defer sm.mu.Unlock()
  60. delete(sm.m, key)
  61. }
  // ClodpServer is the ORM model for one parameter record of a device
  // sensor; rows live in the table named by TableName.
  type ClodpServer struct {
  	Id int `json:"id"`
  	// Sn is the device serial number.
  	Sn string `json:"sn"`
  	// T_id is the device id.
  	T_id int `json:"t_id"`
  	// IsTrue records whether the parameters were sent successfully.
  	// In this file onMessageReceived writes 2 and UpdateDataRepeat
  	// re-sends rows whose value is 1.
  	IsTrue int `json:"is_true"`
  	// T_t is the temperature.
  	T_t float64 `json:"t_t"`
  	// T_h is the humidity.
  	T_h float64 `json:"t_h"`
  	// Types is the record type (this file uses 8 and 6).
  	Types int `json:"type"`
  	// Speed is the sensor sampling rate.
  	Speed float64 `json:"speed"`
  	// Sense is the sensor sensitivity.
  	Sense float64 `json:"sense"`
  	// CreateTime: auto_now_add, set only when the row is first saved.
  	CreateTime time.Time `json:"create_time"`
  	// UpdateTime: auto_now, refreshed every time the model is saved.
  	UpdateTime time.Time `json:"update_time"`
  }

  // TableName returns the database table backing ClodpServer.
  func (cs *ClodpServer) TableName() string {
  	const table = "coldp_server"
  	return table
  }
  78. func init() {
  79. var err error
  80. mqttSuffix, err = beego.AppConfig.String("Mqtt_suffix")
  81. if err != nil {
  82. log.Fatalf("Failed to load Mqtt_suffix: %v", err)
  83. }
  84. mqttPort, err = beego.AppConfig.String("Mqtt_port")
  85. if err != nil {
  86. log.Fatalf("Failed to load Mqtt_port: %v", err)
  87. }
  88. mqttUsername, err = beego.AppConfig.String("Mqtt_username")
  89. if err != nil {
  90. log.Fatalf("Failed to load Mqtt_username: %v", err)
  91. }
  92. mqttPassword, err = beego.AppConfig.String("Mqtt_password")
  93. if err != nil {
  94. log.Fatalf("Failed to load Mqtt_password: %v", err)
  95. }
  96. Mqtt_UrlMqttjxit, err = beego.AppConfig.String("Mqtt_UrlMqttjxit")
  97. if err != nil {
  98. log.Fatalf("Failed to load Mqtt_password: %v", err)
  99. }
  100. Mqtt_UrlMqttlodr, err = beego.AppConfig.String("Mqtt_UrlMqttlodr")
  101. if err != nil {
  102. log.Fatalf("Failed to load Mqtt_password: %v", err)
  103. }
  104. orm.RegisterModel(new(ClodpServer))
  105. }
  106. func MqttConntMqttjxit() {
  107. pid := os.Getpid()
  108. var clientIdCounter int32
  109. hex := utils.GetUUID()
  110. uniqueId := fmt.Sprintf("%d_%d_%d_%v", time.Now().UnixNano(), pid, atomic.AddInt32(&clientIdCounter, 1), hex)
  111. fmt.Println("uniqueId:", uniqueId)
  112. opts := mqtt.NewClientOptions().AddBroker(Mqtt_UrlMqttjxit)
  113. opts.SetClientID(uniqueId)
  114. opts.SetUsername("coldp")
  115. opts.SetPassword("EHM5PpXDD579gmp")
  116. log.Println(mqttUsername, mqttPassword)
  117. MqttjxitCon = mqtt.NewClient(opts)
  118. MqttCon.Set("mqttjxit", MqttjxitCon)
  119. if token := MqttjxitCon.Connect(); token.Wait() && token.Error() != nil {
  120. log.Println("mqtt连接失败", token.Error())
  121. logs.Error("Mqtt连接失败", token.Error())
  122. }
  123. MqttjxitCon.Subscribe("/sub/#", 0, onMessageReceived)
  124. ch := make(chan os.Signal, 1)
  125. signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
  126. <-ch // 等待信号
  127. log.Println("Interrupt received, disconnecting...")
  128. MqttjxitCon.Disconnect(250)
  129. }
  130. func MqttConntMqttlodr() {
  131. pid := os.Getpid()
  132. var clientIdCounter int32
  133. hex := utils.GetUUID()
  134. uniqueId := fmt.Sprintf("%d_%d_%d_%v", time.Now().UnixNano(), pid, atomic.AddInt32(&clientIdCounter, 1), hex)
  135. fmt.Println("uniqueId:", uniqueId)
  136. opts := mqtt.NewClientOptions().AddBroker(Mqtt_UrlMqttlodr)
  137. opts.SetClientID(uniqueId)
  138. opts.SetUsername(mqttUsername)
  139. opts.SetPassword(mqttPassword)
  140. MqttlodrCon = mqtt.NewClient(opts)
  141. MqttCon.Set("mqttlodr", MqttlodrCon)
  142. if token := MqttlodrCon.Connect(); token.Wait() && token.Error() != nil {
  143. log.Println("mqtt连接失败", token.Error())
  144. logs.Error("Mqtt连接失败", token.Error())
  145. }
  146. MqttlodrCon.Subscribe("/sub/#", 0, onMessageReceived)
  147. ch := make(chan os.Signal, 1)
  148. signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
  149. <-ch // 等待信号
  150. log.Println("Interrupt received, disconnecting...")
  151. MqttlodrCon.Disconnect(250)
  152. }
  153. func MqttConntMqttyuht() {
  154. pid := os.Getpid()
  155. var clientIdCounter int32
  156. hex := utils.GetUUID()
  157. uniqueId := fmt.Sprintf("%d_%d_%d_%v", time.Now().UnixNano(), pid, atomic.AddInt32(&clientIdCounter, 1), hex)
  158. fmt.Println("uniqueId:", uniqueId)
  159. opts := mqtt.NewClientOptions().AddBroker(Mqtt_UrlMqttlodr)
  160. opts.SetClientID(uniqueId)
  161. opts.SetUsername(mqttUsername)
  162. opts.SetPassword(mqttPassword)
  163. MqttlodrCon = mqtt.NewClient(opts)
  164. MqttCon.Set("mqttlodr", MqttlodrCon)
  165. if token := MqttlodrCon.Connect(); token.Wait() && token.Error() != nil {
  166. log.Println("mqtt连接失败", token.Error())
  167. logs.Error("Mqtt连接失败", token.Error())
  168. }
  169. MqttlodrCon.Subscribe("/sub/#", 0, onMessageReceived)
  170. ch := make(chan os.Signal, 1)
  171. signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
  172. <-ch // 等待信号
  173. log.Println("Interrupt received, disconnecting...")
  174. MqttlodrCon.Disconnect(250)
  175. }
  // Type5 is the JSON shape decoded by onMessageReceived for payloads whose
  // "type" field is 5. Of the per-entry fields, that handler reads only Id,
  // Sense and Speed.
  type Type5 struct {
  	Dut  int `json:"dut"`
  	Type int `json:"type"`

  	// Data holds one entry per reported sensor.
  	Data []struct {
  		Tpreu      float64 `json:"tpreu"`
  		Tl         float64 `json:"tl"`
  		Hu         float64 `json:"hu"`
  		Hprel      float64 `json:"hprel"`
  		Hpreu      float64 `json:"hpreu"`
  		Sense      float64 `json:"sense"`
  		Tu         float64 `json:"tu"`
  		Tprel      float64 `json:"tprel"`
  		Enprelnote float64 `json:"enprelnote"`
  		Enprel     float64 `json:"enprel"`
  		Name       string  `json:"name"`
  		En         float64 `json:"en"`
  		Speed      float64 `json:"speed"`
  		Hl         float64 `json:"hl"`
  		Free       float64 `json:"free"`
  		Id         int     `json:"id"`
  	} `json:"data"`

  	Mid int `json:"mid"`
  }
  199. func onMessageReceived(client mqtt.Client, msg mqtt.Message) {
  200. s := gjson.Get(string(msg.Payload()), "type").Int()
  201. //log.Println("MessageID", s)
  202. clos := make([]string, 0)
  203. switch s {
  204. case 7:
  205. var t Deviation_Sub
  206. err := json.Unmarshal(msg.Payload(), &t)
  207. if err != nil {
  208. log.Println("json 序列化失败")
  209. log.Println("失败参数", string(msg.Payload()))
  210. }
  211. sn := strings.TrimPrefix(msg.Topic(), "/sub/")
  212. log.Println("订阅主题", sn)
  213. log.Println("接收到参数", t)
  214. if len(t.Data) > 0 {
  215. for _, v := range t.Data {
  216. coldpServer := ClodpServer{
  217. Sn: sn,
  218. T_id: v.Id,
  219. IsTrue: 2,
  220. T_t: v.T,
  221. T_h: v.H,
  222. Types: 8,
  223. CreateTime: time.Now(),
  224. UpdateTime: time.Now(),
  225. }
  226. coldp, err := FindClodpServerBySnAndId(sn, v.Id, 8)
  227. if err != nil {
  228. Add_ClodpServer(coldpServer)
  229. }
  230. if len(coldp.Sn) == 0 {
  231. Add_ClodpServer(coldpServer)
  232. }
  233. if coldp.T_h != v.H || coldp.T_t != v.T || coldp.IsTrue != 2 {
  234. coldp.T_t = v.T
  235. coldp.T_h = v.H
  236. coldp.IsTrue = 2
  237. clos = append(clos, "t_t")
  238. clos = append(clos, "t_h")
  239. clos = append(clos, "is_true")
  240. Update_ClodpServer(coldp, clos...)
  241. }
  242. }
  243. }
  244. case 5:
  245. var type5 Type5
  246. err := json.Unmarshal(msg.Payload(), &type5)
  247. if err != nil {
  248. log.Println("json 序列化失败")
  249. log.Println("失败参数", string(msg.Payload()))
  250. }
  251. sn := strings.TrimPrefix(msg.Topic(), "/sub/")
  252. log.Println("订阅主题", sn)
  253. log.Println("接收到参数", string(msg.Payload()))
  254. if len(type5.Data) > 0 {
  255. for _, v := range type5.Data {
  256. coldpServer := ClodpServer{
  257. Sn: sn,
  258. T_id: v.Id,
  259. IsTrue: 2,
  260. Sense: v.Sense,
  261. Speed: v.Speed,
  262. Types: 6,
  263. CreateTime: time.Now(),
  264. UpdateTime: time.Now(),
  265. }
  266. coldp, err := FindClodpServerBySnAndId(sn, v.Id, 6)
  267. if err != nil {
  268. Add_ClodpServer(coldpServer)
  269. }
  270. if len(coldp.Sn) == 0 {
  271. Add_ClodpServer(coldpServer)
  272. }
  273. if coldp.Sense != v.Sense || coldp.Speed != v.Speed || coldp.IsTrue != 2 {
  274. coldp.Sense = v.Sense
  275. coldp.Speed = v.Speed
  276. coldp.IsTrue = 2
  277. clos = append(clos, "sense")
  278. clos = append(clos, "speed")
  279. clos = append(clos, "is_true")
  280. Update_ClodpServer(coldp, clos...)
  281. }
  282. }
  283. }
  284. }
  285. }
  286. func MqttPublish(client mqtt.Client, publishTopic string, pubData []byte) error {
  287. log.Println("发送主题", publishTopic, "发送消息", string(pubData))
  288. publish := client.Publish(publishTopic, 0, false, pubData)
  289. publish.Wait()
  290. if publish.Error() != nil {
  291. return fmt.Errorf("发送消息失败 %v", publish.Error())
  292. }
  293. return nil
  294. }
  295. // SendAndWaitForAck 发送消息并等待确认
  296. func SendAndWaitForAck(broker, publishTopic, subfirmTopic string, pubData []byte, timeout time.Duration) (bool, Deviation_Sub, error) {
  297. uniqueId := strconv.FormatInt(time.Now().Unix(), 10) + "_" + strconv.Itoa(rand.Intn(1000))
  298. bro := fmt.Sprintf("tcp://%s", broker+"."+mqttSuffix+":"+mqttPort)
  299. fmt.Println(bro)
  300. opts := mqtt.NewClientOptions().AddBroker(bro)
  301. opts.SetClientID(uniqueId)
  302. opts.SetUsername(mqttUsername)
  303. opts.SetPassword(mqttPassword)
  304. opts.SetCleanSession(true) // 启用持久会话
  305. // 创建通道用于接收确认消息
  306. ackChan := make(chan bool, 1)
  307. msgChan := make(chan Deviation_Sub, 1)
  308. // 设置消息接收回调
  309. opts.SetDefaultPublishHandler(func(client mqtt.Client, msg mqtt.Message) {
  310. var deviation_Pub Deviation_Pub
  311. err := json.Unmarshal(pubData, &deviation_Pub)
  312. if err != nil {
  313. log.Println("json序列化失败")
  314. }
  315. var ackMessage Deviation_Sub
  316. log.Println("初始化接收值", string(msg.Payload()))
  317. if err := json.Unmarshal(msg.Payload(), &ackMessage); err == nil {
  318. if deviation_Pub.Mid == ackMessage.Mid {
  319. ackChan <- true
  320. msgChan <- ackMessage
  321. log.Println("接收到参数", string(msg.Payload()))
  322. }
  323. }
  324. })
  325. client := mqtt.NewClient(opts)
  326. if token := client.Connect(); token.Wait() && token.Error() != nil {
  327. return false, Deviation_Sub{}, fmt.Errorf("连接失败: %v", token.Error())
  328. }
  329. defer client.Disconnect(250)
  330. // 订阅确认消息的主题
  331. token := client.Subscribe(subfirmTopic, 0, nil)
  332. token.Wait()
  333. if token.Error() != nil {
  334. return false, Deviation_Sub{}, fmt.Errorf("订阅主题失败: %v", token.Error())
  335. }
  336. log.Printf("订阅主题成功 '%s'", subfirmTopic)
  337. // 发布消息到指定主题
  338. token = client.Publish(publishTopic, 0, false, pubData)
  339. token.Wait()
  340. if token.Error() != nil {
  341. return false, Deviation_Sub{}, fmt.Errorf("发送消息失败 %v", token.Error())
  342. } else {
  343. log.Printf("发送消息成功: %s", pubData)
  344. }
  345. // 等待确认消息或超时
  346. select {
  347. case <-ackChan:
  348. receivedMsg := <-msgChan
  349. client.Disconnect(250)
  350. return true, receivedMsg, nil
  351. case <-time.After(timeout):
  352. client.Disconnect(250)
  353. return false, Deviation_Sub{}, fmt.Errorf("连接超时")
  354. }
  355. }
  356. func SendAndWait(client mqtt.Client, publishTopic, subfirmTopic string, pubData []byte, timeout time.Duration) (bool, Deviation_Sub, error) {
  357. // 创建通道用于接收确认消息
  358. ackChan := make(chan bool, 1)
  359. msgChan := make(chan Deviation_Sub, 1)
  360. if token := client.Connect(); token.Wait() && token.Error() != nil {
  361. return false, Deviation_Sub{}, fmt.Errorf("连接失败: %v", token.Error())
  362. }
  363. // 订阅确认消息的主题
  364. token := client.Subscribe(subfirmTopic, 0, func(client mqtt.Client, msg mqtt.Message) {
  365. var deviation_Pub Deviation_Pub
  366. err := json.Unmarshal(pubData, &deviation_Pub)
  367. if err != nil {
  368. log.Println("json序列化失败")
  369. }
  370. var ackMessage Deviation_Sub
  371. log.Println("初始化接收值", string(msg.Payload()))
  372. if err := json.Unmarshal(msg.Payload(), &ackMessage); err == nil {
  373. if deviation_Pub.Mid == ackMessage.Mid {
  374. ackChan <- true
  375. msgChan <- ackMessage
  376. log.Println("接收到参数", string(msg.Payload()))
  377. }
  378. }
  379. })
  380. token.Wait()
  381. if token.Error() != nil {
  382. return false, Deviation_Sub{}, fmt.Errorf("订阅主题失败: %v", token.Error())
  383. }
  384. log.Printf("订阅主题成功 '%s'", subfirmTopic)
  385. // 发布消息到指定主题
  386. Publishtoken := client.Publish(publishTopic, 0, false, pubData)
  387. Publishtoken.Wait()
  388. if Publishtoken.Error() != nil {
  389. return false, Deviation_Sub{}, fmt.Errorf("发送消息失败 %v", token.Error())
  390. } else {
  391. log.Printf("发送消息成功: %s", pubData)
  392. }
  393. //// 使用 token.Done() 来创建一个 goroutine 监听发布操作的完成
  394. //go func() {
  395. // <-Publishtoken.Done() // 等待发布操作完成
  396. // if err := Publishtoken.Error(); err != nil {
  397. // log.Printf("发送消息失败: %v", err)
  398. // return false, Deviation_Sub{}, fmt.Errorf("发送消息失败 %v", Publishtoken.Error())
  399. // } else {
  400. // log.Printf("发送消息成功: %s", pubData)
  401. // }
  402. //}()
  403. // 等待确认消息或超时
  404. select {
  405. case <-ackChan:
  406. receivedMsg := <-msgChan
  407. return true, receivedMsg, nil
  408. default:
  409. return false, Deviation_Sub{}, fmt.Errorf("订阅失败")
  410. }
  411. }
  412. func Cron() {
  413. // 创建一个定时任务对象
  414. c := cron.New(cron.WithSeconds())
  415. // 给对象增加定时任务
  416. cron, _ := beego.AppConfig.String("Cron")
  417. c.AddFunc(cron, UpdateDataRepeat)
  418. // 启动定时任务
  419. c.Start()
  420. defer c.Stop()
  421. // 查询语句,阻塞,让main函数不退出,保持程序运行
  422. select {}
  423. }
  424. func UpdateDataRepeat() {
  425. o := orm.NewOrm()
  426. table := o.QueryTable(new(ClodpServer))
  427. var clodps []ClodpServer
  428. all, err := table.Filter("is_true", 1).All(&clodps)
  429. if err != nil {
  430. logs.Error("查询参数失败")
  431. return // 确保函数在此处结束,因为下面的代码依赖于成功的查询结果
  432. }
  433. var wg sync.WaitGroup
  434. maxConcurrency := 10 // 设置最大并发数
  435. sem := make(chan struct{}, maxConcurrency)
  436. if all > 0 {
  437. for _, clodp := range clodps {
  438. wg.Add(1)
  439. sem <- struct{}{}
  440. go func(clodp ClodpServer) {
  441. defer wg.Done()
  442. defer func() { <-sem }()
  443. mqttId := Device.ReadDeviceMqttId(clodp.Sn)
  444. if client, ok := MqttCon.Get(mqttId); ok {
  445. topicPub := fmt.Sprintf("/pub/%s", clodp.Sn)
  446. switch clodp.Types {
  447. case 8:
  448. pubData, err := json.Marshal(Deviation_Sub{
  449. Sn: clodp.Sn,
  450. Type: 8,
  451. Mid: time.Now().Unix() + int64(rand.Intn(10)),
  452. Data: []Deviation_Sub_Data{
  453. {
  454. Id: clodp.T_id,
  455. T: clodp.T_t,
  456. H: clodp.T_h,
  457. },
  458. },
  459. })
  460. if err != nil {
  461. logs.Error("下发参数失败", clodp.Sn, "错误信息:", err)
  462. return
  463. }
  464. err = MqttPublish(client, topicPub, pubData)
  465. if err != nil {
  466. logs.Error("下发参数失败", clodp.Sn, "错误信息:", err)
  467. }
  468. case 6:
  469. pubData, err := json.Marshal(Sensor_Sub{
  470. Type: 6,
  471. Sn: clodp.Sn,
  472. Mid: int(time.Now().Unix() + int64(rand.Intn(10))),
  473. Data: []Sensor_Sub_Data{
  474. {
  475. Id: clodp.T_id,
  476. Speed: clodp.Speed,
  477. Sense: clodp.Sense,
  478. },
  479. },
  480. })
  481. if err != nil {
  482. logs.Error("下发参数失败", clodp.Sn, "错误信息:", err)
  483. return
  484. }
  485. err = MqttPublish(client, topicPub, pubData)
  486. if err != nil {
  487. logs.Error("下发参数失败", clodp.Sn, "错误信息:", err)
  488. }
  489. }
  490. }
  491. }(clodp)
  492. }
  493. wg.Wait()
  494. }
  495. }
  496. func Add_ClodpServer(m ClodpServer) (err error) {
  497. o := orm.NewOrm()
  498. _, err = o.Insert(&m)
  499. if err != nil {
  500. logs.Error(lib.FuncName(), err)
  501. return err
  502. }
  503. return nil
  504. }
  505. func FindClodpServerBySnAndId(sn string, tid, types int) (coldp ClodpServer, err error) {
  506. o := orm.NewOrm()
  507. table := o.QueryTable(new(ClodpServer))
  508. err = table.Filter("sn", sn).Filter("t_id", tid).Filter("types", types).One(&coldp)
  509. if err != nil {
  510. logs.Error(lib.FuncName(), err)
  511. return ClodpServer{}, nil
  512. }
  513. return coldp, nil
  514. }
  515. func Update_ClodpServer(r ClodpServer, cols ...string) bool {
  516. o := orm.NewOrm()
  517. num, err := o.Update(&r, cols...)
  518. if err != nil {
  519. logs.Error(lib.FuncName(), err)
  520. return false
  521. }
  522. fmt.Println("Number of records updated in database:", num)
  523. return true
  524. }