Nats.go 8.9 KB


  1. package Nats
  2. import (
  3. "Cold_Api/conf"
  4. "Cold_Api/controllers/lib"
  5. "Cold_Api/logs"
  6. "Cold_Api/models/Account"
  7. "Cold_Api/models/Device"
  8. "Cold_Api/models/System"
  9. "encoding/xml"
  10. "fmt"
  11. "github.com/astaxie/beego/cache"
  12. "github.com/nats-io/nats.go"
  13. "github.com/vmihailenco/msgpack/v5"
  14. "strings"
  15. )
  16. var redisCache_NatsServer cache.Cache
  17. func init() {
  18. logs.Println("============Nats init============")
  19. var err error
  20. // 连接Nats服务器
  21. lib.Nats, err = nats.Connect("nats://" + conf.NatsServer_Url)
  22. if err != nil {
  23. fmt.Println("nats 连接失败!")
  24. panic(err)
  25. }
  26. logs.Println("nats OK!")
  27. go NatsInit()
  28. config := fmt.Sprintf(`{"key":"%s","conn":"%s","dbNum":"%s","password":"%s"}`,
  29. "redisCache_NatsServer", conf.Redis_address, conf.Redis_dbNum, conf.Redis_password)
  30. fmt.Println(config)
  31. redisCache_NatsServer, err = cache.NewCache("redis", config)
  32. if err != nil || redisCache_NatsServer == nil {
  33. errMsg := "failed to init redis"
  34. fmt.Println(errMsg, err)
  35. panic(errMsg)
  36. }
  37. }
  38. func NatsInit() {
  39. // 获取微信二维码 返回结果
  40. _, _ = lib.Nats.Subscribe("Wx_BasicMessage_Event_QRCode", func(m *nats.Msg) {
  41. fmt.Printf(" => Nats Wx_BasicMessage_Event_QRCode message: %s\n", string(m.Data))
  42. type Person_QRCode struct {
  43. ToUserName string `xml:"ToUserName"` //注意这里有个反引号
  44. FromUserName string `xml:"FromUserName"` //注意这里有个反引号
  45. CreateTime string `xml:"CreateTime"` //注意这里有个反引号
  46. EventKey string `xml:"EventKey"`
  47. }
  48. var person_QRCode Person_QRCode
  49. err1 := xml.Unmarshal(m.Data, &person_QRCode)
  50. if err1 != nil {
  51. fmt.Println("Unmarshal error")
  52. _ = lib.Nats.Publish(m.Reply, []byte(""))
  53. return
  54. }
  55. // 进入 二维码配对
  56. fmt.Println("FromUserName-", person_QRCode.FromUserName)
  57. fmt.Println("EventKey-", person_QRCode.EventKey)
  58. // 开始 处理消息
  59. if strings.Contains(person_QRCode.EventKey, "@宝智达冷链 微信公众号通知") {
  60. //"请将本内容发送到 深圳市宝智达科技有限公司 微信公众号-|"+lib.AesEncryptCBC(T_calss_id, "0123456789012345")+"|-",
  61. Content_r := lib.GetBetweenStr(person_QRCode.EventKey, "-|", "|-")
  62. decryptCode := lib.AesDecryptCBC(Content_r, "0123456789012345")
  63. fmt.Println("解密结果 UUID:", decryptCode, " Content_r:", Content_r)
  64. Admin_r, err := Account.Read_Admin_ByUuid(decryptCode)
  65. if err != nil {
  66. _ = lib.Nats.Publish(m.Reply, []byte(""))
  67. return
  68. }
  69. Admin_r.T_wx = person_QRCode.FromUserName
  70. Account.Update_Admin(Admin_r, "T_wx")
  71. _ = lib.Nats.Publish(m.Reply, []byte(Admin_r.T_name+" 绑定成功!"))
  72. return
  73. }
  74. _ = lib.Nats.Publish(m.Reply, []byte(""))
  75. })
  76. // 请求-响应 验证登录
  77. _, _ = lib.Nats.Subscribe("Cold_User_verification", func(m *nats.Msg) {
  78. fmt.Printf("Cold_User_verification message: %s\n", string(m.Data))
  79. type T_R struct {
  80. Code int16 `xml:"Code"`
  81. Msg string `xml:"Msg"`
  82. Pid int `xml:"Pid"` // 公司id
  83. Data Account.Admin_R `xml:"Data"` // 泛型
  84. }
  85. var t_R T_R
  86. T_pid, is := Account.Redis_Tokey_T_pid_Get(string(m.Data))
  87. if !is {
  88. t_R.Code = 201
  89. t_R.Msg = "User_tokey Err!"
  90. b, _ := msgpack.Marshal(&t_R)
  91. _ = lib.Nats.Publish(m.Reply, b)
  92. return
  93. }
  94. // 验证登录
  95. b_, admin_r := lib.Verification(string(m.Data), "")
  96. if !b_ {
  97. fmt.Println("Unmarshal error")
  98. t_R.Code = 201
  99. t_R.Msg = "User_tokey Err!"
  100. b, _ := msgpack.Marshal(&t_R)
  101. _ = lib.Nats.Publish(m.Reply, b)
  102. return
  103. }
  104. t_R.Pid = T_pid
  105. t_R.Code = 200
  106. t_R.Msg = "ok"
  107. t_R.Data = Account.AdminToAdmin_R(admin_r)
  108. b, _ := msgpack.Marshal(&t_R)
  109. _ = lib.Nats.Publish(m.Reply, b)
  110. })
  111. // 请求-响应 通过公司名获取所有公司信息
  112. _, _ = lib.Nats.Subscribe("Cold_User_CompanyListAllByT_name", func(m *nats.Msg) {
  113. fmt.Printf("CompanyListAll message: %s\n", string(m.Data))
  114. type T_R struct {
  115. Code int16 `xml:"Code"`
  116. Msg string `xml:"Msg"`
  117. Data []Account.Company `xml:"Data"` // 泛型
  118. }
  119. var t_R T_R
  120. t_R.Code = 200
  121. t_R.Msg = "ok"
  122. t_R.Data = Account.Read_Company_List_All_ByT_name(string(m.Data))
  123. b, _ := msgpack.Marshal(&t_R)
  124. _ = lib.Nats.Publish(m.Reply, b)
  125. })
  126. // 请求-响应 获取所有用户列表
  127. _, _ = lib.Nats.Subscribe("Cold_User_UserListAll", func(m *nats.Msg) {
  128. type T_R struct {
  129. Code int16 `xml:"Code"`
  130. Msg string `xml:"Msg"`
  131. Data []Account.Admin_R `xml:"Data"` // 泛型
  132. }
  133. var t_R T_R
  134. t_R.Code = 200
  135. t_R.Msg = "ok"
  136. t_R.Data = Account.Read_Admin_List_All()
  137. b, _ := msgpack.Marshal(&t_R)
  138. _ = lib.Nats.Publish(m.Reply, b)
  139. })
  140. // 请求-响应 检查用户权限
  141. _, _ = lib.Nats.Subscribe("Cold_User_CheckUserPermissions", func(m *nats.Msg) {
  142. fmt.Printf("Cold_User_CheckUserPermissions message: %s\n", string(m.Data))
  143. type T_Req struct {
  144. Power_Id int `xml:"Power_Id"` // 权限id
  145. Req_Url string `xml:"Req_Url"` // 请求url
  146. }
  147. type T_R struct {
  148. Code int16 `xml:"Code"`
  149. Msg string `xml:"Msg"`
  150. Pass bool `xml:"Pass"` // 泛型
  151. }
  152. var t_Req T_Req
  153. var t_R T_R
  154. err := msgpack.Unmarshal(m.Data, &t_Req)
  155. if err != nil {
  156. t_R.Code = 201
  157. t_R.Msg = "Unmarshal error"
  158. b, _ := msgpack.Marshal(&t_R)
  159. _ = lib.Nats.Publish(m.Reply, b)
  160. return
  161. }
  162. power, _ := Account.Read_Power_ById(t_Req.Power_Id)
  163. flag := false
  164. if power.T_menu == "*" {
  165. flag = true
  166. }
  167. if !flag {
  168. api := Account.Read_API_List_ByPower_Id(power.Id, power.T_menu)
  169. for _, v := range api {
  170. if conf.Version+v.T_uri == t_Req.Req_Url {
  171. flag = true
  172. break
  173. }
  174. }
  175. }
  176. t_R.Code = 200
  177. t_R.Msg = "ok"
  178. t_R.Pass = flag
  179. b, _ := msgpack.Marshal(&t_R)
  180. _ = lib.Nats.Publish(m.Reply, b)
  181. })
  182. // 发布-订阅 模式,异步订阅 AddSysLogs
  183. _, _ = lib.Nats.Subscribe("Cold_User_AddSysLogs", func(m *nats.Msg) {
  184. fmt.Printf("AddSysLogs message: %s\n", string(m.Data))
  185. type T_S struct {
  186. T_class string
  187. T_title string
  188. T_txt interface{}
  189. }
  190. var t_S T_S
  191. err := msgpack.Unmarshal(m.Data, &t_S)
  192. if err != nil {
  193. System.Add_Logs("Nats", "Nats AddSysLogs 解析失败", string(m.Data))
  194. return
  195. }
  196. System.Add_Logs_T(t_S.T_class, t_S.T_title, t_S.T_txt)
  197. })
  198. // 发布-订阅 模式,异步订阅 AddUserLogs
  199. _, _ = lib.Nats.Subscribe("Cold_User_AddUserLogs", func(m *nats.Msg) {
  200. fmt.Printf("AddUserLogs message: %s\n", string(m.Data))
  201. type T_S struct {
  202. T_uuid string
  203. T_class string
  204. T_title string
  205. T_txt interface{}
  206. }
  207. var t_S T_S
  208. err := msgpack.Unmarshal(m.Data, &t_S)
  209. if err != nil {
  210. System.Add_Logs("Nats", "Nats AddUserLogs 解析失败", string(m.Data))
  211. return
  212. }
  213. System.Add_UserLogs_T(t_S.T_uuid, t_S.T_class, t_S.T_title, t_S.T_txt)
  214. })
  215. // 请求-响应 获取设备
  216. _, _ = lib.Nats.Subscribe("Cold_ReadDeviceByT_sn", func(m *nats.Msg) {
  217. fmt.Printf("CheckUserPermissions message: %s\n", string(m.Data))
  218. type T_R struct {
  219. Code int16 `xml:"Code"`
  220. Msg string `xml:"Msg"`
  221. Data Device.Device `xml:"Data"` // 泛型
  222. }
  223. var t_R T_R
  224. device, err := Device.Read_Device_ByT_sn(string(m.Data))
  225. if err != nil {
  226. t_R.Code = 202
  227. t_R.Msg = "查询失败"
  228. b, _ := msgpack.Marshal(&t_R)
  229. _ = lib.Nats.Publish(m.Reply, b)
  230. }
  231. t_R.Code = 200
  232. t_R.Msg = "ok"
  233. t_R.Data = device
  234. b, _ := msgpack.Marshal(&t_R)
  235. _ = lib.Nats.Publish(m.Reply, b)
  236. })
  237. // 请求-响应 获取传感器列表
  238. _, _ = lib.Nats.Subscribe("Cold_ReadDeviceSensorALLByT_sn", func(m *nats.Msg) {
  239. fmt.Printf("CheckUserPermissions message: %s\n", string(m.Data))
  240. type T_R struct {
  241. Code int16 `xml:"Code"`
  242. Msg string `xml:"Msg"`
  243. Data []Device.DeviceSensor `xml:"Data"` // 泛型
  244. }
  245. var t_R T_R
  246. device := Device.Read_DeviceSensor_ALL_List_T_sn(string(m.Data))
  247. t_R.Code = 200
  248. t_R.Msg = "ok"
  249. t_R.Data = device
  250. b, _ := msgpack.Marshal(&t_R)
  251. _ = lib.Nats.Publish(m.Reply, b)
  252. })
  253. // 请求-响应 获取设备数据列表
  254. _, _ = lib.Nats.Subscribe("Cold_ReadDeviceDataListBy_T_snid", func(m *nats.Msg) {
  255. fmt.Printf("Cold_ReadDeviceDataListBy_T_snid message: %s\n", string(m.Data))
  256. type T_Req struct {
  257. T_snid string `xml:"T_snid"`
  258. Time_start string `xml:"Time_start"`
  259. Time_end string `xml:"Time_end"`
  260. Page int `xml:"Page"`
  261. Page_z int `xml:"Page_z"`
  262. }
  263. type T_R struct {
  264. Code int16 `xml:"Code"`
  265. Msg string `xml:"Msg"`
  266. Count int64 `xml:"Count"`
  267. Data []Device.DeviceData_R `xml:"Data"` // 泛型
  268. }
  269. var t_Req T_Req
  270. var t_R T_R
  271. err := msgpack.Unmarshal(m.Data, &t_Req)
  272. if err != nil {
  273. t_R.Code = 201
  274. t_R.Msg = "Unmarshal error"
  275. b, _ := msgpack.Marshal(&t_R)
  276. _ = lib.Nats.Publish(m.Reply, b)
  277. return
  278. }
  279. deviceData, cnt := Device.Read_DeviceData_By_T_snid_List(t_Req.T_snid, t_Req.Time_start, t_Req.Time_end, t_Req.Page, t_Req.Page_z)
  280. t_R.Code = 200
  281. t_R.Msg = "ok"
  282. t_R.Count = cnt
  283. t_R.Data = deviceData
  284. b, _ := msgpack.Marshal(&t_R)
  285. _ = lib.Nats.Publish(m.Reply, b)
  286. })
  287. }