Server.go 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298
  1. package MqttServer
  2. import (
  3. "Yunlot/Handle"
  4. "Yunlot/conf"
  5. "Yunlot/lib"
  6. "Yunlot/logs"
  7. "Yunlot/models/Device"
  8. "encoding/json"
  9. "fmt"
  10. beego "github.com/beego/beego/v2/server/web"
  11. "github.com/gin-gonic/gin"
  12. "github.com/yosssi/gmq/mqtt"
  13. "github.com/yosssi/gmq/mqtt/client"
  14. "strconv"
  15. "strings"
  16. "time"
  17. )
  18. func Run_MqttServer() {
  19. time.Sleep(3 * time.Second)
  20. logs.Println("============Run_MqttServer=============", "")
  21. HTTPPort, _ := beego.AppConfig.String("HTTPPort")
  22. // Create an MQTT Client.
  23. lib.MqttClient = client.New(&client.Options{
  24. // Define the processing of the error handler.
  25. ErrorHandler: func(err error) {
  26. logs.PrintlnError("err!!!!!! Run_MqttServer:", err.Error())
  27. time.Sleep(3 * time.Second)
  28. go Run_MqttServer() // MQTT 通讯
  29. return
  30. },
  31. })
  32. // Terminate the Client.
  33. defer lib.MqttClient.Terminate()
  34. c := client.ConnectOptions{
  35. Network: "tcp",
  36. Address: conf.MqttServer_Url,
  37. ClientID: []byte(conf.MqttServer_ClientID + HTTPPort),
  38. UserName: []byte(conf.MqttServer_Username),
  39. Password: []byte(conf.MqttServer_Password),
  40. }
  41. logs.Println("Address:", c.Address)
  42. logs.Println("ClientID:", string(c.ClientID))
  43. // Connect to the MQTT Server.
  44. err := lib.MqttClient.Connect(&c)
  45. if err != nil {
  46. logs.Println("MqttServer", "连接MQTT失败 [cli.Connect]", "")
  47. logs.Println("err!!!!!! Run_MqttServer:", "连接MQTT失败:", err)
  48. fmt.Println("err!!!!!! 连接MQTT失败:", err)
  49. lib.MqttClient.Terminate()
  50. time.Sleep(3 * time.Second)
  51. go Run_MqttServer() // MQTT 通讯
  52. return
  53. }
  54. logs.Println("MqttServer", "连接成功!")
  55. // Subscribe to topics.
  56. err = lib.MqttClient.Subscribe(&client.SubscribeOptions{
  57. SubReqs: []*client.SubReq{
  58. &client.SubReq{
  59. TopicFilter: []byte("#"),
  60. QoS: mqtt.QoS0,
  61. // Define the processing of the message handler.
  62. Handler: func(topicName, message []byte) {
  63. _, exists := lib.TopicMap[string(topicName)]
  64. if exists {
  65. //logs.Println("跳过:",string(topicName))
  66. return // 这个 订阅号 平台->设备,跳过
  67. }
  68. start := time.Now()
  69. messagePubHandler(string(topicName), message)
  70. elapsed := time.Since(start)
  71. fmt.Printf("代码运行时间:%s\n", elapsed)
  72. },
  73. },
  74. &client.SubReq{ // 设备断开
  75. TopicFilter: []byte("$SYS/brokers/+/clients/+/disconnected"),
  76. QoS: mqtt.QoS0,
  77. // Define the processing of the message handler.
  78. Handler: func(topicName, message []byte) {
  79. logs.PrintlnMqtt("<-" + string(topicName) + " " + string(message))
  80. MessageDisconnected(string(topicName), message)
  81. },
  82. },
  83. &client.SubReq{ // 设备上线
  84. TopicFilter: []byte("$SYS/brokers/+/clients/+/connected"),
  85. QoS: mqtt.QoS0,
  86. // Define the processing of the message handler.
  87. Handler: func(topicName, message []byte) {
  88. logs.PrintlnMqtt("<-" + string(topicName) + " " + string(message))
  89. MessageConnected(string(topicName), message)
  90. },
  91. },
  92. },
  93. })
  94. if err != nil {
  95. logs.Println("MqttServer", "订阅消息 [cli.Subscribe]", "")
  96. logs.Println("err!!!!!! Run_MqttServer:", "连接MQTT失败:", err)
  97. }
  98. fmt.Println("MQTT ok!")
  99. go MQTT_Gin_Run() // 初始化 MQTT ACL
  100. }
  101. func MQTT_Gin_Run() {
  102. // 创建一个Gin路由器
  103. r := gin.Default()
  104. // 定义路由
  105. r.POST("/authentication", func(c *gin.Context) {
  106. var json struct {
  107. Clientid string `json:"clientid"`
  108. Username string `json:"username"`
  109. Password string `json:"password"`
  110. }
  111. if err := c.Bind(&json); err != nil {
  112. c.JSON(200, gin.H{
  113. "result": "deny", // 认证结果 可选 "allow" | "deny" | "ignore"
  114. "is_superuser": false, // 超级用户 可选 true | false,该项为空时默认为 false
  115. })
  116. return
  117. }
  118. // 保证 Clientid唯一,否则会 下线
  119. if json.Clientid != json.Username {
  120. c.JSON(200, gin.H{
  121. "result": "deny", // 认证结果 可选 "allow" | "deny" | "ignore"
  122. "is_superuser": false, // 超级用户 可选 true | false,该项为空时默认为 false
  123. })
  124. return
  125. }
  126. Device_r := Device.Device{
  127. T_sn: json.Username,
  128. }
  129. // 判断是否存在
  130. if !Device_r.Read_Tidy() {
  131. c.JSON(200, gin.H{
  132. "result": "deny", // 认证结果 可选 "allow" | "deny" | "ignore"
  133. "is_superuser": false, // 超级用户 可选 true | false,该项为空时默认为 false
  134. })
  135. return
  136. }
  137. // 判断密码是否正确
  138. if Device_r.T_password != json.Password {
  139. c.JSON(200, gin.H{
  140. "result": "deny", // 认证结果 可选 "allow" | "deny" | "ignore"
  141. "is_superuser": false, // 超级用户 可选 true | false,该项为空时默认为 false
  142. })
  143. return
  144. }
  145. c.JSON(200, gin.H{
  146. "result": "allow", // 可选 "allow" | "deny" | "ignore"
  147. "is_superuser": false, // 可选 true | false,该项为空时默认为 false
  148. })
  149. })
  150. r.POST("/Acl", func(c *gin.Context) {
  151. logs.Println("Acl!!")
  152. var json struct {
  153. Clientid string `json:"clientid"` //${clientid} — 客户端的 ID。
  154. Username string `json:"username"` //${username} — 客户端登录是用的用户名。
  155. Peerhost string `json:"peerhost"` //${peerhost} — 客户端的源 IP 地址。
  156. Proto_name string `json:"proto_name"` //${proto_name} — 客户端使用的协议名称。例如 MQTT,CoAP 等。
  157. Mountpoint string `json:"mountpoint"` //${mountpoint} — 网关监听器的挂载点(主题前缀)。
  158. Action string `json:"action"` //${action} — 当前执行的动作请求,例如 publish,subscribe。
  159. Topic string `json:"topic"` //${topic} — 当前请求想要发布或订阅的主题(或主题过滤器)
  160. }
  161. if err := c.Bind(&json); err != nil {
  162. c.JSON(200, gin.H{
  163. "result": "deny", // 可选 "allow" | "deny" | "ignore"
  164. })
  165. }
  166. fmt.Println("json:", json)
  167. if json.Username == "admin" {
  168. c.JSON(200, gin.H{"result": "allow"})
  169. return
  170. }
  171. if json.Username == "test" {
  172. c.JSON(200, gin.H{"result": "allow"})
  173. return
  174. }
  175. // 保证 Clientid唯一,否则会 下线
  176. if json.Clientid != json.Username {
  177. c.JSON(200, gin.H{"result": "deny"})
  178. return
  179. }
  180. Device_r := Device.Device{
  181. T_sn: json.Username,
  182. }
  183. // 判断是否存在
  184. if !Device_r.Read_Tidy() {
  185. c.JSON(200, gin.H{"result": "deny"})
  186. return
  187. }
  188. //topic /sub/SN /pub/SN
  189. topic_list := strings.Split(json.Topic, "/")
  190. for _, v := range topic_list {
  191. if len(v) < 7 {
  192. continue // 不是有效SN
  193. }
  194. if v == Device_r.T_sn {
  195. c.JSON(200, gin.H{"result": "allow"})
  196. return
  197. }
  198. }
  199. logs.Println("MQTT Acl E!topic_list:", topic_list)
  200. c.JSON(200, gin.H{"result": "deny"})
  201. return
  202. //
  203. //Clientid_list := strings.Split(username+"_s", "_")
  204. //username = Clientid_list[0]
  205. //if topic_list[2] != username {
  206. // fmt.Println("topic_list[2] != username",topic_list[2])
  207. // c.JSON(200, gin.H{"result": "deny"})
  208. // return
  209. //}
  210. //c.JSON(200, gin.H{"result": "allow"})
  211. //return
  212. })
  213. r.Run(":8080")
  214. }
  215. // 开始处理
  216. func messagePubHandler(topicName string, message []byte) {
  217. // 本地测试
  218. logs.Println("", "============= Mqtt JSON =============")
  219. logs.Println("<-"+topicName, string(message))
  220. // 过滤
  221. topicNameS := strings.Split(topicName, "/")
  222. if len(topicNameS) == 1 {
  223. logs.Println("MqttServer", "订阅地址错误 len(topicNameS) == 1", strconv.Itoa(len(topicNameS)))
  224. return
  225. }
  226. // 验证设备
  227. Device_r := Device.Device{}
  228. for _, sn := range topicNameS {
  229. if len(sn) < 7 {
  230. continue // 不是有效SN
  231. }
  232. Device_r.T_sn = sn
  233. if Device_r.Read_Tidy() {
  234. if Device_r.T_state == 3 {
  235. continue // 不是有效SN
  236. }
  237. break
  238. }
  239. }
  240. if Device_r.T_state != 1 {
  241. logs.Println("MqttServer", Device_r.T_sn+" 设备被 禁用、删除、无效")
  242. return
  243. }
  244. Rt_r := Handle.PullHandle(&Device_r, topicName, message)
  245. // 返回
  246. data, _ := json.Marshal(Rt_r)
  247. Handle.PushHandle(&Device_r, topicName, string(data))
  248. }
  249. //
  250. //// 发送数据
  251. //func Mqtt_publish(topic string, b []byte) {
  252. // lib.TopicMap[topic] = true
  253. // // Publish a message.
  254. // err := lib.MqttClient.Publish(&client.PublishOptions{
  255. // QoS: mqtt.QoS0,
  256. // TopicName: []byte(topic),
  257. // Message: b,
  258. // })
  259. //
  260. // logs.PrintlnMqtt("-> " + topic + ":" + string(b))
  261. // if err != nil {
  262. // logs.PrintlnError("MqttServer", "发送消息失败 [Mqtt_publish]", "-> "+topic+" "+string(b))
  263. // }
  264. //
  265. //}