MqttServer_old.txt 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145
  1. //+build ignore
  2. package MqttServer
  3. import (
  4. "ColdP_server/conf"
  5. "ColdP_server/models/Device"
  6. "ColdP_server/models/System"
  7. "encoding/json"
  8. "fmt"
  9. "github.com/beego/beego/v2/core/logs"
  10. mqtt "github.com/eclipse/paho.mqtt.golang"
  11. "runtime"
  12. )
  13. var logx * logs.BeeLogger
  14. var client mqtt.Client
  15. var (
  16. DeviceSn map[string]Device.Device /*创建集合 */
  17. test = true
  18. )
  19. func init() {
  20. logx = logs.NewLogger()
  21. logx.SetLogger(logs.AdapterFile, `{"filename":"logs/Mqtt/Mqtt.log"}`)
  22. //logs.Async(1e3)
  23. cpuNum := runtime.NumCPU() //获得当前设备的cpu核心数
  24. fmt.Println("cpu核心数:", cpuNum)
  25. runtime.GOMAXPROCS(cpuNum) //设置需要用到的cpu数量
  26. DeviceSn = make(map[string]Device.Device)
  27. if runtime.GOOS == "windows" {
  28. test = true
  29. } else {
  30. test = false
  31. }
  32. //test = false // 强行启动 插入
  33. }
  34. var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
  35. // 本地测试
  36. if(test) {
  37. fmt.Println("============= Mqtt JSON =============")
  38. fmt.Printf("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic())
  39. return
  40. }
  41. logx.Info(string(msg.Payload()) )
  42. var Ms_project Ms_Project
  43. err := json.Unmarshal(msg.Payload(), &Ms_project)
  44. if err != nil {
  45. System.Add_Logs("MqttServer","JSON反序列化失败[Ms_Project]",string(msg.Payload()))
  46. fmt.Println("JSON反序列化失败[Ms_Project],err=",err)
  47. return
  48. }
  49. SN := Ms_project.Sn
  50. //Type := lib.To_int(Ms_project.Type)
  51. //Msid := lib.To_int(Ms_project.Msid)
  52. //
  53. //fmt.Println("SN:", SN)
  54. //fmt.Println("Type:", Type)
  55. //fmt.Println("Msid:", Msid)
  56. // 过滤
  57. if(len(SN) < 8){
  58. return
  59. }
  60. _, ok := DeviceSn[SN] /*如果确定是真实的,则存在,否则不存在 */
  61. if !ok {
  62. r_Device,err := Device.Read_Device_ByT_sn(SN)
  63. if err != nil {
  64. Device.Add_DeviceSnOld(SN)
  65. fmt.Println("没有该设备:",SN)
  66. return
  67. }
  68. DeviceSn[SN] = r_Device
  69. }
  70. // 并发
  71. //go AsyncFunc(Ms_project,msg.Payload())
  72. AsyncFunc(Ms_project,msg.Payload())
  73. }
  74. var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) {
  75. fmt.Println("Connected")
  76. }
  77. var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
  78. fmt.Printf("Connect lost: %v", err)
  79. }
  80. func Mqtt_publish(topic string,text string) {
  81. token := client.Publish(topic, 0, false, text)
  82. token.Wait()
  83. }
  84. func Mqtt_sub() {
  85. topic := conf.MqttServer_Sub
  86. token := client.Subscribe(topic, 1, nil)
  87. token.Wait()
  88. fmt.Printf("Subscribed to topic: %s", topic)
  89. }
  90. func Run_MqttServer() {
  91. fmt.Printf("Run_MqttServer")
  92. opts := mqtt.NewClientOptions()
  93. MqttServer_Url := conf.MqttServer_Url
  94. if runtime.GOOS == "windows" {
  95. MqttServer_Url = "mqtt1.baozhida.cn"
  96. opts.SetClientID("go_mqtt_client_test")
  97. } else {
  98. opts.SetClientID("go_mqtt_client")
  99. }
  100. opts.AddBroker(fmt.Sprintf("tcp://%s:%d",MqttServer_Url , conf.MqttServer_Port))
  101. opts.SetUsername(conf.MqttServer_Username)
  102. opts.SetPassword(conf.MqttServer_Password)
  103. opts.SetDefaultPublishHandler(messagePubHandler)
  104. opts.OnConnect = connectHandler
  105. opts.OnConnectionLost = connectLostHandler
  106. client = mqtt.NewClient(opts)
  107. if token := client.Connect(); token.Wait() && token.Error() != nil {
  108. panic(token.Error())
  109. }
  110. Mqtt_sub()
  111. //publish(client)
  112. //
  113. //client.Disconnect(250)
  114. }