// Package MqttServer connects to the MQTT broker, forwards received device
// messages to the Handle package, and serves the HTTP endpoints
// (/authentication and /Acl) that answer allow/deny for MQTT clients.
package MqttServer

import (
	"encoding/json"
	"fmt"
	"strconv"
	"strings"
	"time"

	"Yunlot/Handle"
	"Yunlot/conf"
	"Yunlot/lib"
	"Yunlot/logs"
	"Yunlot/models/Device"

	beego "github.com/beego/beego/v2/server/web"
	"github.com/gin-gonic/gin"
	"github.com/yosssi/gmq/mqtt"
	"github.com/yosssi/gmq/mqtt/client"
)

// Run_MqttServer creates the shared MQTT client (lib.MqttClient), connects it
// to conf.MqttServer_Url, subscribes to all topics plus the broker's
// client connected/disconnected $SYS topics, and finally launches the HTTP
// auth/ACL server in a goroutine.
//
// On a client error or a failed connect it waits three seconds and restarts
// itself in a new goroutine.
func Run_MqttServer() {
	time.Sleep(3 * time.Second)
	logs.Println("============Run_MqttServer=============", "")

	// The lookup error is deliberately ignored; HTTPPort is only used as a
	// suffix that makes the client ID distinct per instance.
	HTTPPort, _ := beego.AppConfig.String("HTTPPort")

	// Create an MQTT Client.
	lib.MqttClient = client.New(&client.Options{
		// Define the processing of the error handler: log, back off, restart.
		ErrorHandler: func(err error) {
			logs.PrintlnError("err!!!!!! Run_MqttServer:", err.Error())
			time.Sleep(3 * time.Second)
			go Run_MqttServer() // MQTT communication
		},
	})

	// Terminate the Client.
	// NOTE(review): this deferred call runs as soon as Run_MqttServer returns,
	// i.e. right after subscribing, and also after the explicit Terminate in
	// the connect-failure path below — confirm this is intended.
	defer lib.MqttClient.Terminate()

	c := client.ConnectOptions{
		Network:  "tcp",
		Address:  conf.MqttServer_Url,
		ClientID: []byte(conf.MqttServer_ClientID + HTTPPort),
		UserName: []byte(conf.MqttServer_Username),
		Password: []byte(conf.MqttServer_Password),
	}
	logs.Println("Address:", c.Address)
	logs.Println("ClientID:", string(c.ClientID))

	// Connect to the MQTT Server.
	err := lib.MqttClient.Connect(&c)
	if err != nil {
		logs.Println("MqttServer", "连接MQTT失败 [cli.Connect]", "")
		logs.Println("err!!!!!! Run_MqttServer:", "连接MQTT失败:", err)
		fmt.Println("err!!!!!! 连接MQTT失败:", err)
		lib.MqttClient.Terminate()
		time.Sleep(3 * time.Second)
		go Run_MqttServer() // MQTT communication
		return
	}
	logs.Println("MqttServer", "连接成功!")

	// Subscribe to topics.
	err = lib.MqttClient.Subscribe(&client.SubscribeOptions{
		SubReqs: []*client.SubReq{
			{
				// Every topic.
				TopicFilter: []byte("#"),
				QoS:         mqtt.QoS0,
				// Define the processing of the message handler.
				Handler: func(topicName, message []byte) {
					// Topics present in lib.TopicMap are platform -> device
					// publishes; skip them.
					if _, exists := lib.TopicMap[string(topicName)]; exists {
						return
					}
					start := time.Now()
					messagePubHandler(string(topicName), message)
					elapsed := time.Since(start)
					fmt.Printf("代码运行时间:%s\n", elapsed)
				},
			},
			{
				// Device disconnected.
				TopicFilter: []byte("$SYS/brokers/+/clients/+/disconnected"),
				QoS:         mqtt.QoS0,
				// Define the processing of the message handler.
				Handler: func(topicName, message []byte) {
					logs.PrintlnMqtt("<-" + string(topicName) + " " + string(message))
					MessageDisconnected(string(topicName), message)
				},
			},
			{
				// Device came online.
				TopicFilter: []byte("$SYS/brokers/+/clients/+/connected"),
				QoS:         mqtt.QoS0,
				// Define the processing of the message handler.
				Handler: func(topicName, message []byte) {
					logs.PrintlnMqtt("<-" + string(topicName) + " " + string(message))
					MessageConnected(string(topicName), message)
				},
			},
		},
	})
	if err != nil {
		// A subscribe failure is only logged; start-up continues regardless.
		logs.Println("MqttServer", "订阅消息 [cli.Subscribe]", "")
		logs.Println("err!!!!!! Run_MqttServer:", "连接MQTT失败:", err)
	}

	fmt.Println("MQTT ok!")

	// Start the MQTT authentication / ACL HTTP server.
	// NOTE(review): this is launched on every (re)start of Run_MqttServer, so
	// after a reconnect a second listener on the same port will fail to bind.
	go MQTT_Gin_Run()
}

