//+build ignore package MqttServer import ( "ColdP_server/conf" "ColdP_server/models/Device" "ColdP_server/models/System" "encoding/json" "fmt" "github.com/beego/beego/v2/core/logs" mqtt "github.com/eclipse/paho.mqtt.golang" "runtime" ) var logx * logs.BeeLogger var client mqtt.Client var ( DeviceSn map[string]Device.Device /*创建集合 */ test = true ) func init() { logx = logs.NewLogger() logx.SetLogger(logs.AdapterFile, `{"filename":"logs/Mqtt/Mqtt.log"}`) //logs.Async(1e3) cpuNum := runtime.NumCPU() //获得当前设备的cpu核心数 fmt.Println("cpu核心数:", cpuNum) runtime.GOMAXPROCS(cpuNum) //设置需要用到的cpu数量 DeviceSn = make(map[string]Device.Device) if runtime.GOOS == "windows" { test = true } else { test = false } //test = false // 强行启动 插入 } var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) { // 本地测试 if(test) { fmt.Println("============= Mqtt JSON =============") fmt.Printf("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic()) return } logx.Info(string(msg.Payload()) ) var Ms_project Ms_Project err := json.Unmarshal(msg.Payload(), &Ms_project) if err != nil { System.Add_Logs("MqttServer","JSON反序列化失败[Ms_Project]",string(msg.Payload())) fmt.Println("JSON反序列化失败[Ms_Project],err=",err) return } SN := Ms_project.Sn //Type := lib.To_int(Ms_project.Type) //Msid := lib.To_int(Ms_project.Msid) // //fmt.Println("SN:", SN) //fmt.Println("Type:", Type) //fmt.Println("Msid:", Msid) // 过滤 if(len(SN) < 8){ return } _, ok := DeviceSn[SN] /*如果确定是真实的,则存在,否则不存在 */ if !ok { r_Device,err := Device.Read_Device_ByT_sn(SN) if err != nil { Device.Add_DeviceSnOld(SN) fmt.Println("没有该设备:",SN) return } DeviceSn[SN] = r_Device } // 并发 //go AsyncFunc(Ms_project,msg.Payload()) AsyncFunc(Ms_project,msg.Payload()) } var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) { fmt.Println("Connected") } var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) { fmt.Printf("Connect lost: %v", err) } func Mqtt_publish(topic string,text string) { token := client.Publish(topic, 0, false, text) token.Wait() } func Mqtt_sub() { topic := conf.MqttServer_Sub token := client.Subscribe(topic, 1, nil) token.Wait() fmt.Printf("Subscribed to topic: %s", topic) } func Run_MqttServer() { fmt.Printf("Run_MqttServer") opts := mqtt.NewClientOptions() MqttServer_Url := conf.MqttServer_Url if runtime.GOOS == "windows" { MqttServer_Url = "mqtt1.baozhida.cn" opts.SetClientID("go_mqtt_client_test") } else { opts.SetClientID("go_mqtt_client") } opts.AddBroker(fmt.Sprintf("tcp://%s:%d",MqttServer_Url , conf.MqttServer_Port)) opts.SetUsername(conf.MqttServer_Username) opts.SetPassword(conf.MqttServer_Password) opts.SetDefaultPublishHandler(messagePubHandler) opts.OnConnect = connectHandler opts.OnConnectionLost = connectLostHandler client = mqtt.NewClient(opts) if token := client.Connect(); token.Wait() && token.Error() != nil { panic(token.Error()) } Mqtt_sub() //publish(client) // //client.Disconnect(250) }