MqttServer_oldf2.txt 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179
  1. package MqttServer
  2. import (
  3. "ColdP_server/models/Device"
  4. "encoding/json"
  5. "fmt"
  6. beego "github.com/beego/beego/v2/server/web"
  7. "github.com/yosssi/gmq/mqtt"
  8. "github.com/yosssi/gmq/mqtt/client"
  9. "runtime"
  10. )
  11. var (
  12. cli *client.Client
  13. DeviceSn map[string]Device.Device /*创建集合 */
  14. test = true
  15. )
  16. func init() {
  17. if runtime.GOOS == "windows" {
  18. test = true
  19. } else {
  20. test = false
  21. }
  22. //test = false // 强行启动 插入
  23. go Realtime() // 循环刷新
  24. //logs.Async(1e3)
  25. //
  26. //cpuNum := runtime.NumCPU() //获得当前设备的cpu核心数
  27. //fmt.Println("cpu核心数:", cpuNum)
  28. //runtime.GOMAXPROCS(cpuNum) //设置需要用到的cpu数量
  29. //
  30. }
  31. func Run_MqttServer() {
  32. fmt.Println("============Run_MqttServer=============")
  33. // Create an MQTT Client.
  34. cli = client.New(&client.Options{
  35. // Define the processing of the error handler.
  36. ErrorHandler: func(err error) {
  37. fmt.Println(err)
  38. },
  39. })
  40. // Terminate the Client.
  41. defer cli.Terminate()
  42. c := client.ConnectOptions{
  43. Network: "tcp",
  44. Address: "192.168.0.7:1883",
  45. UserName: []byte("coldchain"),
  46. Password: []byte("coldchainBZD"),
  47. }
  48. HTTPPort, _ := beego.AppConfig.String("HTTPPort")
  49. var sub_str = ""
  50. c.Address = "mqtt1.baozhida.cn:1883"
  51. c.ClientID = []byte("go_mqtt_P_client" + HTTPPort)
  52. sub_str = "temp_humidity_sub"
  53. //sub_str = "temp_humidity_sub"
  54. fmt.Println("Address:", c.Address)
  55. fmt.Println("ClientID:", string(c.ClientID))
  56. fmt.Println("sub_str:", sub_str)
  57. // Connect to the MQTT Server.
  58. err := cli.Connect(&c)
  59. if err != nil {
  60. println(err)
  61. }
  62. // Subscribe to topics.
  63. err = cli.Subscribe(&client.SubscribeOptions{
  64. SubReqs: []*client.SubReq{
  65. &client.SubReq{
  66. TopicFilter: []byte(sub_str),
  67. QoS: mqtt.QoS0,
  68. // Define the processing of the message handler.
  69. Handler: func(topicName, message []byte) {
  70. //fmt.Println(string(topicName), string(message))
  71. //messagePubHandler(string(topicName), message)
  72. },
  73. },
  74. &client.SubReq{
  75. TopicFilter: []byte("temp_humidity_sub_host"),
  76. QoS: mqtt.QoS0,
  77. // Define the processing of the message handler.
  78. Handler: func(topicName, message []byte) {
  79. //fmt.Println(string(topicName), string(message))
  80. //messagePubHandler(string(topicName), message)
  81. },
  82. },
  83. //&client.SubReq{
  84. // TopicFilter: []byte("bar/#"),
  85. // QoS: mqtt.QoS1,
  86. // Handler: func(topicName, message []byte) {
  87. // fmt.Println(string(topicName), string(message))
  88. // },
  89. //},
  90. },
  91. })
  92. if err != nil {
  93. }
  94. }
  95. // 发送数据
  96. func Mqtt_publish(topic string, text string) {
  97. fmt.Println(topic, "->", text)
  98. // Publish a message.
  99. err := cli.Publish(&client.PublishOptions{
  100. QoS: mqtt.QoS0,
  101. TopicName: []byte(topic),
  102. Message: []byte(text),
  103. })
  104. if err != nil {
  105. }
  106. }
  107. // 接收數據
  108. func Mqtt_Subscribe(topic string, callBack func([]byte, []byte)) {
  109. fmt.Println(topic, "<-")
  110. cli.Subscribe(&client.SubscribeOptions{
  111. SubReqs: []*client.SubReq{
  112. &client.SubReq{
  113. TopicFilter: []byte(topic),
  114. QoS: mqtt.QoS0,
  115. Handler: callBack,
  116. },
  117. },
  118. })
  119. }
  120. func messagePubHandler(topicName string, message []byte) {
  121. //// 本地测试
  122. //if test {
  123. // fmt.Println("============= Mqtt JSON =============")
  124. // fmt.Printf("topic: %s -> %s \n", topicName, string(message))
  125. //}
  126. var Ms_project Ms_Project
  127. err := json.Unmarshal(message, &Ms_project)
  128. if err != nil {
  129. fmt.Println("JSON反序列化失败[Ms_Project],err=", err)
  130. return
  131. }
  132. SN := Ms_project.Sn
  133. //Type := lib.To_int(Ms_project.Type)
  134. //Msid := lib.To_int(Ms_project.Msid)
  135. //
  136. //fmt.Println("SN:", SN)
  137. //fmt.Println("Type:", Type)
  138. //fmt.Println("Msid:", Msid)
  139. // 过滤
  140. if len(SN) < 8 {
  141. return
  142. }
  143. r_Device, err := Device.Read_Device_ByT_sn(SN)
  144. if err != nil {
  145. //Device.Add_DeviceSnOld(SN)
  146. //fmt.Println("没有该设备:", SN)
  147. return
  148. }
  149. // 并发
  150. //go AsyncFunc(Ms_project,message)
  151. AsyncFunc(r_Device, Ms_project, message)
  152. }