Mqtt.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550
  1. package MqttServer
  2. import (
  3. "ColdP_server/controllers/lib"
  4. "ColdP_server/logs"
  5. "ColdP_server/models/Device"
  6. "encoding/json"
  7. "fmt"
  8. "log"
  9. "math/rand"
  10. "os"
  11. "os/signal"
  12. "strconv"
  13. "strings"
  14. "sync"
  15. "sync/atomic"
  16. "syscall"
  17. "time"
  18. "github.com/beego/beego/v2/adapter/orm"
  19. beego "github.com/beego/beego/v2/server/web"
  20. mqtt "github.com/eclipse/paho.mqtt.golang"
  21. "github.com/robfig/cron/v3"
  22. "github.com/tidwall/gjson"
  23. )
  24. var (
  25. mqttSuffix string
  26. mqttPort string
  27. mqttUsername string
  28. mqttPassword string
  29. Mqtt_UrlMqttjxit string
  30. Mqtt_UrlMqttlodr string
  31. MqttjxitCon mqtt.Client
  32. MqttlodrCon mqtt.Client
  33. )
  34. // var MqttCon = make(map[string]mqtt.Client)
  35. var MqttCon = NewSafeMap()
  36. type SafeMap struct {
  37. mu sync.Mutex
  38. m map[string]mqtt.Client
  39. }
  40. func NewSafeMap() *SafeMap {
  41. return &SafeMap{
  42. m: make(map[string]mqtt.Client),
  43. }
  44. }
  45. func (sm *SafeMap) Set(key string, value mqtt.Client) {
  46. sm.mu.Lock()
  47. defer sm.mu.Unlock()
  48. sm.m[key] = value
  49. }
  50. func (sm *SafeMap) Get(key string) (mqtt.Client, bool) {
  51. sm.mu.Lock()
  52. defer sm.mu.Unlock()
  53. val, ok := sm.m[key]
  54. return val, ok
  55. }
  56. func (sm *SafeMap) Delete(key string) {
  57. sm.mu.Lock()
  58. defer sm.mu.Unlock()
  59. delete(sm.m, key)
  60. }
  61. type ClodpServer struct {
  62. Id int `json:"id"`
  63. Sn string `json:"sn"` // 设备序列号
  64. T_id int `json:"t_id"` // 设备id
  65. IsTrue int `json:"is_true"` //是否发送成功
  66. T_t float64 `json:"t_t"` //温度
  67. T_h float64 `json:"t_h"` //湿度
  68. Types int `json:"type"` //类型
  69. Speed float64 `json:"speed"` // 传感器采样率
  70. Sense float64 `json:"sense"` // 传感器灵敏度
  71. CreateTime time.Time `json:"create_time"` //auto_now_add 第一次保存时才设置时间
  72. UpdateTime time.Time `json:"update_time"` //auto_now 每次 model 保存时都会对时间自动更新
  73. }
  74. func (t *ClodpServer) TableName() string {
  75. return "coldp_server"
  76. }
  77. func init() {
  78. var err error
  79. mqttSuffix, err = beego.AppConfig.String("Mqtt_suffix")
  80. if err != nil {
  81. log.Fatalf("Failed to load Mqtt_suffix: %v", err)
  82. }
  83. mqttPort, err = beego.AppConfig.String("Mqtt_port")
  84. if err != nil {
  85. log.Fatalf("Failed to load Mqtt_port: %v", err)
  86. }
  87. mqttUsername, err = beego.AppConfig.String("Mqtt_username")
  88. if err != nil {
  89. log.Fatalf("Failed to load Mqtt_username: %v", err)
  90. }
  91. mqttPassword, err = beego.AppConfig.String("Mqtt_password")
  92. if err != nil {
  93. log.Fatalf("Failed to load Mqtt_password: %v", err)
  94. }
  95. Mqtt_UrlMqttjxit, err = beego.AppConfig.String("Mqtt_UrlMqttjxit")
  96. if err != nil {
  97. log.Fatalf("Failed to load Mqtt_password: %v", err)
  98. }
  99. Mqtt_UrlMqttlodr, err = beego.AppConfig.String("Mqtt_UrlMqttlodr")
  100. if err != nil {
  101. log.Fatalf("Failed to load Mqtt_password: %v", err)
  102. }
  103. orm.RegisterModel(new(ClodpServer))
  104. }
  105. func MqttConntMqttjxit() {
  106. pid := os.Getpid()
  107. var clientIdCounter int32
  108. uniqueId := fmt.Sprintf("%d_%d_%d", time.Now().UnixNano(), pid, atomic.AddInt32(&clientIdCounter, 1))
  109. fmt.Println("uniqueId:", uniqueId)
  110. opts := mqtt.NewClientOptions().AddBroker(Mqtt_UrlMqttjxit)
  111. opts.SetClientID(uniqueId)
  112. opts.SetUsername("coldp")
  113. opts.SetPassword("EHM5PpXDD579gmp")
  114. MqttjxitCon = mqtt.NewClient(opts)
  115. MqttCon.Set("mqttjxit", MqttjxitCon)
  116. if token := MqttjxitCon.Connect(); token.Wait() && token.Error() != nil {
  117. log.Fatal(token.Error())
  118. }
  119. MqttjxitCon.Subscribe("/sub/#", 0, onMessageReceived)
  120. ch := make(chan os.Signal, 1)
  121. signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
  122. <-ch // 等待信号
  123. log.Println("Interrupt received, disconnecting...")
  124. MqttjxitCon.Disconnect(250)
  125. }
  126. func MqttConntMqttlodr() {
  127. pid := os.Getpid()
  128. var clientIdCounter int32
  129. uniqueId := fmt.Sprintf("%d_%d_%d", time.Now().Unix(), pid, atomic.AddInt32(&clientIdCounter, 1))
  130. fmt.Println("uniqueId:", uniqueId)
  131. opts := mqtt.NewClientOptions().AddBroker(Mqtt_UrlMqttlodr)
  132. opts.SetClientID(uniqueId)
  133. opts.SetUsername("coldp")
  134. opts.SetPassword("EHM5PpXDD579gmp")
  135. MqttlodrCon = mqtt.NewClient(opts)
  136. MqttCon.Set("mqttlodr", MqttlodrCon)
  137. if token := MqttlodrCon.Connect(); token.Wait() && token.Error() != nil {
  138. log.Fatal(token.Error())
  139. }
  140. MqttlodrCon.Subscribe("/sub/#", 0, onMessageReceived)
  141. ch := make(chan os.Signal, 1)
  142. signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
  143. <-ch // 等待信号
  144. log.Println("Interrupt received, disconnecting...")
  145. MqttlodrCon.Disconnect(250)
  146. }
  147. func MqttConntMqttyuht() {
  148. pid := os.Getpid()
  149. var clientIdCounter int32
  150. uniqueId := fmt.Sprintf("%d_%d_%d", time.Now().Unix(), pid, atomic.AddInt32(&clientIdCounter, 1+1))
  151. fmt.Println("uniqueId:", uniqueId)
  152. opts := mqtt.NewClientOptions().AddBroker(Mqtt_UrlMqttlodr)
  153. opts.SetClientID(uniqueId)
  154. opts.SetUsername("coldp")
  155. opts.SetPassword("EHM5PpXDD579gmp")
  156. MqttlodrCon = mqtt.NewClient(opts)
  157. MqttCon.Set("mqttlodr", MqttlodrCon)
  158. if token := MqttlodrCon.Connect(); token.Wait() && token.Error() != nil {
  159. log.Fatal(token.Error())
  160. }
  161. MqttlodrCon.Subscribe("/sub/#", 0, onMessageReceived)
  162. ch := make(chan os.Signal, 1)
  163. signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
  164. <-ch // 等待信号
  165. log.Println("Interrupt received, disconnecting...")
  166. MqttlodrCon.Disconnect(250)
  167. }
  168. type Type5 struct {
  169. Dut int `json:"dut"`
  170. Type int `json:"type"`
  171. Data []struct {
  172. Tpreu float64 `json:"tpreu"`
  173. Tl float64 `json:"tl"`
  174. Hu float64 `json:"hu"`
  175. Hprel float64 `json:"hprel"`
  176. Hpreu float64 `json:"hpreu"`
  177. Sense float64 `json:"sense"`
  178. Tu float64 `json:"tu"`
  179. Tprel float64 `json:"tprel"`
  180. Enprelnote float64 `json:"enprelnote"`
  181. Enprel float64 `json:"enprel"`
  182. Name string `json:"name"`
  183. En float64 `json:"en"`
  184. Speed float64 `json:"speed"`
  185. Hl float64 `json:"hl"`
  186. Free float64 `json:"free"`
  187. Id int `json:"id"`
  188. } `json:"data"`
  189. Mid int `json:"mid"`
  190. }
  191. func onMessageReceived(client mqtt.Client, msg mqtt.Message) {
  192. s := gjson.Get(string(msg.Payload()), "type").Int()
  193. //log.Println("MessageID", s)
  194. clos := make([]string, 0)
  195. switch s {
  196. case 7:
  197. var t Deviation_Sub
  198. err := json.Unmarshal(msg.Payload(), &t)
  199. if err != nil {
  200. log.Println("json 序列化失败")
  201. log.Println("失败参数", string(msg.Payload()))
  202. }
  203. sn := strings.TrimPrefix(msg.Topic(), "/sub/")
  204. log.Println("订阅主题", sn)
  205. log.Println("接收到参数", t)
  206. if len(t.Data) > 0 {
  207. for _, v := range t.Data {
  208. coldpServer := ClodpServer{
  209. Sn: sn,
  210. T_id: v.Id,
  211. IsTrue: 2,
  212. T_t: v.T,
  213. T_h: v.H,
  214. Types: 8,
  215. CreateTime: time.Now(),
  216. UpdateTime: time.Now(),
  217. }
  218. coldp, err := FindClodpServerBySnAndId(sn, v.Id, 8)
  219. if err != nil {
  220. Add_ClodpServer(coldpServer)
  221. }
  222. if len(coldp.Sn) == 0 {
  223. Add_ClodpServer(coldpServer)
  224. }
  225. if coldp.T_h != v.H || coldp.T_t != v.T || coldp.IsTrue != 2 {
  226. coldp.T_t = v.T
  227. coldp.T_h = v.H
  228. coldp.IsTrue = 2
  229. clos = append(clos, "t_t")
  230. clos = append(clos, "t_h")
  231. clos = append(clos, "is_true")
  232. Update_ClodpServer(coldp, clos...)
  233. }
  234. }
  235. }
  236. case 5:
  237. var type5 Type5
  238. err := json.Unmarshal(msg.Payload(), &type5)
  239. if err != nil {
  240. log.Println("json 序列化失败")
  241. log.Println("失败参数", string(msg.Payload()))
  242. }
  243. sn := strings.TrimPrefix(msg.Topic(), "/sub/")
  244. log.Println("订阅主题", sn)
  245. log.Println("接收到参数", string(msg.Payload()))
  246. if len(type5.Data) > 0 {
  247. for _, v := range type5.Data {
  248. coldpServer := ClodpServer{
  249. Sn: sn,
  250. T_id: v.Id,
  251. IsTrue: 2,
  252. Sense: v.Sense,
  253. Speed: v.Speed,
  254. Types: 6,
  255. CreateTime: time.Now(),
  256. UpdateTime: time.Now(),
  257. }
  258. coldp, err := FindClodpServerBySnAndId(sn, v.Id, 6)
  259. if err != nil {
  260. Add_ClodpServer(coldpServer)
  261. }
  262. if len(coldp.Sn) == 0 {
  263. Add_ClodpServer(coldpServer)
  264. }
  265. if coldp.Sense != v.Sense || coldp.Speed != v.Speed || coldp.IsTrue != 2 {
  266. coldp.Sense = v.Sense
  267. coldp.Speed = v.Speed
  268. coldp.IsTrue = 2
  269. clos = append(clos, "sense")
  270. clos = append(clos, "speed")
  271. clos = append(clos, "is_true")
  272. Update_ClodpServer(coldp, clos...)
  273. }
  274. }
  275. }
  276. }
  277. }
  278. func MqttPublish(client mqtt.Client, publishTopic string, pubData []byte) error {
  279. log.Println("发送主题", publishTopic, "发送消息", string(pubData))
  280. publish := client.Publish(publishTopic, 0, false, pubData)
  281. publish.Wait()
  282. if publish.Error() != nil {
  283. return fmt.Errorf("发送消息失败 %v", publish.Error())
  284. }
  285. return nil
  286. }
  287. // SendAndWaitForAck 发送消息并等待确认
  288. func SendAndWaitForAck(broker, publishTopic, subfirmTopic string, pubData []byte, timeout time.Duration) (bool, Deviation_Sub, error) {
  289. uniqueId := strconv.FormatInt(time.Now().Unix(), 10) + "_" + strconv.Itoa(rand.Intn(1000))
  290. bro := fmt.Sprintf("tcp://%s", broker+"."+mqttSuffix+":"+mqttPort)
  291. fmt.Println(bro)
  292. opts := mqtt.NewClientOptions().AddBroker(bro)
  293. opts.SetClientID(uniqueId)
  294. opts.SetUsername(mqttUsername)
  295. opts.SetPassword(mqttPassword)
  296. opts.SetCleanSession(true) // 启用持久会话
  297. // 创建通道用于接收确认消息
  298. ackChan := make(chan bool, 1)
  299. msgChan := make(chan Deviation_Sub, 1)
  300. // 设置消息接收回调
  301. opts.SetDefaultPublishHandler(func(client mqtt.Client, msg mqtt.Message) {
  302. var deviation_Pub Deviation_Pub
  303. err := json.Unmarshal(pubData, &deviation_Pub)
  304. if err != nil {
  305. log.Println("json序列化失败")
  306. }
  307. var ackMessage Deviation_Sub
  308. log.Println("初始化接收值", string(msg.Payload()))
  309. if err := json.Unmarshal(msg.Payload(), &ackMessage); err == nil {
  310. if deviation_Pub.Mid == ackMessage.Mid {
  311. ackChan <- true
  312. msgChan <- ackMessage
  313. log.Println("接收到参数", string(msg.Payload()))
  314. }
  315. }
  316. })
  317. client := mqtt.NewClient(opts)
  318. if token := client.Connect(); token.Wait() && token.Error() != nil {
  319. return false, Deviation_Sub{}, fmt.Errorf("连接失败: %v", token.Error())
  320. }
  321. defer client.Disconnect(250)
  322. // 订阅确认消息的主题
  323. token := client.Subscribe(subfirmTopic, 0, nil)
  324. token.Wait()
  325. if token.Error() != nil {
  326. return false, Deviation_Sub{}, fmt.Errorf("订阅主题失败: %v", token.Error())
  327. }
  328. log.Printf("订阅主题成功 '%s'", subfirmTopic)
  329. // 发布消息到指定主题
  330. token = client.Publish(publishTopic, 0, false, pubData)
  331. token.Wait()
  332. if token.Error() != nil {
  333. return false, Deviation_Sub{}, fmt.Errorf("发送消息失败 %v", token.Error())
  334. } else {
  335. log.Printf("发送消息成功: %s", pubData)
  336. }
  337. // 等待确认消息或超时
  338. select {
  339. case <-ackChan:
  340. receivedMsg := <-msgChan
  341. client.Disconnect(250)
  342. return true, receivedMsg, nil
  343. case <-time.After(timeout):
  344. client.Disconnect(250)
  345. return false, Deviation_Sub{}, fmt.Errorf("连接超时")
  346. }
  347. }
  348. func SendAndWait(client mqtt.Client, publishTopic, subfirmTopic string, pubData []byte, timeout time.Duration) (bool, Deviation_Sub, error) {
  349. // 创建通道用于接收确认消息
  350. ackChan := make(chan bool, 1)
  351. msgChan := make(chan Deviation_Sub, 1)
  352. if token := client.Connect(); token.Wait() && token.Error() != nil {
  353. return false, Deviation_Sub{}, fmt.Errorf("连接失败: %v", token.Error())
  354. }
  355. // 订阅确认消息的主题
  356. token := client.Subscribe(subfirmTopic, 0, func(client mqtt.Client, msg mqtt.Message) {
  357. var deviation_Pub Deviation_Pub
  358. err := json.Unmarshal(pubData, &deviation_Pub)
  359. if err != nil {
  360. log.Println("json序列化失败")
  361. }
  362. var ackMessage Deviation_Sub
  363. log.Println("初始化接收值", string(msg.Payload()))
  364. if err := json.Unmarshal(msg.Payload(), &ackMessage); err == nil {
  365. if deviation_Pub.Mid == ackMessage.Mid {
  366. ackChan <- true
  367. msgChan <- ackMessage
  368. log.Println("接收到参数", string(msg.Payload()))
  369. }
  370. }
  371. })
  372. token.Wait()
  373. if token.Error() != nil {
  374. return false, Deviation_Sub{}, fmt.Errorf("订阅主题失败: %v", token.Error())
  375. }
  376. log.Printf("订阅主题成功 '%s'", subfirmTopic)
  377. // 发布消息到指定主题
  378. Publishtoken := client.Publish(publishTopic, 0, false, pubData)
  379. Publishtoken.Wait()
  380. if Publishtoken.Error() != nil {
  381. return false, Deviation_Sub{}, fmt.Errorf("发送消息失败 %v", token.Error())
  382. } else {
  383. log.Printf("发送消息成功: %s", pubData)
  384. }
  385. //// 使用 token.Done() 来创建一个 goroutine 监听发布操作的完成
  386. //go func() {
  387. // <-Publishtoken.Done() // 等待发布操作完成
  388. // if err := Publishtoken.Error(); err != nil {
  389. // log.Printf("发送消息失败: %v", err)
  390. // return false, Deviation_Sub{}, fmt.Errorf("发送消息失败 %v", Publishtoken.Error())
  391. // } else {
  392. // log.Printf("发送消息成功: %s", pubData)
  393. // }
  394. //}()
  395. // 等待确认消息或超时
  396. select {
  397. case <-ackChan:
  398. receivedMsg := <-msgChan
  399. return true, receivedMsg, nil
  400. default:
  401. return false, Deviation_Sub{}, fmt.Errorf("订阅失败")
  402. }
  403. }
  404. func Cron() {
  405. // 创建一个定时任务对象
  406. c := cron.New(cron.WithSeconds())
  407. // 给对象增加定时任务
  408. cron, _ := beego.AppConfig.String("Cron")
  409. c.AddFunc(cron, UpdateDataRepeat)
  410. // 启动定时任务
  411. c.Start()
  412. defer c.Stop()
  413. // 查询语句,阻塞,让main函数不退出,保持程序运行
  414. select {}
  415. }
  416. func UpdateDataRepeat() {
  417. o := orm.NewOrm()
  418. table := o.QueryTable(new(ClodpServer))
  419. var clodps []ClodpServer
  420. all, err := table.Filter("is_true", 1).All(&clodps)
  421. if err != nil {
  422. logs.Error("查询参数失败")
  423. return // 确保函数在此处结束,因为下面的代码依赖于成功的查询结果
  424. }
  425. var wg sync.WaitGroup
  426. maxConcurrency := 10 // 设置最大并发数
  427. sem := make(chan struct{}, maxConcurrency)
  428. if all > 0 {
  429. for _, clodp := range clodps {
  430. wg.Add(1)
  431. sem <- struct{}{}
  432. go func(clodp ClodpServer) {
  433. defer wg.Done()
  434. defer func() { <-sem }()
  435. mqttId := Device.ReadDeviceMqttId(clodp.Sn)
  436. if client, ok := MqttCon.Get(mqttId); ok {
  437. topicPub := fmt.Sprintf("/pub/%s", clodp.Sn)
  438. switch clodp.Types {
  439. case 8:
  440. pubData, err := json.Marshal(Deviation_Sub{
  441. Sn: clodp.Sn,
  442. Type: 8,
  443. Mid: time.Now().Unix() + int64(rand.Intn(10)),
  444. Data: []Deviation_Sub_Data{
  445. {
  446. Id: clodp.T_id,
  447. T: clodp.T_t,
  448. H: clodp.T_h,
  449. },
  450. },
  451. })
  452. if err != nil {
  453. logs.Error("下发参数失败", clodp.Sn, "错误信息:", err)
  454. return
  455. }
  456. err = MqttPublish(client, topicPub, pubData)
  457. if err != nil {
  458. logs.Error("下发参数失败", clodp.Sn, "错误信息:", err)
  459. }
  460. case 6:
  461. pubData, err := json.Marshal(Sensor_Sub{
  462. Type: 6,
  463. Sn: clodp.Sn,
  464. Mid: int(time.Now().Unix() + int64(rand.Intn(10))),
  465. Data: []Sensor_Sub_Data{
  466. {
  467. Id: clodp.T_id,
  468. Speed: clodp.Speed,
  469. Sense: clodp.Sense,
  470. },
  471. },
  472. })
  473. if err != nil {
  474. logs.Error("下发参数失败", clodp.Sn, "错误信息:", err)
  475. return
  476. }
  477. err = MqttPublish(client, topicPub, pubData)
  478. if err != nil {
  479. logs.Error("下发参数失败", clodp.Sn, "错误信息:", err)
  480. }
  481. }
  482. }
  483. }(clodp)
  484. }
  485. wg.Wait()
  486. }
  487. }
  488. func Add_ClodpServer(m ClodpServer) (err error) {
  489. o := orm.NewOrm()
  490. _, err = o.Insert(&m)
  491. if err != nil {
  492. logs.Error(lib.FuncName(), err)
  493. return err
  494. }
  495. return nil
  496. }
  497. func FindClodpServerBySnAndId(sn string, tid, types int) (coldp ClodpServer, err error) {
  498. o := orm.NewOrm()
  499. table := o.QueryTable(new(ClodpServer))
  500. err = table.Filter("sn", sn).Filter("t_id", tid).Filter("types", types).One(&coldp)
  501. if err != nil {
  502. logs.Error(lib.FuncName(), err)
  503. return ClodpServer{}, nil
  504. }
  505. return coldp, nil
  506. }
  507. func Update_ClodpServer(r ClodpServer, cols ...string) bool {
  508. o := orm.NewOrm()
  509. num, err := o.Update(&r, cols...)
  510. if err != nil {
  511. logs.Error(lib.FuncName(), err)
  512. return false
  513. }
  514. fmt.Println("Number of records updated in database:", num)
  515. return true
  516. }