MqttServer.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
  1. package MqttServer
  2. import (
  3. "fmt"
  4. "github.com/yosssi/gmq/mqtt"
  5. "github.com/yosssi/gmq/mqtt/client"
  6. "time"
  7. )
  8. var (
  9. cli *client.Client
  10. T_sn string
  11. T_pass string
  12. )
  13. func init() {
  14. T_sn = "2024168563127390"
  15. T_pass = "qMua4PZyl2vwchNL"
  16. Run_MqttServer()
  17. }
  18. /*
  19. {
  20. "type": 203,
  21. "data": {
  22. "sn": "2024666666666666",
  23. "mqtt1": "mqttyuht.coldbaozhida.com",
  24. "mqtt2": "mqttlodr.coldbaozhida.com",
  25. "mqtt3": "mqttyuht.coldbaozhida.com",
  26. "mqtt4": "",
  27. "mqtt5": "",
  28. "port": 1883,
  29. "user": "2024666666666666",
  30. "pass": "qMua4PZyl2vwchNL"
  31. }
  32. }
  33. */
  34. func Run_MqttServer() {
  35. time.Sleep(3 * time.Second)
  36. fmt.Println("============Run_MqttServer=============", "")
  37. // Create an MQTT Client.
  38. cli = client.New(&client.Options{
  39. // Define the processing of the error handler.
  40. ErrorHandler: func(err error) {
  41. fmt.Println("err!!!!!! Run_MqttServer:", err.Error())
  42. time.Sleep(3 * time.Second)
  43. go Run_MqttServer() // MQTT 通讯
  44. return
  45. },
  46. })
  47. // Terminate the Client.
  48. defer cli.Terminate()
  49. c := client.ConnectOptions{
  50. Network: "tcp",
  51. Address: "mqttyuht.coldbaozhida.com",
  52. ClientID: []byte(T_sn + "_test"),
  53. UserName: []byte(T_sn),
  54. Password: []byte(T_pass),
  55. }
  56. fmt.Println("Address:", c.Address)
  57. fmt.Println("ClientID:", string(c.ClientID))
  58. // Connect to the MQTT Server.
  59. err := cli.Connect(&c)
  60. if err != nil {
  61. fmt.Println("MqttServer", "连接MQTT失败 [cli.Connect]", "")
  62. fmt.Println("err!!!!!! Run_MqttServer:", "连接MQTT失败:", err)
  63. fmt.Println("err!!!!!! 连接MQTT失败:", err)
  64. cli.Terminate()
  65. time.Sleep(3 * time.Second)
  66. go Run_MqttServer() // MQTT 通讯
  67. return
  68. }
  69. // Subscribe to topics.
  70. err = cli.Subscribe(&client.SubscribeOptions{
  71. SubReqs: []*client.SubReq{
  72. &client.SubReq{
  73. TopicFilter: []byte("/pub/2024168563127390"),
  74. QoS: mqtt.QoS0,
  75. // Define the processing of the message handler.
  76. Handler: func(topicName, message []byte) {
  77. fmt.Println(">-" + string(topicName) + " " + string(message))
  78. },
  79. },
  80. },
  81. })
  82. if err != nil {
  83. fmt.Println("MqttServer", "订阅消息 [cli.Subscribe]", "")
  84. fmt.Println("err!!!!!! Run_MqttServer:", "连接MQTT失败:", err)
  85. }
  86. fmt.Println("MQTT ok!")
  87. }
  88. // 发送数据
  89. func Mqtt_publish(topic string, text string) {
  90. // Publish a message.
  91. err := cli.Publish(&client.PublishOptions{
  92. QoS: mqtt.QoS0,
  93. TopicName: []byte("/pub/" + topic),
  94. Message: []byte(text),
  95. })
  96. fmt.Println("-> /pub/" + topic + " " + text)
  97. if err != nil {
  98. fmt.Println("MqttServer", "发送消息失败 [cli.Publish]", text)
  99. }
  100. }