Server.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415
  1. package MqttServer
  2. import (
  3. "Yunlot/Handle"
  4. "Yunlot/conf"
  5. "Yunlot/lib"
  6. "Yunlot/logs"
  7. "Yunlot/models/Device"
  8. "Yunlot/models/Product"
  9. "encoding/json"
  10. "fmt"
  11. beego "github.com/beego/beego/v2/server/web"
  12. "github.com/gin-gonic/gin"
  13. "github.com/yosssi/gmq/mqtt"
  14. "github.com/yosssi/gmq/mqtt/client"
  15. "plugin"
  16. "strconv"
  17. "strings"
  18. "time"
  19. )
  20. var cli *client.Client
  21. func Run_MqttServer() {
  22. time.Sleep(3 * time.Second)
  23. logs.Println("============Run_MqttServer=============", "")
  24. HTTPPort, _ := beego.AppConfig.String("HTTPPort")
  25. // Create an MQTT Client.
  26. cli = client.New(&client.Options{
  27. // Define the processing of the error handler.
  28. ErrorHandler: func(err error) {
  29. logs.PrintlnError("err!!!!!! Run_MqttServer:", err.Error())
  30. time.Sleep(3 * time.Second)
  31. go Run_MqttServer() // MQTT 通讯
  32. return
  33. },
  34. })
  35. // Terminate the Client.
  36. defer cli.Terminate()
  37. c := client.ConnectOptions{
  38. Network: "tcp",
  39. Address: conf.MqttServer_Url,
  40. ClientID: []byte(conf.MqttServer_ClientID + HTTPPort),
  41. UserName: []byte(conf.MqttServer_Username),
  42. Password: []byte(conf.MqttServer_Password),
  43. }
  44. logs.Println("Address:", c.Address)
  45. logs.Println("ClientID:", string(c.ClientID))
  46. // Connect to the MQTT Server.
  47. err := cli.Connect(&c)
  48. if err != nil {
  49. logs.Println("MqttServer", "连接MQTT失败 [cli.Connect]", "")
  50. logs.Println("err!!!!!! Run_MqttServer:", "连接MQTT失败:", err)
  51. fmt.Println("err!!!!!! 连接MQTT失败:", err)
  52. cli.Terminate()
  53. time.Sleep(3 * time.Second)
  54. go Run_MqttServer() // MQTT 通讯
  55. return
  56. }
  57. logs.Println("MqttServer", "连接成功!")
  58. // Subscribe to topics.
  59. err = cli.Subscribe(&client.SubscribeOptions{
  60. SubReqs: []*client.SubReq{
  61. &client.SubReq{
  62. TopicFilter: []byte("#"),
  63. QoS: mqtt.QoS0,
  64. // Define the processing of the message handler.
  65. Handler: func(topicName, message []byte) {
  66. _, exists := lib.TopicMap[string(topicName)]
  67. if exists {
  68. //logs.Println("跳过:",string(topicName))
  69. return // 这个 订阅号 平台->设备,跳过
  70. }
  71. start := time.Now()
  72. messagePubHandler(string(topicName), message)
  73. elapsed := time.Since(start)
  74. fmt.Printf("代码运行时间:%s\n", elapsed)
  75. },
  76. },
  77. &client.SubReq{ // 设备断开
  78. TopicFilter: []byte("$SYS/brokers/+/clients/+/disconnected"),
  79. QoS: mqtt.QoS0,
  80. // Define the processing of the message handler.
  81. Handler: func(topicName, message []byte) {
  82. logs.PrintlnMqtt("<-" + string(topicName) + " " + string(message))
  83. MessageDisconnected(string(topicName), message)
  84. },
  85. },
  86. &client.SubReq{ // 设备上线
  87. TopicFilter: []byte("$SYS/brokers/+/clients/+/connected"),
  88. QoS: mqtt.QoS0,
  89. // Define the processing of the message handler.
  90. Handler: func(topicName, message []byte) {
  91. logs.PrintlnMqtt("<-" + string(topicName) + " " + string(message))
  92. MessageConnected(string(topicName), message)
  93. },
  94. },
  95. },
  96. })
  97. if err != nil {
  98. logs.Println("MqttServer", "订阅消息 [cli.Subscribe]", "")
  99. logs.Println("err!!!!!! Run_MqttServer:", "连接MQTT失败:", err)
  100. }
  101. fmt.Println("MQTT ok!")
  102. go MQTT_Gin_Run() // 初始化 MQTT ACL
  103. }
  104. func MQTT_Gin_Run() {
  105. // 创建一个Gin路由器
  106. r := gin.Default()
  107. // 定义路由
  108. r.POST("/authentication", func(c *gin.Context) {
  109. var json struct {
  110. Clientid string `json:"clientid"`
  111. Username string `json:"username"`
  112. Password string `json:"password"`
  113. }
  114. if err := c.Bind(&json); err != nil {
  115. c.JSON(200, gin.H{
  116. "result": "deny", // 认证结果 可选 "allow" | "deny" | "ignore"
  117. "is_superuser": false, // 超级用户 可选 true | false,该项为空时默认为 false
  118. })
  119. return
  120. }
  121. // 保证 Clientid唯一,否则会 下线
  122. if json.Clientid != json.Username {
  123. c.JSON(200, gin.H{
  124. "result": "deny", // 认证结果 可选 "allow" | "deny" | "ignore"
  125. "is_superuser": false, // 超级用户 可选 true | false,该项为空时默认为 false
  126. })
  127. return
  128. }
  129. Device_r := Device.Device{
  130. T_sn: json.Username,
  131. }
  132. // 判断是否存在
  133. if !Device_r.Read_Tidy() {
  134. c.JSON(200, gin.H{
  135. "result": "deny", // 认证结果 可选 "allow" | "deny" | "ignore"
  136. "is_superuser": false, // 超级用户 可选 true | false,该项为空时默认为 false
  137. })
  138. return
  139. }
  140. // 判断密码是否正确
  141. if Device_r.T_password != json.Password {
  142. c.JSON(200, gin.H{
  143. "result": "deny", // 认证结果 可选 "allow" | "deny" | "ignore"
  144. "is_superuser": false, // 超级用户 可选 true | false,该项为空时默认为 false
  145. })
  146. return
  147. }
  148. c.JSON(200, gin.H{
  149. "result": "allow", // 可选 "allow" | "deny" | "ignore"
  150. "is_superuser": false, // 可选 true | false,该项为空时默认为 false
  151. })
  152. })
  153. r.POST("/Acl", func(c *gin.Context) {
  154. logs.Println("Acl!!")
  155. var json struct {
  156. Clientid string `json:"clientid"` //${clientid} — 客户端的 ID。
  157. Username string `json:"username"` //${username} — 客户端登录是用的用户名。
  158. Peerhost string `json:"peerhost"` //${peerhost} — 客户端的源 IP 地址。
  159. Proto_name string `json:"proto_name"` //${proto_name} — 客户端使用的协议名称。例如 MQTT,CoAP 等。
  160. Mountpoint string `json:"mountpoint"` //${mountpoint} — 网关监听器的挂载点(主题前缀)。
  161. Action string `json:"action"` //${action} — 当前执行的动作请求,例如 publish,subscribe。
  162. Topic string `json:"topic"` //${topic} — 当前请求想要发布或订阅的主题(或主题过滤器)
  163. }
  164. if err := c.Bind(&json); err != nil {
  165. c.JSON(200, gin.H{
  166. "result": "deny", // 可选 "allow" | "deny" | "ignore"
  167. })
  168. }
  169. fmt.Println("json:", json)
  170. if json.Username == "admin" {
  171. c.JSON(200, gin.H{"result": "allow"})
  172. return
  173. }
  174. if json.Username == "test" {
  175. c.JSON(200, gin.H{"result": "allow"})
  176. return
  177. }
  178. // 保证 Clientid唯一,否则会 下线
  179. if json.Clientid != json.Username {
  180. c.JSON(200, gin.H{"result": "deny"})
  181. return
  182. }
  183. Device_r := Device.Device{
  184. T_sn: json.Username,
  185. }
  186. // 判断是否存在
  187. if !Device_r.Read_Tidy() {
  188. c.JSON(200, gin.H{"result": "deny"})
  189. return
  190. }
  191. //topic /sub/SN /pub/SN
  192. topic_list := strings.Split(json.Topic, "/")
  193. for _, v := range topic_list {
  194. if len(v) < 7 {
  195. continue // 不是有效SN
  196. }
  197. if v == Device_r.T_sn {
  198. c.JSON(200, gin.H{"result": "allow"})
  199. return
  200. }
  201. }
  202. logs.Println("MQTT Acl E!topic_list:", topic_list)
  203. c.JSON(200, gin.H{"result": "deny"})
  204. return
  205. //
  206. //Clientid_list := strings.Split(username+"_s", "_")
  207. //username = Clientid_list[0]
  208. //if topic_list[2] != username {
  209. // fmt.Println("topic_list[2] != username",topic_list[2])
  210. // c.JSON(200, gin.H{"result": "deny"})
  211. // return
  212. //}
  213. //c.JSON(200, gin.H{"result": "allow"})
  214. //return
  215. })
  216. r.Run(":8080")
  217. }
  218. // 开始处理
  219. func messagePubHandler(topicName string, message []byte) {
  220. // 本地测试
  221. logs.Println("", "============= Mqtt JSON =============")
  222. logs.Println("<-"+topicName, string(message))
  223. // 过滤
  224. topicNameS := strings.Split(topicName, "/")
  225. if len(topicNameS) == 1 {
  226. logs.Println("MqttServer", "订阅地址错误 len(topicNameS) == 1", strconv.Itoa(len(topicNameS)))
  227. return
  228. }
  229. // 验证设备
  230. Device_r := Device.Device{}
  231. for _, sn := range topicNameS {
  232. if len(sn) < 7 {
  233. continue // 不是有效SN
  234. }
  235. Device_r.T_sn = sn
  236. if Device_r.Read_Tidy() {
  237. if Device_r.T_state == 3 {
  238. continue // 不是有效SN
  239. }
  240. break
  241. }
  242. }
  243. if Device_r.T_state != 1 {
  244. logs.Println("MqttServer", Device_r.T_sn+" 设备未激活")
  245. return
  246. }
  247. // 设备类型
  248. ProductType_r := Product.ProductType{T_ProductID: Device_r.T_ProductID}
  249. if !ProductType_r.Read() {
  250. logs.Println("MqttServer", Device_r.T_sn+"|"+Device_r.T_ProductID+" 设备类型找不到!")
  251. return
  252. }
  253. // 设备协议
  254. ProductProt_r := Product.ProductProt{Id: ProductType_r.T_prot}
  255. if !ProductProt_r.Read() {
  256. logs.Println("MqttServer", Device_r.T_sn+"|"+Device_r.T_ProductID+"-"+fmt.Sprintf("%d", ProductType_r.T_prot)+" 设备协议找不到!")
  257. return
  258. }
  259. // Plugin 指针添加到 MAP,不知道能不能提高速度
  260. //_, exists := lib.TopicMap[string(topicName)]
  261. //if exists {
  262. // //logs.Println("跳过:",string(topicName))
  263. // return // 这个 订阅号 平台->设备,跳过
  264. //}
  265. //p, p_is := lib.PluginMap[ProductProt_r.T_analysis]
  266. //if !p_is {
  267. // var err error
  268. // // 根据库的存放路径加载库
  269. // p_p, err := plugin.Open(conf.Analysis_Dir + ProductProt_r.T_analysis + ".so")
  270. // if err != nil {
  271. // logs.PrintlnError("打开 SO 失败:", err)
  272. // return
  273. // }
  274. // p = p_p
  275. // lib.PluginMap[ProductProt_r.T_analysis] = p
  276. // logs.Println("NEW Plugin 地址:",&p_p)
  277. //}
  278. // 加载 SO 文件
  279. p, err := plugin.Open(conf.Analysis_Dir + ProductProt_r.T_analysis + ".so")
  280. if err != nil {
  281. logs.PrintlnError("打开 SO 失败:", err)
  282. return
  283. }
  284. logs.Println("Plugin 地址:", &p)
  285. var Rt_r = lib.Rt{Status: 200, Msg: "ok"}
  286. // 查找库导出信息
  287. s, err := p.Lookup("T")
  288. if err != nil {
  289. logs.Println("", err)
  290. panic(any(err))
  291. }
  292. // 类型转换
  293. f := s.(func(t string, b []byte) string)(topicName, message)
  294. // 开始处理
  295. logs.Println("协议后:", f)
  296. //logs.Println("首字符:", string(f[0]))
  297. if f[0] == '{' {
  298. //结构体
  299. var json_r map[string]interface{}
  300. err = json.Unmarshal([]byte(f), &json_r)
  301. if err != nil {
  302. Rt_r.Status = 203
  303. Rt_r.Msg = "json E!"
  304. goto Mreturn
  305. }
  306. Handle.AnalysisMap(&Device_r, ProductType_r, json_r, "")
  307. // 合并json
  308. Device_r.Read_Tidy() // 提前最新的
  309. logs.Println("Device_r.T_data:", Device_r.T_data)
  310. if len(Device_r.T_data) > 5 {
  311. json.Unmarshal([]byte(Device_r.T_data), &json_r)
  312. }
  313. Device_r.T_dataJson = json_r
  314. Device_r.UpdateTime.NowDbTime()
  315. Device_r.Update("T_data", "UpdateTime")
  316. } else {
  317. //列表
  318. var json_lr []map[string]interface{}
  319. err = json.Unmarshal([]byte(f), &json_lr)
  320. if err != nil {
  321. Rt_r.Status = 204
  322. Rt_r.Msg = "[]json E!"
  323. goto Mreturn
  324. }
  325. for _, value := range json_lr {
  326. Handle.AnalysisMap(&Device_r, ProductType_r, value, "")
  327. }
  328. // 合并json
  329. Device_r.Read_Tidy() // 提前最新的
  330. if len(Device_r.T_data) > 5 {
  331. json.Unmarshal([]byte(Device_r.T_data), &json_lr[len(json_lr)-1])
  332. }
  333. Device_r.T_dataJson = json_lr[len(json_lr)-1]
  334. Device_r.UpdateTime.NowDbTime()
  335. Device_r.Update("T_data", "UpdateTime")
  336. }
  337. // 返回
  338. Mreturn:
  339. data, _ := json.Marshal(Rt_r)
  340. fmt.Println(string(data))
  341. // 查找库导出信息
  342. s, err = p.Lookup("R")
  343. if err != nil {
  344. logs.Println("", err)
  345. panic(any(err))
  346. }
  347. // 转换
  348. t, b := s.(func(t string, b string) (string, []byte))(topicName, string(data))
  349. // 订阅号 与 内容 必须有数据
  350. if len(t) > 0 && len(b) > 0 {
  351. // 返回数据
  352. Mqtt_publish(t, b)
  353. }
  354. }
  355. // 发送数据
  356. func Mqtt_publish(topic string, b []byte) {
  357. lib.TopicMap[topic] = true
  358. // Publish a message.
  359. err := cli.Publish(&client.PublishOptions{
  360. QoS: mqtt.QoS0,
  361. TopicName: []byte(topic),
  362. Message: b,
  363. })
  364. logs.PrintlnMqtt("-> " + topic + ":" + string(b))
  365. if err != nil {
  366. logs.PrintlnError("MqttServer", "发送消息失败 [Mqtt_publish]", "-> "+topic+" "+string(b))
  367. }
  368. }