// MQTT_Gin_Run starts a Gin HTTP server on :8080 with two POST endpoints:
//
//   - /authentication: allows a client only when clientid == username, the
//     device with that serial number can be read, and the password matches.
//   - /Acl: allows the users "admin" and "test" unconditionally; any other
//     client is allowed only when clientid == username, the device can be
//     read, and one "/"-separated segment of the topic equals the device SN.
//
// Every response is HTTP 200 JSON with "result" set to "allow" or "deny"
// (presumably the broker's HTTP auth hook format — confirm against the broker
// configuration). The call blocks until the server stops.
func MQTT_Gin_Run() {
	// Create a Gin router.
	r := gin.Default()

	// denyAuth writes the authentication "deny" response.
	denyAuth := func(c *gin.Context) {
		c.JSON(200, gin.H{
			"result":       "deny", // authentication result: "allow" | "deny" | "ignore"
			"is_superuser": false,  // superuser flag: true | false, defaults to false when empty
		})
	}

	r.POST("/authentication", func(c *gin.Context) {
		var req struct {
			Clientid string `json:"clientid"`
			Username string `json:"username"`
			Password string `json:"password"`
		}
		if err := c.Bind(&req); err != nil {
			denyAuth(c)
			return
		}

		// Client IDs must be unique, otherwise clients kick each other
		// offline; require clientid == username.
		if req.Clientid != req.Username {
			denyAuth(c)
			return
		}

		dev := Device.Device{
			T_sn: req.Username,
		}
		// The device must exist.
		if !dev.Read_Tidy() {
			denyAuth(c)
			return
		}
		// The password must match.
		if dev.T_password != req.Password {
			denyAuth(c)
			return
		}

		c.JSON(200, gin.H{
			"result":       "allow", // "allow" | "deny" | "ignore"
			"is_superuser": false,   // true | false, defaults to false when empty
		})
	})

	r.POST("/Acl", func(c *gin.Context) {
		logs.Println("Acl!!")
		var req struct {
			Clientid   string `json:"clientid"`   // ${clientid} - the client's ID.
			Username   string `json:"username"`   // ${username} - the user name the client logged in with.
			Peerhost   string `json:"peerhost"`   // ${peerhost} - the client's source IP address.
			Proto_name string `json:"proto_name"` // ${proto_name} - protocol name used by the client, e.g. MQTT, CoAP.
			Mountpoint string `json:"mountpoint"` // ${mountpoint} - mount point (topic prefix) of the gateway listener.
			Action     string `json:"action"`     // ${action} - requested action, e.g. publish, subscribe.
			Topic      string `json:"topic"`      // ${topic} - topic (or topic filter) being published/subscribed.
		}
		if err := c.Bind(&req); err != nil {
			c.JSON(200, gin.H{
				"result": "deny", // "allow" | "deny" | "ignore"
			})
			// Stop here: without this return the handler would keep going
			// with a zero-valued request and write a second response.
			return
		}
		fmt.Println("json:", req)

		// Built-in accounts are always allowed.
		if req.Username == "admin" {
			c.JSON(200, gin.H{"result": "allow"})
			return
		}
		if req.Username == "test" {
			c.JSON(200, gin.H{"result": "allow"})
			return
		}

		// Client IDs must be unique, otherwise clients kick each other
		// offline; require clientid == username.
		if req.Clientid != req.Username {
			c.JSON(200, gin.H{"result": "deny"})
			return
		}

		dev := Device.Device{
			T_sn: req.Username,
		}
		// The device must exist.
		if !dev.Read_Tidy() {
			c.JSON(200, gin.H{"result": "deny"})
			return
		}

		// Topic layout: /sub/SN  /pub/SN. Allow when any segment is this
		// device's SN.
		topic_list := strings.Split(req.Topic, "/")
		for _, v := range topic_list {
			if len(v) < 7 {
				continue // too short to be a valid SN
			}
			if v == dev.T_sn {
				c.JSON(200, gin.H{"result": "allow"})
				return
			}
		}

		logs.Println("MQTT Acl E!topic_list:", topic_list)
		c.JSON(200, gin.H{"result": "deny"})

		//
		//Clientid_list := strings.Split(username+"_s", "_")
		//username = Clientid_list[0]
		//if topic_list[2] != username {
		//	fmt.Println("topic_list[2] != username",topic_list[2])
		//	c.JSON(200, gin.H{"result": "deny"})
		//	return
		//}
		//c.JSON(200, gin.H{"result": "allow"})
		//return
	})

	// Run blocks until the server stops; report why instead of dropping the
	// error (e.g. the port is already in use).
	if err := r.Run(":8080"); err != nil {
		logs.Println("MqttServer", "gin Run(:8080) failed:", err)
	}
}

