package MqttServer import ( "Yunlot/Handle" "Yunlot/conf" "Yunlot/lib" "Yunlot/logs" "Yunlot/models/Device" "Yunlot/models/Product" "encoding/json" "fmt" beego "github.com/beego/beego/v2/server/web" "github.com/gin-gonic/gin" "github.com/yosssi/gmq/mqtt" "github.com/yosssi/gmq/mqtt/client" "plugin" "strconv" "strings" "time" ) var cli *client.Client func Run_MqttServer() { time.Sleep(3 * time.Second) logs.Println("============Run_MqttServer=============", "") HTTPPort, _ := beego.AppConfig.String("HTTPPort") // Create an MQTT Client. cli = client.New(&client.Options{ // Define the processing of the error handler. ErrorHandler: func(err error) { logs.PrintlnError("err!!!!!! Run_MqttServer:", err.Error()) time.Sleep(3 * time.Second) go Run_MqttServer() // MQTT 通讯 return }, }) // Terminate the Client. defer cli.Terminate() c := client.ConnectOptions{ Network: "tcp", Address: conf.MqttServer_Url, ClientID: []byte(conf.MqttServer_ClientID + HTTPPort), UserName: []byte(conf.MqttServer_Username), Password: []byte(conf.MqttServer_Password), } logs.Println("Address:", c.Address) logs.Println("ClientID:", string(c.ClientID)) // Connect to the MQTT Server. err := cli.Connect(&c) if err != nil { logs.Println("MqttServer", "连接MQTT失败 [cli.Connect]", "") logs.Println("err!!!!!! Run_MqttServer:", "连接MQTT失败:", err) fmt.Println("err!!!!!! 连接MQTT失败:", err) cli.Terminate() time.Sleep(3 * time.Second) go Run_MqttServer() // MQTT 通讯 return } logs.Println("MqttServer", "连接成功!") // Subscribe to topics. err = cli.Subscribe(&client.SubscribeOptions{ SubReqs: []*client.SubReq{ &client.SubReq{ TopicFilter: []byte("#"), QoS: mqtt.QoS0, // Define the processing of the message handler. Handler: func(topicName, message []byte) { _, exists := lib.TopicMap[string(topicName)] if exists { //logs.Println("跳过:",string(topicName)) return // 这个 订阅号 平台->设备,跳过 } start := time.Now() messagePubHandler(string(topicName), message) elapsed := time.Since(start) fmt.Printf("代码运行时间:%s\n", elapsed) }, }, &client.SubReq{ // 设备断开 TopicFilter: []byte("$SYS/brokers/+/clients/+/disconnected"), QoS: mqtt.QoS0, // Define the processing of the message handler. Handler: func(topicName, message []byte) { logs.PrintlnMqtt("<-" + string(topicName) + " " + string(message)) MessageDisconnected(string(topicName), message) }, }, &client.SubReq{ // 设备上线 TopicFilter: []byte("$SYS/brokers/+/clients/+/connected"), QoS: mqtt.QoS0, // Define the processing of the message handler. Handler: func(topicName, message []byte) { logs.PrintlnMqtt("<-" + string(topicName) + " " + string(message)) MessageConnected(string(topicName), message) }, }, }, }) if err != nil { logs.Println("MqttServer", "订阅消息 [cli.Subscribe]", "") logs.Println("err!!!!!! Run_MqttServer:", "连接MQTT失败:", err) } fmt.Println("MQTT ok!") go MQTT_Gin_Run() // 初始化 MQTT ACL } func MQTT_Gin_Run() { // 创建一个Gin路由器 r := gin.Default() // 定义路由 r.POST("/authentication", func(c *gin.Context) { var json struct { Clientid string `json:"clientid"` Username string `json:"username"` Password string `json:"password"` } if err := c.Bind(&json); err != nil { c.JSON(200, gin.H{ "result": "deny", // 认证结果 可选 "allow" | "deny" | "ignore" "is_superuser": false, // 超级用户 可选 true | false,该项为空时默认为 false }) return } // 保证 Clientid唯一,否则会 下线 if json.Clientid != json.Username { c.JSON(200, gin.H{ "result": "deny", // 认证结果 可选 "allow" | "deny" | "ignore" "is_superuser": false, // 超级用户 可选 true | false,该项为空时默认为 false }) return } Device_r := Device.Device{ T_sn: json.Username, } // 判断是否存在 if !Device_r.Read_Tidy() { c.JSON(200, gin.H{ "result": "deny", // 认证结果 可选 "allow" | "deny" | "ignore" "is_superuser": false, // 超级用户 可选 true | false,该项为空时默认为 false }) return } // 判断密码是否正确 if Device_r.T_password != json.Password { c.JSON(200, gin.H{ "result": "deny", // 认证结果 可选 "allow" | "deny" | "ignore" "is_superuser": false, // 超级用户 可选 true | false,该项为空时默认为 false }) return } c.JSON(200, gin.H{ "result": "allow", // 可选 "allow" | "deny" | "ignore" "is_superuser": false, // 可选 true | false,该项为空时默认为 false }) }) r.POST("/Acl", func(c *gin.Context) { logs.Println("Acl!!") var json struct { Clientid string `json:"clientid"` //${clientid} — 客户端的 ID。 Username string `json:"username"` //${username} — 客户端登录是用的用户名。 Peerhost string `json:"peerhost"` //${peerhost} — 客户端的源 IP 地址。 Proto_name string `json:"proto_name"` //${proto_name} — 客户端使用的协议名称。例如 MQTT,CoAP 等。 Mountpoint string `json:"mountpoint"` //${mountpoint} — 网关监听器的挂载点(主题前缀)。 Action string `json:"action"` //${action} — 当前执行的动作请求,例如 publish,subscribe。 Topic string `json:"topic"` //${topic} — 当前请求想要发布或订阅的主题(或主题过滤器) } if err := c.Bind(&json); err != nil { c.JSON(200, gin.H{ "result": "deny", // 可选 "allow" | "deny" | "ignore" }) } fmt.Println("json:", json) if json.Username == "admin" { c.JSON(200, gin.H{"result": "allow"}) return } if json.Username == "test" { c.JSON(200, gin.H{"result": "allow"}) return } // 保证 Clientid唯一,否则会 下线 if json.Clientid != json.Username { c.JSON(200, gin.H{"result": "deny"}) return } Device_r := Device.Device{ T_sn: json.Username, } // 判断是否存在 if !Device_r.Read_Tidy() { c.JSON(200, gin.H{"result": "deny"}) return } //topic /sub/SN /pub/SN topic_list := strings.Split(json.Topic, "/") for _, v := range topic_list { if len(v) < 7 { continue // 不是有效SN } if v == Device_r.T_sn { c.JSON(200, gin.H{"result": "allow"}) return } } logs.Println("MQTT Acl E!topic_list:", topic_list) c.JSON(200, gin.H{"result": "deny"}) return // //Clientid_list := strings.Split(username+"_s", "_") //username = Clientid_list[0] //if topic_list[2] != username { // fmt.Println("topic_list[2] != username",topic_list[2]) // c.JSON(200, gin.H{"result": "deny"}) // return //} //c.JSON(200, gin.H{"result": "allow"}) //return }) r.Run(":8080") } // 开始处理 func messagePubHandler(topicName string, message []byte) { // 本地测试 logs.Println("", "============= Mqtt JSON =============") logs.Println("<-"+topicName, string(message)) // 过滤 topicNameS := strings.Split(topicName, "/") if len(topicNameS) == 1 { logs.Println("MqttServer", "订阅地址错误 len(topicNameS) == 1", strconv.Itoa(len(topicNameS))) return } // 验证设备 Device_r := Device.Device{} for _, sn := range topicNameS { if len(sn) < 7 { continue // 不是有效SN } Device_r.T_sn = sn if Device_r.Read_Tidy() { if Device_r.T_state == 3 { continue // 不是有效SN } break } } if Device_r.T_state != 1 { logs.Println("MqttServer", Device_r.T_sn+" 设备未激活") return } // 设备类型 ProductType_r := Product.ProductType{T_ProductID: Device_r.T_ProductID} if !ProductType_r.Read() { logs.Println("MqttServer", Device_r.T_sn+"|"+Device_r.T_ProductID+" 设备类型找不到!") return } // 设备协议 ProductProt_r := Product.ProductProt{Id: ProductType_r.T_prot} if !ProductProt_r.Read() { logs.Println("MqttServer", Device_r.T_sn+"|"+Device_r.T_ProductID+"-"+fmt.Sprintf("%d", ProductType_r.T_prot)+" 设备协议找不到!") return } // Plugin 指针添加到 MAP,不知道能不能提高速度 //_, exists := lib.TopicMap[string(topicName)] //if exists { // //logs.Println("跳过:",string(topicName)) // return // 这个 订阅号 平台->设备,跳过 //} //p, p_is := lib.PluginMap[ProductProt_r.T_analysis] //if !p_is { // var err error // // 根据库的存放路径加载库 // p_p, err := plugin.Open(conf.Analysis_Dir + ProductProt_r.T_analysis + ".so") // if err != nil { // logs.PrintlnError("打开 SO 失败:", err) // return // } // p = p_p // lib.PluginMap[ProductProt_r.T_analysis] = p // logs.Println("NEW Plugin 地址:",&p_p) //} // 加载 SO 文件 p, err := plugin.Open(conf.Analysis_Dir + ProductProt_r.T_analysis + ".so") if err != nil { logs.PrintlnError("打开 SO 失败:", err) return } logs.Println("Plugin 地址:", &p) var Rt_r = lib.Rt{Status: 200, Msg: "ok"} // 查找库导出信息 s, err := p.Lookup("T") if err != nil { logs.Println("", err) panic(any(err)) } // 类型转换 f := s.(func(t string, b []byte) string)(topicName, message) // 开始处理 logs.Println("协议后:", f) //logs.Println("首字符:", string(f[0])) if f[0] == '{' { //结构体 var json_r map[string]interface{} err = json.Unmarshal([]byte(f), &json_r) if err != nil { Rt_r.Status = 203 Rt_r.Msg = "json E!" goto Mreturn } Handle.AnalysisMap(&Device_r, ProductType_r, json_r, "") // 合并json Device_r.Read_Tidy() // 提前最新的 logs.Println("Device_r.T_data:", Device_r.T_data) if len(Device_r.T_data) > 5 { json.Unmarshal([]byte(Device_r.T_data), &json_r) } Device_r.T_dataJson = json_r Device_r.UpdateTime.NowDbTime() Device_r.Update("T_data", "UpdateTime") } else { //列表 var json_lr []map[string]interface{} err = json.Unmarshal([]byte(f), &json_lr) if err != nil { Rt_r.Status = 204 Rt_r.Msg = "[]json E!" goto Mreturn } for _, value := range json_lr { Handle.AnalysisMap(&Device_r, ProductType_r, value, "") } // 合并json Device_r.Read_Tidy() // 提前最新的 if len(Device_r.T_data) > 5 { json.Unmarshal([]byte(Device_r.T_data), &json_lr[len(json_lr)-1]) } Device_r.T_dataJson = json_lr[len(json_lr)-1] Device_r.UpdateTime.NowDbTime() Device_r.Update("T_data", "UpdateTime") } // 返回 Mreturn: data, _ := json.Marshal(Rt_r) fmt.Println(string(data)) // 查找库导出信息 s, err = p.Lookup("R") if err != nil { logs.Println("", err) panic(any(err)) } // 转换 t, b := s.(func(t string, b string) (string, []byte))(topicName, string(data)) // 订阅号 与 内容 必须有数据 if len(t) > 0 && len(b) > 0 { // 返回数据 Mqtt_publish(t, b) } } // 发送数据 func Mqtt_publish(topic string, b []byte) { lib.TopicMap[topic] = true // Publish a message. err := cli.Publish(&client.PublishOptions{ QoS: mqtt.QoS0, TopicName: []byte(topic), Message: b, }) logs.PrintlnMqtt("-> " + topic + ":" + string(b)) if err != nil { logs.PrintlnError("MqttServer", "发送消息失败 [Mqtt_publish]", "-> "+topic+" "+string(b)) } }