123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145 |
- //+build ignore
- package MqttServer
- import (
- "ColdP_server/conf"
- "ColdP_server/models/Device"
- "ColdP_server/models/System"
- "encoding/json"
- "fmt"
- "github.com/beego/beego/v2/core/logs"
- mqtt "github.com/eclipse/paho.mqtt.golang"
- "runtime"
- )
- var logx * logs.BeeLogger
- var client mqtt.Client
- var (
- DeviceSn map[string]Device.Device /*创建集合 */
- test = true
- )
- func init() {
- logx = logs.NewLogger()
- logx.SetLogger(logs.AdapterFile, `{"filename":"logs/Mqtt/Mqtt.log"}`)
- //logs.Async(1e3)
- cpuNum := runtime.NumCPU() //获得当前设备的cpu核心数
- fmt.Println("cpu核心数:", cpuNum)
- runtime.GOMAXPROCS(cpuNum) //设置需要用到的cpu数量
- DeviceSn = make(map[string]Device.Device)
- if runtime.GOOS == "windows" {
- test = true
- } else {
- test = false
- }
- //test = false // 强行启动 插入
- }
- var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
- // 本地测试
- if(test) {
- fmt.Println("============= Mqtt JSON =============")
- fmt.Printf("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic())
- return
- }
- logx.Info(string(msg.Payload()) )
- var Ms_project Ms_Project
- err := json.Unmarshal(msg.Payload(), &Ms_project)
- if err != nil {
- System.Add_Logs("MqttServer","JSON反序列化失败[Ms_Project]",string(msg.Payload()))
- fmt.Println("JSON反序列化失败[Ms_Project],err=",err)
- return
- }
- SN := Ms_project.Sn
- //Type := lib.To_int(Ms_project.Type)
- //Msid := lib.To_int(Ms_project.Msid)
- //
- //fmt.Println("SN:", SN)
- //fmt.Println("Type:", Type)
- //fmt.Println("Msid:", Msid)
- // 过滤
- if(len(SN) < 8){
- return
- }
- _, ok := DeviceSn[SN] /*如果确定是真实的,则存在,否则不存在 */
- if !ok {
- r_Device,err := Device.Read_Device_ByT_sn(SN)
- if err != nil {
- Device.Add_DeviceSnOld(SN)
- fmt.Println("没有该设备:",SN)
- return
- }
- DeviceSn[SN] = r_Device
- }
- // 并发
- //go AsyncFunc(Ms_project,msg.Payload())
- AsyncFunc(Ms_project,msg.Payload())
- }
- var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) {
- fmt.Println("Connected")
- }
- var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
- fmt.Printf("Connect lost: %v", err)
- }
- func Mqtt_publish(topic string,text string) {
- token := client.Publish(topic, 0, false, text)
- token.Wait()
- }
- func Mqtt_sub() {
- topic := conf.MqttServer_Sub
- token := client.Subscribe(topic, 1, nil)
- token.Wait()
- fmt.Printf("Subscribed to topic: %s", topic)
- }
- func Run_MqttServer() {
- fmt.Printf("Run_MqttServer")
- opts := mqtt.NewClientOptions()
- MqttServer_Url := conf.MqttServer_Url
- if runtime.GOOS == "windows" {
- MqttServer_Url = "mqtt1.baozhida.cn"
- opts.SetClientID("go_mqtt_client_test")
- } else {
- opts.SetClientID("go_mqtt_client")
- }
- opts.AddBroker(fmt.Sprintf("tcp://%s:%d",MqttServer_Url , conf.MqttServer_Port))
- opts.SetUsername(conf.MqttServer_Username)
- opts.SetPassword(conf.MqttServer_Password)
- opts.SetDefaultPublishHandler(messagePubHandler)
- opts.OnConnect = connectHandler
- opts.OnConnectionLost = connectLostHandler
- client = mqtt.NewClient(opts)
- if token := client.Connect(); token.Wait() && token.Error() != nil {
- panic(token.Error())
- }
- Mqtt_sub()
- //publish(client)
- //
- //client.Disconnect(250)
- }
|