Server.go 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378
  1. package MqttServer
  import (
  	"encoding/json"
  	"fmt"
  	"plugin"
  	"strconv"
  	"strings"
  	"sync"
  	"time"

  	"Yunlot/conf"
  	"Yunlot/lib"
  	"Yunlot/logs"
  	"Yunlot/models/Device"
  	"Yunlot/models/Product"

  	beego "github.com/beego/beego/v2/server/web"
  	"github.com/gin-gonic/gin"
  	"github.com/yosssi/gmq/mqtt"
  	"github.com/yosssi/gmq/mqtt/client"
  )
  19. var cli *client.Client
  20. func Run_MqttServer() {
  21. time.Sleep(3 * time.Second)
  22. logs.Println("============Run_MqttServer=============", "")
  23. HTTPPort, _ := beego.AppConfig.String("HTTPPort")
  24. // Create an MQTT Client.
  25. cli = client.New(&client.Options{
  26. // Define the processing of the error handler.
  27. ErrorHandler: func(err error) {
  28. logs.PrintlnError("err!!!!!! Run_MqttServer:", err.Error())
  29. time.Sleep(3 * time.Second)
  30. go Run_MqttServer() // MQTT 通讯
  31. return
  32. },
  33. })
  34. // Terminate the Client.
  35. defer cli.Terminate()
  36. c := client.ConnectOptions{
  37. Network: "tcp",
  38. Address: conf.MqttServer_Url,
  39. ClientID: []byte(conf.MqttServer_ClientID + HTTPPort),
  40. UserName: []byte(conf.MqttServer_Username),
  41. Password: []byte(conf.MqttServer_Password),
  42. }
  43. logs.Println("Address:", c.Address)
  44. logs.Println("ClientID:", string(c.ClientID))
  45. // Connect to the MQTT Server.
  46. err := cli.Connect(&c)
  47. if err != nil {
  48. logs.Println("MqttServer", "连接MQTT失败 [cli.Connect]", "")
  49. logs.Println("err!!!!!! Run_MqttServer:", "连接MQTT失败:", err)
  50. fmt.Println("err!!!!!! 连接MQTT失败:", err)
  51. cli.Terminate()
  52. time.Sleep(3 * time.Second)
  53. go Run_MqttServer() // MQTT 通讯
  54. return
  55. }
  56. // Subscribe to topics.
  57. err = cli.Subscribe(&client.SubscribeOptions{
  58. SubReqs: []*client.SubReq{
  59. &client.SubReq{
  60. TopicFilter: []byte("#"),
  61. QoS: mqtt.QoS0,
  62. // Define the processing of the message handler.
  63. Handler: func(topicName, message []byte) {
  64. logs.PrintlnMqtt("<-" + string(topicName) + " " + string(message))
  65. messagePubHandler(string(topicName), message)
  66. },
  67. },
  68. &client.SubReq{ // 设备断开
  69. TopicFilter: []byte("$SYS/brokers/+/clients/+/disconnected"),
  70. QoS: mqtt.QoS0,
  71. // Define the processing of the message handler.
  72. Handler: func(topicName, message []byte) {
  73. logs.PrintlnMqtt("<-" + string(topicName) + " " + string(message))
  74. MessageDisconnected(string(topicName), message)
  75. },
  76. },
  77. &client.SubReq{ // 设备上线
  78. TopicFilter: []byte("$SYS/brokers/+/clients/+/connected"),
  79. QoS: mqtt.QoS0,
  80. // Define the processing of the message handler.
  81. Handler: func(topicName, message []byte) {
  82. logs.PrintlnMqtt("<-" + string(topicName) + " " + string(message))
  83. MessageConnected(string(topicName), message)
  84. },
  85. },
  86. },
  87. })
  88. if err != nil {
  89. logs.Println("MqttServer", "订阅消息 [cli.Subscribe]", "")
  90. logs.Println("err!!!!!! Run_MqttServer:", "连接MQTT失败:", err)
  91. }
  92. fmt.Println("MQTT ok!")
  93. go MQTT_Gin_Run() // 初始化 MQTT ACL
  94. }
  95. func MQTT_Gin_Run() {
  96. // 创建一个Gin路由器
  97. r := gin.Default()
  98. // 定义路由
  99. r.POST("/authentication", func(c *gin.Context) {
  100. var json struct {
  101. Clientid string `json:"clientid"`
  102. Username string `json:"username"`
  103. Password string `json:"password"`
  104. }
  105. if err := c.Bind(&json); err != nil {
  106. c.JSON(200, gin.H{
  107. "result": "deny", // 认证结果 可选 "allow" | "deny" | "ignore"
  108. "is_superuser": false, // 超级用户 可选 true | false,该项为空时默认为 false
  109. })
  110. return
  111. }
  112. // 保证 Clientid唯一,否则会 下线
  113. if json.Clientid != json.Username {
  114. c.JSON(200, gin.H{
  115. "result": "deny", // 认证结果 可选 "allow" | "deny" | "ignore"
  116. "is_superuser": false, // 超级用户 可选 true | false,该项为空时默认为 false
  117. })
  118. return
  119. }
  120. Device_r := Device.Device{
  121. T_sn: json.Username,
  122. }
  123. // 判断是否存在
  124. if !Device_r.Read_Tidy() {
  125. c.JSON(200, gin.H{
  126. "result": "deny", // 认证结果 可选 "allow" | "deny" | "ignore"
  127. "is_superuser": false, // 超级用户 可选 true | false,该项为空时默认为 false
  128. })
  129. return
  130. }
  131. // 判断密码是否正确
  132. if Device_r.T_password != json.Password {
  133. c.JSON(200, gin.H{
  134. "result": "deny", // 认证结果 可选 "allow" | "deny" | "ignore"
  135. "is_superuser": false, // 超级用户 可选 true | false,该项为空时默认为 false
  136. })
  137. return
  138. }
  139. c.JSON(200, gin.H{
  140. "result": "allow", // 可选 "allow" | "deny" | "ignore"
  141. "is_superuser": false, // 可选 true | false,该项为空时默认为 false
  142. })
  143. })
  144. r.POST("/Acl", func(c *gin.Context) {
  145. logs.Println("Acl!!")
  146. var json struct {
  147. Clientid string `json:"clientid"` //${clientid} — 客户端的 ID。
  148. Username string `json:"username"` //${username} — 客户端登录是用的用户名。
  149. Peerhost string `json:"peerhost"` //${peerhost} — 客户端的源 IP 地址。
  150. Proto_name string `json:"proto_name"` //${proto_name} — 客户端使用的协议名称。例如 MQTT,CoAP 等。
  151. Mountpoint string `json:"mountpoint"` //${mountpoint} — 网关监听器的挂载点(主题前缀)。
  152. Action string `json:"action"` //${action} — 当前执行的动作请求,例如 publish,subscribe。
  153. Topic string `json:"topic"` //${topic} — 当前请求想要发布或订阅的主题(或主题过滤器)
  154. }
  155. if err := c.Bind(&json); err != nil {
  156. c.JSON(200, gin.H{
  157. "result": "deny", // 可选 "allow" | "deny" | "ignore"
  158. })
  159. }
  160. fmt.Println("json:", json)
  161. if json.Username == "admin" {
  162. c.JSON(200, gin.H{"result": "allow"})
  163. return
  164. }
  165. if json.Username == "test" {
  166. c.JSON(200, gin.H{"result": "allow"})
  167. return
  168. }
  169. // 保证 Clientid唯一,否则会 下线
  170. if json.Clientid != json.Username {
  171. c.JSON(200, gin.H{"result": "deny"})
  172. return
  173. }
  174. Device_r := Device.Device{
  175. T_sn: json.Username,
  176. }
  177. // 判断是否存在
  178. if !Device_r.Read_Tidy() {
  179. c.JSON(200, gin.H{"result": "deny"})
  180. return
  181. }
  182. //topic /sub/SN /pub/SN
  183. topic_list := strings.Split(json.Topic, "/")
  184. for _, v := range topic_list {
  185. if len(v) < 7 {
  186. continue // 不是有效SN
  187. }
  188. if v == Device_r.T_sn {
  189. c.JSON(200, gin.H{"result": "allow"})
  190. return
  191. }
  192. }
  193. logs.Println("MQTT Acl E!topic_list:", topic_list)
  194. c.JSON(200, gin.H{"result": "deny"})
  195. return
  196. //
  197. //Clientid_list := strings.Split(username+"_s", "_")
  198. //username = Clientid_list[0]
  199. //if topic_list[2] != username {
  200. // fmt.Println("topic_list[2] != username",topic_list[2])
  201. // c.JSON(200, gin.H{"result": "deny"})
  202. // return
  203. //}
  204. //c.JSON(200, gin.H{"result": "allow"})
  205. //return
  206. })
  207. r.Run(":8080")
  208. }
  209. // 开始处理
  210. func messagePubHandler(topicName string, message []byte) {
  211. // 本地测试
  212. logs.Println("=", "============= Mqtt2 JSON =============")
  213. logs.Println("<-"+topicName, string(message))
  214. // 过滤
  215. topicNameS := strings.Split(topicName, "/")
  216. if len(topicNameS) == 1 {
  217. logs.Println("MqttServer", "订阅地址错误 len(topicNameS) == 1", strconv.Itoa(len(topicNameS)))
  218. return
  219. }
  220. // 验证设备
  221. Device_r := Device.Device{}
  222. for _, sn := range topicNameS {
  223. if len(sn) < 7 {
  224. continue // 不是有效SN
  225. }
  226. Device_r.T_sn = sn
  227. if Device_r.Read_Tidy() {
  228. if Device_r.T_state == 3 {
  229. continue // 不是有效SN
  230. }
  231. break
  232. }
  233. }
  234. if Device_r.T_state != 1 {
  235. logs.Println("MqttServer", Device_r.T_sn+" 设备未激活")
  236. return
  237. }
  238. // 设备类型
  239. ProductType_r := Product.ProductType{T_ProductID: Device_r.T_ProductID}
  240. if !ProductType_r.Read() {
  241. logs.Println("MqttServer", Device_r.T_sn+"|"+Device_r.T_ProductID+" 设备类型找不到!")
  242. return
  243. }
  244. // 设备协议
  245. ProductProt_r := Product.ProductProt{Id: ProductType_r.T_prot}
  246. if !ProductProt_r.Read() {
  247. logs.Println("MqttServer", Device_r.T_sn+"|"+Device_r.T_ProductID+"-"+fmt.Sprintf("%d", ProductType_r.T_prot)+" 设备协议找不到!")
  248. return
  249. }
  250. // 根据库的存放路径加载库
  251. p, err := plugin.Open(conf.Analysis_Dir + ProductProt_r.T_analysis + ".so")
  252. if err != nil {
  253. println(err)
  254. logs.PrintlnError("打开 SO 失败:", err)
  255. }
  256. if topicName[len(topicName)-6:] == "_reply" {
  257. // 返回数据
  258. s, err := p.Lookup("T_reply")
  259. if err != nil {
  260. println(err)
  261. panic(any(err))
  262. }
  263. // 类型转换
  264. f := s.(func(b []byte) []byte)(message)
  265. // 开始处理
  266. println(f)
  267. } else {
  268. var Rt_r = lib.Rt{Status: 200, Msg: "ok"}
  269. // 查找库导出信息
  270. s, err := p.Lookup("R")
  271. if err != nil {
  272. println(err)
  273. panic(any(err))
  274. }
  275. // 类型转换
  276. f := s.(func(b []byte) []byte)(message)
  277. // 开始处理
  278. println(f)
  279. a := string(f[0])
  280. println("首字符:", a)
  281. if a != "{" {
  282. //结构体
  283. var json_r map[string]interface{}
  284. err = json.Unmarshal(f, &json_r)
  285. if err != nil {
  286. Rt_r.Status = 203
  287. goto Mreturn
  288. }
  289. //Handle.AnalysisMap(Device_r, ProductType_r, json_r, "")
  290. } else {
  291. //列表
  292. var json_lr []map[string]interface{}
  293. err = json.Unmarshal(f, &json_lr)
  294. if err != nil {
  295. Rt_r.Status = 203
  296. goto Mreturn
  297. }
  298. //for _, value := range json_lr {
  299. // //Handle.AnalysisMap(Device_r, ProductType_r, value, "")
  300. //}
  301. }
  302. // 返回
  303. Mreturn:
  304. data, _ := json.Marshal(Rt_r)
  305. fmt.Println(string(data))
  306. // 查找库导出信息
  307. s, err = p.Lookup("R_reply")
  308. if err != nil {
  309. println(err)
  310. panic(any(err))
  311. }
  312. // 类型转换
  313. f = s.(func(b []byte) []byte)(data)
  314. // 返回数据
  315. Mqtt_publish(topicName+"_reply", f)
  316. }
  317. }
  318. // 发送数据
  319. func Mqtt_publish(topic string, b []byte) {
  320. // Publish a message.
  321. err := cli.Publish(&client.PublishOptions{
  322. QoS: mqtt.QoS0,
  323. TopicName: []byte(topic),
  324. Message: b,
  325. })
  326. logs.PrintlnMqtt("-> " + topic + "_reply " + string(b))
  327. if err != nil {
  328. logs.PrintlnError("MqttServer", "发送消息失败 [Mqtt_publish]", "-> "+topic+" "+string(b))
  329. }
  330. }