MqttServev2.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990
  1. package MqttServer
  2. import (
  3. "fmt"
  4. beego "github.com/beego/beego/v2/server/web"
  5. "github.com/yosssi/gmq/mqtt"
  6. "github.com/yosssi/gmq/mqtt/client"
  7. "log"
  8. "math/rand"
  9. "strings"
  10. "time"
  11. )
  12. var (
  13. mqtt_suffix,
  14. mqtt_port,
  15. mqtt_username,
  16. mqtt_password string
  17. err error
  18. )
  19. func init() {
  20. mqtt_suffix, err = beego.AppConfig.String("Mqtt_suffix")
  21. mqtt_port, err = beego.AppConfig.String("Mqtt_port")
  22. mqtt_username, err = beego.AppConfig.String("Mqtt_username")
  23. mqtt_password, err = beego.AppConfig.String("Mqtt_password")
  24. }
  25. // GetConnectionOptions 获取链接参数
  26. func GetMqttClient(mqttAddrPrefix string) *client.Client {
  27. r := rand.New(rand.NewSource(time.Now().Unix()))
  28. options := client.ConnectOptions{
  29. Network: "tcp",
  30. Address: fmt.Sprintf("%s.coldbaozhida.com:1883", mqttAddrPrefix),
  31. UserName: []byte(mqtt_username),
  32. Password: []byte(mqtt_password),
  33. ClientID: []byte(fmt.Sprintf("ColdP_server_%d", r.Intn(9))),
  34. }
  35. cli := client.New(&client.Options{ErrorHandler: func(e error) {
  36. fmt.Println("PubMqtt Error 发布信息错误:", e.Error())
  37. }})
  38. err = cli.Connect(&options)
  39. if err != nil {
  40. fmt.Println("MQTT 连接失败", err.Error())
  41. }
  42. return cli
  43. }
  44. // PubMqttMessage 发送信息
  45. func PubMqttMessage(cli *client.Client, topic string, msg []byte) {
  46. fmt.Printf("发布主题:%s -> 发布内容:%s\n", topic, string(msg))
  47. time.Sleep(time.Second)
  48. err = cli.Publish(&client.PublishOptions{
  49. QoS: mqtt.QoS0,
  50. TopicName: []byte(topic),
  51. Message: msg,
  52. })
  53. if err != nil {
  54. log.Panicln("Mqtt发布失败", err.Error())
  55. return
  56. }
  57. fmt.Println("MQTT发送成功!")
  58. }
  59. // Subscript 订阅信息
  60. func Subscript(cli *client.Client, topic string, msg chan string, is_str string) {
  61. fmt.Println("连接成功", time.Now().Format("2006-01-02 15:04:05"))
  62. fmt.Printf("订阅主题:%s\n", topic)
  63. cli.Subscribe(&client.SubscribeOptions{
  64. SubReqs: []*client.SubReq{
  65. &client.SubReq{
  66. TopicFilter: []byte(topic),
  67. QoS: mqtt.QoS0,
  68. Handler: func(topicName, message []byte) {
  69. //如果管道已经关闭,表示已经超时
  70. fmt.Println("MQTT接收到信息:", string(message))
  71. if strings.Contains(strings.Replace(string(message), " ", "", -1), is_str) {
  72. msg <- topic + "||" + string(message)
  73. if _, ok := <-msg; !ok {
  74. cli.Disconnect()
  75. cli.Terminate()
  76. }
  77. }
  78. },
  79. },
  80. },
  81. })
  82. }