Nats.go 8.7 KB


  1. package Nats
  2. import (
  3. "Cold_Api/conf"
  4. "Cold_Api/controllers/lib"
  5. "Cold_Api/logs"
  6. "Cold_Api/models/Account"
  7. "Cold_Api/models/Device"
  8. "Cold_Api/models/System"
  9. "encoding/xml"
  10. "fmt"
  11. "github.com/astaxie/beego/cache"
  12. "github.com/nats-io/nats.go"
  13. "github.com/vmihailenco/msgpack/v5"
  14. "strings"
  15. )
  16. var redisCache_NatsServer cache.Cache
  17. func init() {
  18. logs.Println("============Nats init============")
  19. var err error
  20. // 连接Nats服务器
  21. lib.Nats, err = nats.Connect("nats://" + conf.NatsServer_Url)
  22. if err != nil {
  23. fmt.Println("nats 连接失败!")
  24. panic(err)
  25. }
  26. logs.Println("nats OK!")
  27. go NatsInit()
  28. config := fmt.Sprintf(`{"key":"%s","conn":"%s","dbNum":"%s","password":"%s"}`,
  29. "redisCache_NatsServer", conf.Redis_address, conf.Redis_dbNum, conf.Redis_password)
  30. fmt.Println(config)
  31. redisCache_NatsServer, err = cache.NewCache("redis", config)
  32. if err != nil || redisCache_NatsServer == nil {
  33. errMsg := "failed to init redis"
  34. fmt.Println(errMsg, err)
  35. panic(errMsg)
  36. }
  37. }
  38. func NatsInit() {
  39. // 获取微信二维码 返回结果
  40. _, _ = lib.Nats.Subscribe("Wx_BasicMessage_Event_QRCode", func(m *nats.Msg) {
  41. fmt.Printf(" => Nats Wx_BasicMessage_Event_QRCode message: %s\n", string(m.Data))
  42. type Person_QRCode struct {
  43. ToUserName string `xml:"ToUserName"` //注意这里有个反引号
  44. FromUserName string `xml:"FromUserName"` //注意这里有个反引号
  45. CreateTime string `xml:"CreateTime"` //注意这里有个反引号
  46. EventKey string `xml:"EventKey"`
  47. }
  48. var person_QRCode Person_QRCode
  49. err1 := xml.Unmarshal(m.Data, &person_QRCode)
  50. if err1 != nil {
  51. fmt.Println("Unmarshal error")
  52. _ = lib.Nats.Publish(m.Reply, []byte(""))
  53. return
  54. }
  55. // 进入 二维码配对
  56. fmt.Println("FromUserName-", person_QRCode.FromUserName)
  57. fmt.Println("EventKey-", person_QRCode.EventKey)
  58. // 开始 处理消息
  59. if strings.Contains(person_QRCode.EventKey, "@宝智达冷链 微信公众号通知") {
  60. //"请将本内容发送到 深圳市宝智达科技有限公司 微信公众号-|"+lib.AesEncryptCBC(T_calss_id, "0123456789012345")+"|-",
  61. Content_r := lib.GetBetweenStr(person_QRCode.EventKey, "-|", "|-")
  62. decryptCode := lib.AesDecryptCBC(Content_r, "0123456789012345")
  63. fmt.Println("解密结果 UUID:", decryptCode, " Content_r:", Content_r)
  64. Admin_r, err := Account.Read_Admin_ByUuid(decryptCode)
  65. if err != nil {
  66. _ = lib.Nats.Publish(m.Reply, []byte(""))
  67. return
  68. }
  69. Admin_r.T_wx = person_QRCode.FromUserName
  70. Account.Update_Admin(Admin_r, "T_wx")
  71. _ = lib.Nats.Publish(m.Reply, []byte(Admin_r.T_name+" 绑定成功!"))
  72. return
  73. }
  74. _ = lib.Nats.Publish(m.Reply, []byte(""))
  75. })
  76. // 请求-响应 验证登录
  77. _, _ = lib.Nats.Subscribe("Cold_User_verification", func(m *nats.Msg) {
  78. fmt.Printf("Cold_User_verification message: %s\n", string(m.Data))
  79. type T_R struct {
  80. Code int16 `xml:"Code"`
  81. Msg string `xml:"Msg"`
  82. Data Account.Admin_R `xml:"Data"` // 泛型
  83. }
  84. var t_R T_R
  85. // 验证登录
  86. b_, admin_r := lib.Verification(string(m.Data), "")
  87. if !b_ {
  88. fmt.Println("Unmarshal error")
  89. t_R.Code = 201
  90. t_R.Msg = "User_tokey Err!"
  91. b, _ := msgpack.Marshal(&t_R)
  92. _ = lib.Nats.Publish(m.Reply, b)
  93. return
  94. }
  95. t_R.Code = 200
  96. t_R.Msg = "ok"
  97. t_R.Data = Account.AdminToAdmin_R(admin_r)
  98. b, _ := msgpack.Marshal(&t_R)
  99. _ = lib.Nats.Publish(m.Reply, b)
  100. })
  101. // 请求-响应 通过公司名获取所有公司信息
  102. _, _ = lib.Nats.Subscribe("Cold_User_CompanyListAllByT_name", func(m *nats.Msg) {
  103. fmt.Printf("CompanyListAll message: %s\n", string(m.Data))
  104. type T_R struct {
  105. Code int16 `xml:"Code"`
  106. Msg string `xml:"Msg"`
  107. Data []Account.Company `xml:"Data"` // 泛型
  108. }
  109. var t_R T_R
  110. t_R.Code = 200
  111. t_R.Msg = "ok"
  112. t_R.Data = Account.Read_Company_List_All_ByT_name(string(m.Data))
  113. b, _ := msgpack.Marshal(&t_R)
  114. _ = lib.Nats.Publish(m.Reply, b)
  115. })
  116. // 请求-响应 获取所有用户列表
  117. _, _ = lib.Nats.Subscribe("Cold_User_UserListAll", func(m *nats.Msg) {
  118. type T_R struct {
  119. Code int16 `xml:"Code"`
  120. Msg string `xml:"Msg"`
  121. Data []Account.Admin_R `xml:"Data"` // 泛型
  122. }
  123. var t_R T_R
  124. t_R.Code = 200
  125. t_R.Msg = "ok"
  126. t_R.Data = Account.Read_Admin_List_All()
  127. b, _ := msgpack.Marshal(&t_R)
  128. _ = lib.Nats.Publish(m.Reply, b)
  129. })
  130. // 请求-响应 检查用户权限
  131. _, _ = lib.Nats.Subscribe("Cold_User_CheckUserPermissions", func(m *nats.Msg) {
  132. fmt.Printf("Cold_User_CheckUserPermissions message: %s\n", string(m.Data))
  133. type T_Req struct {
  134. Power_Id int `xml:"Power_Id"` // 权限id
  135. Req_Url string `xml:"Req_Url"` // 请求url
  136. }
  137. type T_R struct {
  138. Code int16 `xml:"Code"`
  139. Msg string `xml:"Msg"`
  140. Pass bool `xml:"Pass"` // 泛型
  141. }
  142. var t_Req T_Req
  143. var t_R T_R
  144. err := msgpack.Unmarshal(m.Data, &t_Req)
  145. if err != nil {
  146. t_R.Code = 201
  147. t_R.Msg = "Unmarshal error"
  148. b, _ := msgpack.Marshal(&t_R)
  149. _ = lib.Nats.Publish(m.Reply, b)
  150. return
  151. }
  152. power, _ := Account.Read_Power_ById(t_Req.Power_Id)
  153. flag := false
  154. if power.T_menu == "*" {
  155. flag = true
  156. }
  157. if !flag {
  158. api := Account.Read_API_List_ByPower_Id(power.Id, power.T_menu)
  159. for _, v := range api {
  160. if conf.Version+v.T_uri == t_Req.Req_Url {
  161. flag = true
  162. break
  163. }
  164. }
  165. }
  166. t_R.Code = 200
  167. t_R.Msg = "ok"
  168. t_R.Pass = flag
  169. b, _ := msgpack.Marshal(&t_R)
  170. _ = lib.Nats.Publish(m.Reply, b)
  171. })
  172. // 发布-订阅 模式,异步订阅 AddSysLogs
  173. _, _ = lib.Nats.Subscribe("Cold_User_AddSysLogs", func(m *nats.Msg) {
  174. fmt.Printf("AddSysLogs message: %s\n", string(m.Data))
  175. type T_S struct {
  176. T_class string
  177. T_title string
  178. T_txt interface{}
  179. }
  180. var t_S T_S
  181. err := msgpack.Unmarshal(m.Data, &t_S)
  182. if err != nil {
  183. System.Add_Logs("Nats", "Nats AddSysLogs 解析失败", string(m.Data))
  184. return
  185. }
  186. System.Add_Logs_T(t_S.T_class, t_S.T_title, t_S.T_txt)
  187. })
  188. // 发布-订阅 模式,异步订阅 AddUserLogs
  189. _, _ = lib.Nats.Subscribe("Cold_User_AddUserLogs", func(m *nats.Msg) {
  190. fmt.Printf("AddUserLogs message: %s\n", string(m.Data))
  191. type T_S struct {
  192. T_uuid string
  193. T_class string
  194. T_title string
  195. T_txt interface{}
  196. }
  197. var t_S T_S
  198. err := msgpack.Unmarshal(m.Data, &t_S)
  199. if err != nil {
  200. System.Add_Logs("Nats", "Nats AddUserLogs 解析失败", string(m.Data))
  201. return
  202. }
  203. System.Add_UserLogs_T(t_S.T_uuid, t_S.T_class, t_S.T_title, t_S.T_txt)
  204. })
  205. // 请求-响应 获取设备
  206. _, _ = lib.Nats.Subscribe("Cold_ReadDeviceByT_sn", func(m *nats.Msg) {
  207. fmt.Printf("CheckUserPermissions message: %s\n", string(m.Data))
  208. type T_R struct {
  209. Code int16 `xml:"Code"`
  210. Msg string `xml:"Msg"`
  211. Data Device.Device `xml:"Data"` // 泛型
  212. }
  213. var t_R T_R
  214. device, err := Device.Read_Device_ByT_sn(string(m.Data))
  215. if err != nil {
  216. t_R.Code = 202
  217. t_R.Msg = "查询失败"
  218. b, _ := msgpack.Marshal(&t_R)
  219. _ = lib.Nats.Publish(m.Reply, b)
  220. }
  221. t_R.Code = 200
  222. t_R.Msg = "ok"
  223. t_R.Data = device
  224. b, _ := msgpack.Marshal(&t_R)
  225. _ = lib.Nats.Publish(m.Reply, b)
  226. })
  227. // 请求-响应 获取传感器列表
  228. _, _ = lib.Nats.Subscribe("Cold_ReadDeviceSensorALLByT_sn", func(m *nats.Msg) {
  229. fmt.Printf("CheckUserPermissions message: %s\n", string(m.Data))
  230. type T_R struct {
  231. Code int16 `xml:"Code"`
  232. Msg string `xml:"Msg"`
  233. Data []Device.DeviceSensor `xml:"Data"` // 泛型
  234. }
  235. var t_R T_R
  236. device := Device.Read_DeviceSensor_ALL_List_T_sn(string(m.Data))
  237. t_R.Code = 200
  238. t_R.Msg = "ok"
  239. t_R.Data = device
  240. b, _ := msgpack.Marshal(&t_R)
  241. _ = lib.Nats.Publish(m.Reply, b)
  242. })
  243. // 请求-响应 获取设备数据列表
  244. _, _ = lib.Nats.Subscribe("Cold_ReadDeviceDataListBy_T_snid", func(m *nats.Msg) {
  245. fmt.Printf("Cold_ReadDeviceDataListBy_T_snid message: %s\n", string(m.Data))
  246. type T_Req struct {
  247. T_snid string `xml:"T_snid"`
  248. Time_start string `xml:"Time_start"`
  249. Time_end string `xml:"Time_end"`
  250. Page int `xml:"Page"`
  251. Page_z int `xml:"Page_z"`
  252. }
  253. type T_R struct {
  254. Code int16 `xml:"Code"`
  255. Msg string `xml:"Msg"`
  256. Count int64 `xml:"Count"`
  257. Data []Device.DeviceData_R `xml:"Data"` // 泛型
  258. }
  259. var t_Req T_Req
  260. var t_R T_R
  261. err := msgpack.Unmarshal(m.Data, &t_Req)
  262. if err != nil {
  263. t_R.Code = 201
  264. t_R.Msg = "Unmarshal error"
  265. b, _ := msgpack.Marshal(&t_R)
  266. _ = lib.Nats.Publish(m.Reply, b)
  267. return
  268. }
  269. deviceData, cnt := Device.Read_DeviceData_By_T_snid_List(t_Req.T_snid, t_Req.Time_start, t_Req.Time_end, t_Req.Page, t_Req.Page_z)
  270. t_R.Code = 200
  271. t_R.Msg = "ok"
  272. t_R.Count = cnt
  273. t_R.Data = deviceData
  274. b, _ := msgpack.Marshal(&t_R)
  275. _ = lib.Nats.Publish(m.Reply, b)
  276. })
  277. }