// messagePubHandler processes one received MQTT message: it finds the device
// whose serial number appears as a segment of topicName, passes the message to
// Handle.PullHandle, and sends the JSON-encoded result back through
// Handle.PushHandle.
//
// The message is dropped when the topic has no "/" separator, when the
// selected device's T_state is not 1, or when the result cannot be encoded.
func messagePubHandler(topicName string, message []byte) {
	// Local testing output.
	logs.Println("", "============= Mqtt JSON =============")
	logs.Println("<-"+topicName, string(message))

	// Filter: a topic without any "/" is rejected.
	topicNameS := strings.Split(topicName, "/")
	if len(topicNameS) == 1 {
		logs.Println("MqttServer", "订阅地址错误 len(topicNameS) == 1", strconv.Itoa(len(topicNameS)))
		return
	}

	// Verify the device: take the first segment that loads as a device whose
	// state is not 3.
	dev := Device.Device{}
	for _, sn := range topicNameS {
		if len(sn) < 7 {
			continue // too short to be a valid SN
		}
		dev.T_sn = sn
		if dev.Read_Tidy() {
			if dev.T_state == 3 {
				continue // treated as not a valid SN
			}
			break
		}
	}
	// Only state 1 is accepted (the log text says other states mean
	// disabled, deleted or invalid).
	if dev.T_state != 1 {
		logs.Println("MqttServer", dev.T_sn+" 设备被 禁用、删除、无效")
		return
	}

	Rt_r := Handle.PullHandle(&dev, topicName, message)

	// Reply.
	data, err := json.Marshal(Rt_r)
	if err != nil {
		logs.Println("MqttServer", "json.Marshal failed:", err)
		return
	}
	Handle.PushHandle(&dev, topicName, string(data))
}

//
//// Send data
//func Mqtt_publish(topic string, b []byte) {
//	lib.TopicMap[topic] = true
//	// Publish a message.
//	err := lib.MqttClient.Publish(&client.PublishOptions{
//		QoS: mqtt.QoS0,
//		TopicName: []byte(topic),
//		Message: b,
//	})
//
//	logs.PrintlnMqtt("-> " + topic + ":" + string(b))
//	if err != nil {
//		logs.PrintlnError("MqttServer", "发送消息失败 [Mqtt_publish]", "-> "+topic+" "+string(b))
//	}
//
//}