MqttServev2.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. package MqttServer
  2. import (
  3. "fmt"
  4. beego "github.com/beego/beego/v2/server/web"
  5. "github.com/yosssi/gmq/mqtt"
  6. "github.com/yosssi/gmq/mqtt/client"
  7. "log"
  8. "math/rand"
  9. "strings"
  10. "time"
  11. )
  12. var (
  13. mqttSuffix string
  14. mqttPort string
  15. mqttUsername string
  16. mqttPassword string
  17. )
  18. func init() {
  19. var err error
  20. mqttSuffix, err = beego.AppConfig.String("Mqtt_suffix")
  21. if err != nil {
  22. log.Fatalf("Failed to load Mqtt_suffix: %v", err)
  23. }
  24. mqttPort, err = beego.AppConfig.String("Mqtt_port")
  25. if err != nil {
  26. log.Fatalf("Failed to load Mqtt_port: %v", err)
  27. }
  28. mqttUsername, err = beego.AppConfig.String("Mqtt_username")
  29. if err != nil {
  30. log.Fatalf("Failed to load Mqtt_username: %v", err)
  31. }
  32. mqttPassword, err = beego.AppConfig.String("Mqtt_password")
  33. if err != nil {
  34. log.Fatalf("Failed to load Mqtt_password: %v", err)
  35. }
  36. }
  37. // GetMqttClient 获取MQTT客户端连接
  38. func GetMqttClient(mqttAddrPrefix string) (*client.Client, error) {
  39. r := rand.New(rand.NewSource(time.Now().Unix()))
  40. options := client.ConnectOptions{
  41. Network: "tcp",
  42. Address: fmt.Sprintf("%s.%s:%s", mqttAddrPrefix, mqttSuffix, mqttPort),
  43. UserName: []byte(mqttUsername),
  44. Password: []byte(mqttPassword),
  45. ClientID: []byte(fmt.Sprintf("ColdP_server_%d", r.Intn(9))),
  46. }
  47. cli := client.New(&client.Options{
  48. ErrorHandler: func(e error) {
  49. log.Printf("MQTT Error: %v", e)
  50. },
  51. })
  52. if err := cli.Connect(&options); err != nil {
  53. log.Printf("MQTT 连接失败: %v", err)
  54. return nil, err
  55. }
  56. return cli, nil
  57. }
  58. // PubMqttMessage 发送信息
  59. func PubMqttMessage(cli *client.Client, topic string, msg []byte) error {
  60. fmt.Printf("发布主题:%s -> 发布内容:%s\n", topic, string(msg))
  61. opts := &client.PublishOptions{
  62. QoS: mqtt.QoS0,
  63. TopicName: []byte(topic),
  64. Message: msg,
  65. }
  66. if err := cli.Publish(opts); err != nil {
  67. log.Printf("MQTT 发布失败: %v", err)
  68. return err
  69. }
  70. fmt.Println("MQTT发送成功!")
  71. return nil
  72. }
  73. // Subscript 订阅信息
  74. func Subscript(cli *client.Client, topic string, msg chan string, isStr string) error {
  75. fmt.Println("连接成功", time.Now().Format("2006-01-02 15:04:05"))
  76. fmt.Printf("订阅主题:%s\n", topic)
  77. subOpts := &client.SubscribeOptions{
  78. SubReqs: []*client.SubReq{
  79. &client.SubReq{
  80. TopicFilter: []byte(topic),
  81. QoS: mqtt.QoS0,
  82. Handler: func(topicName, message []byte) {
  83. fmt.Println("MQTT接收到信息:", string(message))
  84. if strings.Contains(strings.ReplaceAll(string(message), " ", ""), isStr) {
  85. msg <- topic + "||" + string(message)
  86. if _, ok := <-msg; !ok {
  87. cli.Disconnect()
  88. cli.Terminate()
  89. }
  90. }
  91. },
  92. },
  93. },
  94. }
  95. if err := cli.Subscribe(subOpts); err != nil {
  96. log.Printf("MQTT 订阅失败: %v", err)
  97. return err
  98. }
  99. return nil
  100. }