package MqttServer import ( "Yunlot/conf" "Yunlot/lib" "Yunlot/logs" "Yunlot/models/Device" "Yunlot/models/Product" "encoding/json" "fmt" beego "github.com/beego/beego/v2/server/web" "github.com/gin-gonic/gin" "github.com/yosssi/gmq/mqtt" "github.com/yosssi/gmq/mqtt/client" "plugin" "strconv" "strings" "time" ) var cli *client.Client func Run_MqttServer() { time.Sleep(3 * time.Second) logs.Println("============Run_MqttServer=============", "") HTTPPort, _ := beego.AppConfig.String("HTTPPort") // Create an MQTT Client. cli = client.New(&client.Options{ // Define the processing of the error handler. ErrorHandler: func(err error) { logs.PrintlnError("err!!!!!! Run_MqttServer:", err.Error()) time.Sleep(3 * time.Second) go Run_MqttServer() // MQTT 通讯 return }, }) // Terminate the Client. defer cli.Terminate() c := client.ConnectOptions{ Network: "tcp", Address: conf.MqttServer_Url, ClientID: []byte(conf.MqttServer_ClientID + HTTPPort), UserName: []byte(conf.MqttServer_Username), Password: []byte(conf.MqttServer_Password), } logs.Println("Address:", c.Address) logs.Println("ClientID:", string(c.ClientID)) // Connect to the MQTT Server. err := cli.Connect(&c) if err != nil { logs.Println("MqttServer", "连接MQTT失败 [cli.Connect]", "") logs.Println("err!!!!!! Run_MqttServer:", "连接MQTT失败:", err) fmt.Println("err!!!!!! 连接MQTT失败:", err) cli.Terminate() time.Sleep(3 * time.Second) go Run_MqttServer() // MQTT 通讯 return } // Subscribe to topics. err = cli.Subscribe(&client.SubscribeOptions{ SubReqs: []*client.SubReq{ &client.SubReq{ TopicFilter: []byte("#"), QoS: mqtt.QoS0, // Define the processing of the message handler. Handler: func(topicName, message []byte) { logs.PrintlnMqtt("<-" + string(topicName) + " " + string(message)) messagePubHandler(string(topicName), message) }, }, &client.SubReq{ // 设备断开 TopicFilter: []byte("$SYS/brokers/+/clients/+/disconnected"), QoS: mqtt.QoS0, // Define the processing of the message handler. Handler: func(topicName, message []byte) { logs.PrintlnMqtt("<-" + string(topicName) + " " + string(message)) MessageDisconnected(string(topicName), message) }, }, &client.SubReq{ // 设备上线 TopicFilter: []byte("$SYS/brokers/+/clients/+/connected"), QoS: mqtt.QoS0, // Define the processing of the message handler. Handler: func(topicName, message []byte) { logs.PrintlnMqtt("<-" + string(topicName) + " " + string(message)) MessageConnected(string(topicName), message) }, }, }, }) if err != nil { logs.Println("MqttServer", "订阅消息 [cli.Subscribe]", "") logs.Println("err!!!!!! Run_MqttServer:", "连接MQTT失败:", err) } fmt.Println("MQTT ok!") go MQTT_Gin_Run() // 初始化 MQTT ACL } func MQTT_Gin_Run() { // 创建一个Gin路由器 r := gin.Default() // 定义路由 r.POST("/authentication", func(c *gin.Context) { var json struct { Clientid string `json:"clientid"` Username string `json:"username"` Password string `json:"password"` } if err := c.Bind(&json); err != nil { c.JSON(200, gin.H{ "result": "deny", // 认证结果 可选 "allow" | "deny" | "ignore" "is_superuser": false, // 超级用户 可选 true | false,该项为空时默认为 false }) return } // 保证 Clientid唯一,否则会 下线 if json.Clientid != json.Username { c.JSON(200, gin.H{ "result": "deny", // 认证结果 可选 "allow" | "deny" | "ignore" "is_superuser": false, // 超级用户 可选 true | false,该项为空时默认为 false }) return } Device_r := Device.Device{ T_sn: json.Username, } // 判断是否存在 if !Device_r.Read_Tidy() { c.JSON(200, gin.H{ "result": "deny", // 认证结果 可选 "allow" | "deny" | "ignore" "is_superuser": false, // 超级用户 可选 true | false,该项为空时默认为 false }) return } // 判断密码是否正确 if Device_r.T_password != json.Password { c.JSON(200, gin.H{ "result": "deny", // 认证结果 可选 "allow" | "deny" | "ignore" "is_superuser": false, // 超级用户 可选 true | false,该项为空时默认为 false }) return } c.JSON(200, gin.H{ "result": "allow", // 可选 "allow" | "deny" | "ignore" "is_superuser": false, // 可选 true | false,该项为空时默认为 false }) }) r.POST("/Acl", func(c *gin.Context) { logs.Println("Acl!!") var json struct { Clientid string `json:"clientid"` //${clientid} — 客户端的 ID。 Username string `json:"username"` //${username} — 客户端登录是用的用户名。 Peerhost string `json:"peerhost"` //${peerhost} — 客户端的源 IP 地址。 Proto_name string `json:"proto_name"` //${proto_name} — 客户端使用的协议名称。例如 MQTT,CoAP 等。 Mountpoint string `json:"mountpoint"` //${mountpoint} — 网关监听器的挂载点(主题前缀)。 Action string `json:"action"` //${action} — 当前执行的动作请求,例如 publish,subscribe。 Topic string `json:"topic"` //${topic} — 当前请求想要发布或订阅的主题(或主题过滤器) } if err := c.Bind(&json); err != nil { c.JSON(200, gin.H{ "result": "deny", // 可选 "allow" | "deny" | "ignore" }) } fmt.Println("json:", json) if json.Username == "admin" { c.JSON(200, gin.H{"result": "allow"}) return } if json.Username == "test" { c.JSON(200, gin.H{"result": "allow"}) return } // 保证 Clientid唯一,否则会 下线 if json.Clientid != json.Username { c.JSON(200, gin.H{"result": "deny"}) return } Device_r := Device.Device{ T_sn: json.Username, } // 判断是否存在 if !Device_r.Read_Tidy() { c.JSON(200, gin.H{"result": "deny"}) return } //topic /sub/SN /pub/SN topic_list := strings.Split(json.Topic, "/") for _, v := range topic_list { if len(v) < 7 { continue // 不是有效SN } if v == Device_r.T_sn { c.JSON(200, gin.H{"result": "allow"}) return } } logs.Println("MQTT Acl E!topic_list:", topic_list) c.JSON(200, gin.H{"result": "deny"}) return // //Clientid_list := strings.Split(username+"_s", "_") //username = Clientid_list[0] //if topic_list[2] != username { // fmt.Println("topic_list[2] != username",topic_list[2]) // c.JSON(200, gin.H{"result": "deny"}) // return //} //c.JSON(200, gin.H{"result": "allow"}) //return }) r.Run(":8080") } // 开始处理 func messagePubHandler(topicName string, message []byte) { // 本地测试 logs.Println("=", "============= Mqtt2 JSON =============") logs.Println("<-"+topicName, string(message)) // 过滤 topicNameS := strings.Split(topicName, "/") if len(topicNameS) == 1 { logs.Println("MqttServer", "订阅地址错误 len(topicNameS) == 1", strconv.Itoa(len(topicNameS))) return } // 验证设备 Device_r := Device.Device{} for _, sn := range topicNameS { if len(sn) < 7 { continue // 不是有效SN } Device_r.T_sn = sn if Device_r.Read_Tidy() { if Device_r.T_state == 3 { continue // 不是有效SN } break } } if Device_r.T_state != 1 { logs.Println("MqttServer", Device_r.T_sn+" 设备未激活") return } // 设备类型 ProductType_r := Product.ProductType{T_ProductID: Device_r.T_ProductID} if !ProductType_r.Read() { logs.Println("MqttServer", Device_r.T_sn+"|"+Device_r.T_ProductID+" 设备类型找不到!") return } // 设备协议 ProductProt_r := Product.ProductProt{Id: ProductType_r.T_prot} if !ProductProt_r.Read() { logs.Println("MqttServer", Device_r.T_sn+"|"+Device_r.T_ProductID+"-"+fmt.Sprintf("%d", ProductType_r.T_prot)+" 设备协议找不到!") return } // 根据库的存放路径加载库 p, err := plugin.Open(conf.Analysis_Dir + ProductProt_r.T_analysis + ".so") if err != nil { println(err) logs.PrintlnError("打开 SO 失败:", err) } if topicName[len(topicName)-6:] == "_reply" { // 返回数据 s, err := p.Lookup("T_reply") if err != nil { println(err) panic(any(err)) } // 类型转换 f := s.(func(b []byte) []byte)(message) // 开始处理 println(f) } else { var Rt_r = lib.Rt{Status: 200, Msg: "ok"} // 查找库导出信息 s, err := p.Lookup("R") if err != nil { println(err) panic(any(err)) } // 类型转换 f := s.(func(b []byte) []byte)(message) // 开始处理 println(f) a := string(f[0]) println("首字符:", a) if a != "{" { //结构体 var json_r map[string]interface{} err = json.Unmarshal(f, &json_r) if err != nil { Rt_r.Status = 203 goto Mreturn } //Handle.AnalysisMap(Device_r, ProductType_r, json_r, "") } else { //列表 var json_lr []map[string]interface{} err = json.Unmarshal(f, &json_lr) if err != nil { Rt_r.Status = 203 goto Mreturn } //for _, value := range json_lr { // //Handle.AnalysisMap(Device_r, ProductType_r, value, "") //} } // 返回 Mreturn: data, _ := json.Marshal(Rt_r) fmt.Println(string(data)) // 查找库导出信息 s, err = p.Lookup("R_reply") if err != nil { println(err) panic(any(err)) } // 类型转换 f = s.(func(b []byte) []byte)(data) // 返回数据 Mqtt_publish(topicName+"_reply", f) } } // 发送数据 func Mqtt_publish(topic string, b []byte) { // Publish a message. err := cli.Publish(&client.PublishOptions{ QoS: mqtt.QoS0, TopicName: []byte(topic), Message: b, }) logs.PrintlnMqtt("-> " + topic + "_reply " + string(b)) if err != nil { logs.PrintlnError("MqttServer", "发送消息失败 [Mqtt_publish]", "-> "+topic+" "+string(b)) } }