123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415 |
- package MqttServer
- import (
- "Yunlot/Handle"
- "Yunlot/conf"
- "Yunlot/lib"
- "Yunlot/logs"
- "Yunlot/models/Device"
- "Yunlot/models/Product"
- "encoding/json"
- "fmt"
- beego "github.com/beego/beego/v2/server/web"
- "github.com/gin-gonic/gin"
- "github.com/yosssi/gmq/mqtt"
- "github.com/yosssi/gmq/mqtt/client"
- "plugin"
- "strconv"
- "strings"
- "time"
- )
- var cli *client.Client
- func Run_MqttServer() {
- time.Sleep(3 * time.Second)
- logs.Println("============Run_MqttServer=============", "")
- HTTPPort, _ := beego.AppConfig.String("HTTPPort")
- // Create an MQTT Client.
- cli = client.New(&client.Options{
- // Define the processing of the error handler.
- ErrorHandler: func(err error) {
- logs.PrintlnError("err!!!!!! Run_MqttServer:", err.Error())
- time.Sleep(3 * time.Second)
- go Run_MqttServer() // MQTT 通讯
- return
- },
- })
- // Terminate the Client.
- defer cli.Terminate()
- c := client.ConnectOptions{
- Network: "tcp",
- Address: conf.MqttServer_Url,
- ClientID: []byte(conf.MqttServer_ClientID + HTTPPort),
- UserName: []byte(conf.MqttServer_Username),
- Password: []byte(conf.MqttServer_Password),
- }
- logs.Println("Address:", c.Address)
- logs.Println("ClientID:", string(c.ClientID))
- // Connect to the MQTT Server.
- err := cli.Connect(&c)
- if err != nil {
- logs.Println("MqttServer", "连接MQTT失败 [cli.Connect]", "")
- logs.Println("err!!!!!! Run_MqttServer:", "连接MQTT失败:", err)
- fmt.Println("err!!!!!! 连接MQTT失败:", err)
- cli.Terminate()
- time.Sleep(3 * time.Second)
- go Run_MqttServer() // MQTT 通讯
- return
- }
- logs.Println("MqttServer", "连接成功!")
- // Subscribe to topics.
- err = cli.Subscribe(&client.SubscribeOptions{
- SubReqs: []*client.SubReq{
- &client.SubReq{
- TopicFilter: []byte("#"),
- QoS: mqtt.QoS0,
- // Define the processing of the message handler.
- Handler: func(topicName, message []byte) {
- _, exists := lib.TopicMap[string(topicName)]
- if exists {
- //logs.Println("跳过:",string(topicName))
- return // 这个 订阅号 平台->设备,跳过
- }
- start := time.Now()
- messagePubHandler(string(topicName), message)
- elapsed := time.Since(start)
- fmt.Printf("代码运行时间:%s\n", elapsed)
- },
- },
- &client.SubReq{ // 设备断开
- TopicFilter: []byte("$SYS/brokers/+/clients/+/disconnected"),
- QoS: mqtt.QoS0,
- // Define the processing of the message handler.
- Handler: func(topicName, message []byte) {
- logs.PrintlnMqtt("<-" + string(topicName) + " " + string(message))
- MessageDisconnected(string(topicName), message)
- },
- },
- &client.SubReq{ // 设备上线
- TopicFilter: []byte("$SYS/brokers/+/clients/+/connected"),
- QoS: mqtt.QoS0,
- // Define the processing of the message handler.
- Handler: func(topicName, message []byte) {
- logs.PrintlnMqtt("<-" + string(topicName) + " " + string(message))
- MessageConnected(string(topicName), message)
- },
- },
- },
- })
- if err != nil {
- logs.Println("MqttServer", "订阅消息 [cli.Subscribe]", "")
- logs.Println("err!!!!!! Run_MqttServer:", "连接MQTT失败:", err)
- }
- fmt.Println("MQTT ok!")
- go MQTT_Gin_Run() // 初始化 MQTT ACL
- }
- func MQTT_Gin_Run() {
- // 创建一个Gin路由器
- r := gin.Default()
- // 定义路由
- r.POST("/authentication", func(c *gin.Context) {
- var json struct {
- Clientid string `json:"clientid"`
- Username string `json:"username"`
- Password string `json:"password"`
- }
- if err := c.Bind(&json); err != nil {
- c.JSON(200, gin.H{
- "result": "deny", // 认证结果 可选 "allow" | "deny" | "ignore"
- "is_superuser": false, // 超级用户 可选 true | false,该项为空时默认为 false
- })
- return
- }
- // 保证 Clientid唯一,否则会 下线
- if json.Clientid != json.Username {
- c.JSON(200, gin.H{
- "result": "deny", // 认证结果 可选 "allow" | "deny" | "ignore"
- "is_superuser": false, // 超级用户 可选 true | false,该项为空时默认为 false
- })
- return
- }
- Device_r := Device.Device{
- T_sn: json.Username,
- }
- // 判断是否存在
- if !Device_r.Read_Tidy() {
- c.JSON(200, gin.H{
- "result": "deny", // 认证结果 可选 "allow" | "deny" | "ignore"
- "is_superuser": false, // 超级用户 可选 true | false,该项为空时默认为 false
- })
- return
- }
- // 判断密码是否正确
- if Device_r.T_password != json.Password {
- c.JSON(200, gin.H{
- "result": "deny", // 认证结果 可选 "allow" | "deny" | "ignore"
- "is_superuser": false, // 超级用户 可选 true | false,该项为空时默认为 false
- })
- return
- }
- c.JSON(200, gin.H{
- "result": "allow", // 可选 "allow" | "deny" | "ignore"
- "is_superuser": false, // 可选 true | false,该项为空时默认为 false
- })
- })
- r.POST("/Acl", func(c *gin.Context) {
- logs.Println("Acl!!")
- var json struct {
- Clientid string `json:"clientid"` //${clientid} — 客户端的 ID。
- Username string `json:"username"` //${username} — 客户端登录是用的用户名。
- Peerhost string `json:"peerhost"` //${peerhost} — 客户端的源 IP 地址。
- Proto_name string `json:"proto_name"` //${proto_name} — 客户端使用的协议名称。例如 MQTT,CoAP 等。
- Mountpoint string `json:"mountpoint"` //${mountpoint} — 网关监听器的挂载点(主题前缀)。
- Action string `json:"action"` //${action} — 当前执行的动作请求,例如 publish,subscribe。
- Topic string `json:"topic"` //${topic} — 当前请求想要发布或订阅的主题(或主题过滤器)
- }
- if err := c.Bind(&json); err != nil {
- c.JSON(200, gin.H{
- "result": "deny", // 可选 "allow" | "deny" | "ignore"
- })
- }
- fmt.Println("json:", json)
- if json.Username == "admin" {
- c.JSON(200, gin.H{"result": "allow"})
- return
- }
- if json.Username == "test" {
- c.JSON(200, gin.H{"result": "allow"})
- return
- }
- // 保证 Clientid唯一,否则会 下线
- if json.Clientid != json.Username {
- c.JSON(200, gin.H{"result": "deny"})
- return
- }
- Device_r := Device.Device{
- T_sn: json.Username,
- }
- // 判断是否存在
- if !Device_r.Read_Tidy() {
- c.JSON(200, gin.H{"result": "deny"})
- return
- }
- //topic /sub/SN /pub/SN
- topic_list := strings.Split(json.Topic, "/")
- for _, v := range topic_list {
- if len(v) < 7 {
- continue // 不是有效SN
- }
- if v == Device_r.T_sn {
- c.JSON(200, gin.H{"result": "allow"})
- return
- }
- }
- logs.Println("MQTT Acl E!topic_list:", topic_list)
- c.JSON(200, gin.H{"result": "deny"})
- return
- //
- //Clientid_list := strings.Split(username+"_s", "_")
- //username = Clientid_list[0]
- //if topic_list[2] != username {
- // fmt.Println("topic_list[2] != username",topic_list[2])
- // c.JSON(200, gin.H{"result": "deny"})
- // return
- //}
- //c.JSON(200, gin.H{"result": "allow"})
- //return
- })
- r.Run(":8080")
- }
- // 开始处理
- func messagePubHandler(topicName string, message []byte) {
- // 本地测试
- logs.Println("", "============= Mqtt JSON =============")
- logs.Println("<-"+topicName, string(message))
- // 过滤
- topicNameS := strings.Split(topicName, "/")
- if len(topicNameS) == 1 {
- logs.Println("MqttServer", "订阅地址错误 len(topicNameS) == 1", strconv.Itoa(len(topicNameS)))
- return
- }
- // 验证设备
- Device_r := Device.Device{}
- for _, sn := range topicNameS {
- if len(sn) < 7 {
- continue // 不是有效SN
- }
- Device_r.T_sn = sn
- if Device_r.Read_Tidy() {
- if Device_r.T_state == 3 {
- continue // 不是有效SN
- }
- break
- }
- }
- if Device_r.T_state != 1 {
- logs.Println("MqttServer", Device_r.T_sn+" 设备未激活")
- return
- }
- // 设备类型
- ProductType_r := Product.ProductType{T_ProductID: Device_r.T_ProductID}
- if !ProductType_r.Read() {
- logs.Println("MqttServer", Device_r.T_sn+"|"+Device_r.T_ProductID+" 设备类型找不到!")
- return
- }
- // 设备协议
- ProductProt_r := Product.ProductProt{Id: ProductType_r.T_prot}
- if !ProductProt_r.Read() {
- logs.Println("MqttServer", Device_r.T_sn+"|"+Device_r.T_ProductID+"-"+fmt.Sprintf("%d", ProductType_r.T_prot)+" 设备协议找不到!")
- return
- }
- // Plugin 指针添加到 MAP,不知道能不能提高速度
- //_, exists := lib.TopicMap[string(topicName)]
- //if exists {
- // //logs.Println("跳过:",string(topicName))
- // return // 这个 订阅号 平台->设备,跳过
- //}
- //p, p_is := lib.PluginMap[ProductProt_r.T_analysis]
- //if !p_is {
- // var err error
- // // 根据库的存放路径加载库
- // p_p, err := plugin.Open(conf.Analysis_Dir + ProductProt_r.T_analysis + ".so")
- // if err != nil {
- // logs.PrintlnError("打开 SO 失败:", err)
- // return
- // }
- // p = p_p
- // lib.PluginMap[ProductProt_r.T_analysis] = p
- // logs.Println("NEW Plugin 地址:",&p_p)
- //}
- // 加载 SO 文件
- p, err := plugin.Open(conf.Analysis_Dir + ProductProt_r.T_analysis + ".so")
- if err != nil {
- logs.PrintlnError("打开 SO 失败:", err)
- return
- }
- logs.Println("Plugin 地址:", &p)
- var Rt_r = lib.Rt{Status: 200, Msg: "ok"}
- // 查找库导出信息
- s, err := p.Lookup("T")
- if err != nil {
- logs.Println("", err)
- panic(any(err))
- }
- // 类型转换
- f := s.(func(t string, b []byte) string)(topicName, message)
- // 开始处理
- logs.Println("协议后:", f)
- //logs.Println("首字符:", string(f[0]))
- if f[0] == '{' {
- //结构体
- var json_r map[string]interface{}
- err = json.Unmarshal([]byte(f), &json_r)
- if err != nil {
- Rt_r.Status = 203
- Rt_r.Msg = "json E!"
- goto Mreturn
- }
- Handle.AnalysisMap(&Device_r, ProductType_r, json_r, "")
- // 合并json
- Device_r.Read_Tidy() // 提前最新的
- logs.Println("Device_r.T_data:", Device_r.T_data)
- if len(Device_r.T_data) > 5 {
- json.Unmarshal([]byte(Device_r.T_data), &json_r)
- }
- Device_r.T_dataJson = json_r
- Device_r.UpdateTime.NowDbTime()
- Device_r.Update("T_data", "UpdateTime")
- } else {
- //列表
- var json_lr []map[string]interface{}
- err = json.Unmarshal([]byte(f), &json_lr)
- if err != nil {
- Rt_r.Status = 204
- Rt_r.Msg = "[]json E!"
- goto Mreturn
- }
- for _, value := range json_lr {
- Handle.AnalysisMap(&Device_r, ProductType_r, value, "")
- }
- // 合并json
- Device_r.Read_Tidy() // 提前最新的
- if len(Device_r.T_data) > 5 {
- json.Unmarshal([]byte(Device_r.T_data), &json_lr[len(json_lr)-1])
- }
- Device_r.T_dataJson = json_lr[len(json_lr)-1]
- Device_r.UpdateTime.NowDbTime()
- Device_r.Update("T_data", "UpdateTime")
- }
- // 返回
- Mreturn:
- data, _ := json.Marshal(Rt_r)
- fmt.Println(string(data))
- // 查找库导出信息
- s, err = p.Lookup("R")
- if err != nil {
- logs.Println("", err)
- panic(any(err))
- }
- // 转换
- t, b := s.(func(t string, b string) (string, []byte))(topicName, string(data))
- // 订阅号 与 内容 必须有数据
- if len(t) > 0 && len(b) > 0 {
- // 返回数据
- Mqtt_publish(t, b)
- }
- }
- // 发送数据
- func Mqtt_publish(topic string, b []byte) {
- lib.TopicMap[topic] = true
- // Publish a message.
- err := cli.Publish(&client.PublishOptions{
- QoS: mqtt.QoS0,
- TopicName: []byte(topic),
- Message: b,
- })
- logs.PrintlnMqtt("-> " + topic + ":" + string(b))
- if err != nil {
- logs.PrintlnError("MqttServer", "发送消息失败 [Mqtt_publish]", "-> "+topic+" "+string(b))
- }
- }
|