MqttServev2.go 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217
  1. package MqttServer
  2. //
  3. //import (
  4. // "ColdP_server/models/Device"
  5. // "encoding/json"
  6. // "errors"
  7. // "fmt"
  8. // "github.com/astaxie/beego/logs"
  9. // beego "github.com/beego/beego/v2/server/web"
  10. // "github.com/yosssi/gmq/mqtt"
  11. // "github.com/yosssi/gmq/mqtt/client"
  12. // "log"
  13. // "os"
  14. // "sync"
  15. // "sync/atomic"
  16. // "time"
  17. //)
  18. //
  19. //var (
  20. // mqttSuffix string
  21. // mqttPort string
  22. // mqttUsername string
  23. // mqttPassword string
  24. // mqttUrl string
  25. // subscribedTopics map[string]bool
  26. // topicMutex sync.Mutex
  27. // publishQueue chan *PublishTask
  28. //)
  29. //
  30. //type PublishTask struct {
  31. // Cli *client.Client
  32. // Topic string
  33. // Msg []byte
  34. //}
  35. //
  36. //func init() {
  37. // var err error
  38. // mqttSuffix, err = beego.AppConfig.String("Mqtt_suffix")
  39. // if err != nil {
  40. // log.Fatalf("Failed to load Mqtt_suffix: %v", err)
  41. // }
  42. //
  43. // mqttPort, err = beego.AppConfig.String("Mqtt_port")
  44. // if err != nil {
  45. // log.Fatalf("Failed to load Mqtt_port: %v", err)
  46. // }
  47. //
  48. // mqttUsername, err = beego.AppConfig.String("Mqtt_username")
  49. // if err != nil {
  50. // log.Fatalf("Failed to load Mqtt_username: %v", err)
  51. // }
  52. //
  53. // mqttPassword, err = beego.AppConfig.String("Mqtt_password")
  54. // if err != nil {
  55. // log.Fatalf("Failed to load Mqtt_password: %v", err)
  56. // }
  57. //
  58. // subscribedTopics = make(map[string]bool)
  59. // publishQueue = make(chan *PublishTask, 100) // 根据预估消息量设置队列大小
  60. // go func() {
  61. // for task := range publishQueue {
  62. // err := PubMqttMessage(task.Cli, task.Topic, task.Msg)
  63. // if err != nil {
  64. // log.Printf("发布消息失败,主题: %s,错误: %v", task.Topic, err)
  65. // }
  66. // }
  67. // }()
  68. //}
  69. //
  70. //// GetMqttClient 获取MQTT客户端连接
  71. //func GetMqttClient(mqttAddrPrefix string) (*client.Client, error) {
  72. // //r := rand.New(rand.NewSource(time.Now().Unix()))
  73. // var clientIdCounter int32
  74. // pid := os.Getpid()
  75. // // 使用时间戳和进程ID以及一个自增的计数器(简单示例,实际可更完善)
  76. // uniqueId := fmt.Sprintf("%d_%d_%d", time.Now().UnixNano(), pid, atomic.AddInt32(&clientIdCounter, 1))
  77. // options := client.ConnectOptions{
  78. // Network: "tcp",
  79. // Address: fmt.Sprintf("%s.%s:%s", mqttAddrPrefix, mqttSuffix, mqttPort),
  80. // UserName: []byte(mqttUsername),
  81. // Password: []byte(mqttPassword),
  82. // ClientID: []byte(uniqueId),
  83. // CleanSession: false, // 保持会话,有助于自动重连
  84. // }
  85. // cli := client.New(&client.Options{
  86. // ErrorHandler: func(e error) {
  87. // logs.Error("MQTT Error: %v", e)
  88. // },
  89. // })
  90. // options.PINGRESPTimeout = time.Second * 5
  91. //
  92. // // 尝试连接并处理重试逻辑
  93. // var err error
  94. // for i := 0; i <= 3; i++ {
  95. // if err = cli.Connect(&options); err == nil {
  96. // fmt.Println("MQTT 连接成功")
  97. // break
  98. // }
  99. // logs.Error("MQTT 连接失败: %v,正在重试... (尝试次数: %d/%d)", err, i+1, 3)
  100. //
  101. // // 如果不是最后一次尝试,则等待一段时间再重试
  102. // if i < 3 {
  103. // time.Sleep(time.Second * time.Duration(i+1))
  104. // }
  105. // }
  106. //
  107. // if err != nil {
  108. // logs.Error("MQTT 连接失败,已达到最大重试次数")
  109. // return nil, fmt.Errorf("MQTT 连接失败: %w", err)
  110. // }
  111. //
  112. // return cli, nil
  113. //}
  114. //
  115. //// PubMqttMessage 发送信息
  116. //func PubMqttMessage(cli *client.Client, topic string, msg []byte) error {
  117. // opts := &client.PublishOptions{
  118. // QoS: mqtt.QoS0, // 根据需求调整QoS级别
  119. // TopicName: []byte(topic),
  120. // Message: msg,
  121. // }
  122. // for i := 0; i <= 3; i++ {
  123. // if err := cli.Publish(opts); err == nil {
  124. // log.Printf("MQTT 发送成功, 主题: %s", topic)
  125. // return nil
  126. // }
  127. // log.Printf("MQTT 发布失败, 主题: %s, 错误: %v,正在重试... (尝试次数: %d)", topic, i+1, 3)
  128. // // 如果不是最后一次尝试,则等待一段时间再重试
  129. // if i < 3 {
  130. // time.Sleep(time.Second * time.Duration(i+1))
  131. // }
  132. // }
  133. // // 如果所有重试都失败了,返回错误
  134. // return fmt.Errorf("MQTT 发布失败, 主题: %s, 最大重试次数已达到", topic)
  135. //}
  136. //func PubMqttMessageAndSub(cli *client.Client, topic, topicSub string, msg []byte) (devi Device.Deviation, e error) {
  137. // opts := &client.PublishOptions{
  138. // QoS: mqtt.QoS1, // 根据需求调整QoS级别
  139. // TopicName: []byte(topic),
  140. // Message: msg,
  141. // }
  142. // var publishErr error
  143. // err := cli.Subscribe(&client.SubscribeOptions{
  144. // SubReqs: []*client.SubReq{
  145. // &client.SubReq{
  146. // TopicFilter: []byte(topicSub),
  147. // QoS: mqtt.QoS1,
  148. // Handler: func(topicName, message []byte) {
  149. // log.Println("接收到数据", string(message))
  150. // err := json.Unmarshal(message, &devi)
  151. // if err != nil {
  152. // log.Printf("JSON解析消息失败: %v", err)
  153. // }
  154. // },
  155. // },
  156. // },
  157. // })
  158. // if err != nil {
  159. // log.Printf("MQTT 订阅失败: %v", err)
  160. // return devi, err
  161. // }
  162. // var done chan struct{}
  163. // go func() {
  164. // for i := 0; i <= 3; i++ {
  165. // if err := cli.Publish(opts); err == nil {
  166. // log.Printf("MQTT 发送成功, 主题: %s", topic)
  167. // close(done)
  168. // return
  169. // }
  170. // if i < 3 {
  171. // time.Sleep(time.Second * time.Duration(i+1))
  172. // }
  173. // }
  174. // publishErr = fmt.Errorf("MQTT 发布失败, 主题: %s, 最大重试次数已达到", topic)
  175. // close(done)
  176. // }()
  177. //
  178. // // 等待发布操作完成或者超时处理(这里简单等待一定时间,可根据实际情况优化超时逻辑)
  179. // select {
  180. // case <-done:
  181. // case <-time.After(time.Second * 10):
  182. // publishErr = fmt.Errorf("MQTT 发布超时, 主题: %s", topic)
  183. // }
  184. // // 如果所有重试都失败了,返回错误
  185. // return Device.Deviation{}, publishErr
  186. //}
  187. //
  188. //func Subscript(cli *client.Client, topic string, msg chan string, done chan struct{}) error {
  189. // fmt.Printf("订阅主题:%s\n", topic)
  190. // subOpts := &client.SubscribeOptions{
  191. // SubReqs: []*client.SubReq{
  192. // {
  193. // TopicFilter: []byte(topic),
  194. // QoS: mqtt.QoS0,
  195. // Handler: func(topicName, message []byte) {
  196. // fmt.Println("MQTT接收到信息:", string(message))
  197. // select {
  198. // case msg <- topic + "||" + string(message):
  199. // fmt.Println("接收到数据", topic, string(message))
  200. // case <-done:
  201. // fmt.Println("通道已关闭,无法发送消息")
  202. // return
  203. // case <-time.After(time.Millisecond * 100): // 防止阻塞太久
  204. // fmt.Println("通道已满,无法发送消息")
  205. // }
  206. // },
  207. // },
  208. // },
  209. // }
  210. // if err := cli.Subscribe(subOpts); err != nil {
  211. // log.Printf("MQTT 订阅失败: %v", err)
  212. // cli.Terminate()
  213. // return errors.New("订阅消息失败")
  214. // }
  215. // return nil
  216. //}