package MqttServer import ( "ColdP_server/models/Device" "encoding/json" "fmt" beego "github.com/beego/beego/v2/server/web" "github.com/yosssi/gmq/mqtt" "github.com/yosssi/gmq/mqtt/client" "runtime" ) var ( cli *client.Client DeviceSn map[string]Device.Device /*创建集合 */ test = true ) func init() { if runtime.GOOS == "windows" { test = true } else { test = false } //test = false // 强行启动 插入 go Realtime() // 循环刷新 //logs.Async(1e3) // //cpuNum := runtime.NumCPU() //获得当前设备的cpu核心数 //fmt.Println("cpu核心数:", cpuNum) //runtime.GOMAXPROCS(cpuNum) //设置需要用到的cpu数量 // } func Run_MqttServer() { fmt.Println("============Run_MqttServer=============") // Create an MQTT Client. cli = client.New(&client.Options{ // Define the processing of the error handler. ErrorHandler: func(err error) { fmt.Println(err) }, }) // Terminate the Client. defer cli.Terminate() c := client.ConnectOptions{ Network: "tcp", Address: "192.168.0.7:1883", UserName: []byte("coldchain"), Password: []byte("coldchainBZD"), } HTTPPort, _ := beego.AppConfig.String("HTTPPort") var sub_str = "" c.Address = "mqtt1.baozhida.cn:1883" c.ClientID = []byte("go_mqtt_P_client" + HTTPPort) sub_str = "temp_humidity_sub" //sub_str = "temp_humidity_sub" fmt.Println("Address:", c.Address) fmt.Println("ClientID:", string(c.ClientID)) fmt.Println("sub_str:", sub_str) // Connect to the MQTT Server. err := cli.Connect(&c) if err != nil { println(err) } // Subscribe to topics. err = cli.Subscribe(&client.SubscribeOptions{ SubReqs: []*client.SubReq{ &client.SubReq{ TopicFilter: []byte(sub_str), QoS: mqtt.QoS0, // Define the processing of the message handler. Handler: func(topicName, message []byte) { //fmt.Println(string(topicName), string(message)) //messagePubHandler(string(topicName), message) }, }, &client.SubReq{ TopicFilter: []byte("temp_humidity_sub_host"), QoS: mqtt.QoS0, // Define the processing of the message handler. Handler: func(topicName, message []byte) { //fmt.Println(string(topicName), string(message)) //messagePubHandler(string(topicName), message) }, }, //&client.SubReq{ // TopicFilter: []byte("bar/#"), // QoS: mqtt.QoS1, // Handler: func(topicName, message []byte) { // fmt.Println(string(topicName), string(message)) // }, //}, }, }) if err != nil { } } // 发送数据 func Mqtt_publish(topic string, text string) { fmt.Println(topic, "->", text) // Publish a message. err := cli.Publish(&client.PublishOptions{ QoS: mqtt.QoS0, TopicName: []byte(topic), Message: []byte(text), }) if err != nil { } } // 接收數據 func Mqtt_Subscribe(topic string, callBack func([]byte, []byte)) { fmt.Println(topic, "<-") cli.Subscribe(&client.SubscribeOptions{ SubReqs: []*client.SubReq{ &client.SubReq{ TopicFilter: []byte(topic), QoS: mqtt.QoS0, Handler: callBack, }, }, }) } func messagePubHandler(topicName string, message []byte) { //// 本地测试 //if test { // fmt.Println("============= Mqtt JSON =============") // fmt.Printf("topic: %s -> %s \n", topicName, string(message)) //} var Ms_project Ms_Project err := json.Unmarshal(message, &Ms_project) if err != nil { fmt.Println("JSON反序列化失败[Ms_Project],err=", err) return } SN := Ms_project.Sn //Type := lib.To_int(Ms_project.Type) //Msid := lib.To_int(Ms_project.Msid) // //fmt.Println("SN:", SN) //fmt.Println("Type:", Type) //fmt.Println("Msid:", Msid) // 过滤 if len(SN) < 8 { return } r_Device, err := Device.Read_Device_ByT_sn(SN) if err != nil { //Device.Add_DeviceSnOld(SN) //fmt.Println("没有该设备:", SN) return } // 并发 //go AsyncFunc(Ms_project,message) AsyncFunc(r_Device, Ms_project, message) }