123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179 |
- package MqttServer
- import (
- "ColdP_server/models/Device"
- "encoding/json"
- "fmt"
- beego "github.com/beego/beego/v2/server/web"
- "github.com/yosssi/gmq/mqtt"
- "github.com/yosssi/gmq/mqtt/client"
- "runtime"
- )
- var (
- cli *client.Client
- DeviceSn map[string]Device.Device /*创建集合 */
- test = true
- )
- func init() {
- if runtime.GOOS == "windows" {
- test = true
- } else {
- test = false
- }
- //test = false // 强行启动 插入
- go Realtime() // 循环刷新
- //logs.Async(1e3)
- //
- //cpuNum := runtime.NumCPU() //获得当前设备的cpu核心数
- //fmt.Println("cpu核心数:", cpuNum)
- //runtime.GOMAXPROCS(cpuNum) //设置需要用到的cpu数量
- //
- }
- func Run_MqttServer() {
- fmt.Println("============Run_MqttServer=============")
- // Create an MQTT Client.
- cli = client.New(&client.Options{
- // Define the processing of the error handler.
- ErrorHandler: func(err error) {
- fmt.Println(err)
- },
- })
- // Terminate the Client.
- defer cli.Terminate()
- c := client.ConnectOptions{
- Network: "tcp",
- Address: "192.168.0.7:1883",
- UserName: []byte("coldchain"),
- Password: []byte("coldchainBZD"),
- }
- HTTPPort, _ := beego.AppConfig.String("HTTPPort")
- var sub_str = ""
- c.Address = "mqtt1.baozhida.cn:1883"
- c.ClientID = []byte("go_mqtt_P_client" + HTTPPort)
- sub_str = "temp_humidity_sub"
- //sub_str = "temp_humidity_sub"
- fmt.Println("Address:", c.Address)
- fmt.Println("ClientID:", string(c.ClientID))
- fmt.Println("sub_str:", sub_str)
- // Connect to the MQTT Server.
- err := cli.Connect(&c)
- if err != nil {
- println(err)
- }
- // Subscribe to topics.
- err = cli.Subscribe(&client.SubscribeOptions{
- SubReqs: []*client.SubReq{
- &client.SubReq{
- TopicFilter: []byte(sub_str),
- QoS: mqtt.QoS0,
- // Define the processing of the message handler.
- Handler: func(topicName, message []byte) {
- //fmt.Println(string(topicName), string(message))
- //messagePubHandler(string(topicName), message)
- },
- },
- &client.SubReq{
- TopicFilter: []byte("temp_humidity_sub_host"),
- QoS: mqtt.QoS0,
- // Define the processing of the message handler.
- Handler: func(topicName, message []byte) {
- //fmt.Println(string(topicName), string(message))
- //messagePubHandler(string(topicName), message)
- },
- },
- //&client.SubReq{
- // TopicFilter: []byte("bar/#"),
- // QoS: mqtt.QoS1,
- // Handler: func(topicName, message []byte) {
- // fmt.Println(string(topicName), string(message))
- // },
- //},
- },
- })
- if err != nil {
- }
- }
- // 发送数据
- func Mqtt_publish(topic string, text string) {
- fmt.Println(topic, "->", text)
- // Publish a message.
- err := cli.Publish(&client.PublishOptions{
- QoS: mqtt.QoS0,
- TopicName: []byte(topic),
- Message: []byte(text),
- })
- if err != nil {
- }
- }
- // 接收數據
- func Mqtt_Subscribe(topic string, callBack func([]byte, []byte)) {
- fmt.Println(topic, "<-")
- cli.Subscribe(&client.SubscribeOptions{
- SubReqs: []*client.SubReq{
- &client.SubReq{
- TopicFilter: []byte(topic),
- QoS: mqtt.QoS0,
- Handler: callBack,
- },
- },
- })
- }
- func messagePubHandler(topicName string, message []byte) {
- //// 本地测试
- //if test {
- // fmt.Println("============= Mqtt JSON =============")
- // fmt.Printf("topic: %s -> %s \n", topicName, string(message))
- //}
- var Ms_project Ms_Project
- err := json.Unmarshal(message, &Ms_project)
- if err != nil {
- fmt.Println("JSON反序列化失败[Ms_Project],err=", err)
- return
- }
- SN := Ms_project.Sn
- //Type := lib.To_int(Ms_project.Type)
- //Msid := lib.To_int(Ms_project.Msid)
- //
- //fmt.Println("SN:", SN)
- //fmt.Println("Type:", Type)
- //fmt.Println("Msid:", Msid)
- // 过滤
- if len(SN) < 8 {
- return
- }
- r_Device, err := Device.Read_Device_ByT_sn(SN)
- if err != nil {
- //Device.Add_DeviceSnOld(SN)
- //fmt.Println("没有该设备:", SN)
- return
- }
- // 并发
- //go AsyncFunc(Ms_project,message)
- AsyncFunc(r_Device, Ms_project, message)
- }
|