Nats.go 10 KB


  1. package Nats
  2. import (
  3. "Cold_Api/conf"
  4. "Cold_Api/controllers/lib"
  5. "Cold_Api/logs"
  6. "Cold_Api/models/Account"
  7. "Cold_Api/models/Device"
  8. "Cold_Api/models/System"
  9. "encoding/xml"
  10. "fmt"
  11. "github.com/astaxie/beego/cache"
  12. "github.com/nats-io/nats.go"
  13. "github.com/vmihailenco/msgpack/v5"
  14. "strconv"
  15. "strings"
  16. )
  17. var redisCache_NatsServer cache.Cache
  18. func init() {
  19. logs.Println("============Nats init============")
  20. var err error
  21. // 连接Nats服务器
  22. lib.Nats, err = nats.Connect("nats://" + conf.NatsServer_Url)
  23. if err != nil {
  24. fmt.Println("nats 连接失败!")
  25. panic(err)
  26. }
  27. logs.Println("nats OK!")
  28. go NatsInit()
  29. config := fmt.Sprintf(`{"key":"%s","conn":"%s","dbNum":"%s","password":"%s"}`,
  30. "redisCache_NatsServer", conf.Redis_address, conf.Redis_dbNum, conf.Redis_password)
  31. fmt.Println(config)
  32. redisCache_NatsServer, err = cache.NewCache("redis", config)
  33. if err != nil || redisCache_NatsServer == nil {
  34. errMsg := "failed to init redis"
  35. fmt.Println(errMsg, err)
  36. panic(errMsg)
  37. }
  38. }
  39. func NatsInit() {
  40. // 获取微信二维码 返回结果
  41. _, _ = lib.Nats.Subscribe("Wx_BasicMessage_Event_QRCode", func(m *nats.Msg) {
  42. fmt.Printf(" => Nats Wx_BasicMessage_Event_QRCode message: %s\n", string(m.Data))
  43. type Person_QRCode struct {
  44. ToUserName string `xml:"ToUserName"` //注意这里有个反引号
  45. FromUserName string `xml:"FromUserName"` //注意这里有个反引号
  46. CreateTime string `xml:"CreateTime"` //注意这里有个反引号
  47. EventKey string `xml:"EventKey"`
  48. }
  49. var person_QRCode Person_QRCode
  50. err1 := xml.Unmarshal(m.Data, &person_QRCode)
  51. if err1 != nil {
  52. fmt.Println("Unmarshal error")
  53. _ = lib.Nats.Publish(m.Reply, []byte(""))
  54. return
  55. }
  56. // 进入 二维码配对
  57. fmt.Println("FromUserName-", person_QRCode.FromUserName)
  58. fmt.Println("EventKey-", person_QRCode.EventKey)
  59. // 开始 处理消息
  60. if strings.Contains(person_QRCode.EventKey, "@宝智达冷链 微信公众号通知") {
  61. //"请将本内容发送到 深圳市宝智达科技有限公司 微信公众号-|"+lib.AesEncryptCBC(T_calss_id, "0123456789012345")+"|-",
  62. Content_r := lib.GetBetweenStr(person_QRCode.EventKey, "-|", "|-")
  63. decryptCode := lib.AesDecryptCBC(Content_r, "0123456789012345")
  64. T_uuid_name := strings.Split(decryptCode, "/")
  65. var uuid, name string
  66. if len(T_uuid_name) == 2 {
  67. uuid = strings.Split(decryptCode, "/")[0]
  68. name = strings.Split(decryptCode, "/")[1]
  69. }
  70. fmt.Println("解密结果 UUID name:", decryptCode, " Content_r:", Content_r)
  71. Admin_r, err := Account.Read_Admin_ByUuid(uuid)
  72. if err != nil {
  73. _ = lib.Nats.Publish(m.Reply, []byte(""))
  74. return
  75. }
  76. T_wx := fmt.Sprintf("%s/%s", person_QRCode.FromUserName, name)
  77. // 已绑定 name相同 提示已绑定
  78. // 已绑定 name不同 在数据库修改name
  79. if strings.Contains(Admin_r.T_wx, person_QRCode.FromUserName) {
  80. list := strings.Split(Admin_r.T_wx, "|")
  81. for _, v := range list {
  82. if strings.Split(v, "/")[0] == person_QRCode.FromUserName {
  83. if v == T_wx {
  84. _ = lib.Nats.Publish(m.Reply, []byte(name+" 已绑定!"))
  85. return
  86. }
  87. Admin_r.T_wx = strings.Replace(Admin_r.T_wx, v, T_wx, 1)
  88. }
  89. }
  90. } else {
  91. Admin_r.T_wx += T_wx + "|"
  92. }
  93. Account.Update_Admin(Admin_r, "T_wx")
  94. _ = lib.Nats.Publish(m.Reply, []byte(name+" 绑定成功!"))
  95. return
  96. }
  97. _ = lib.Nats.Publish(m.Reply, []byte(""))
  98. })
  99. // 请求-响应 验证登录
  100. _, _ = lib.Nats.Subscribe("Cold_User_verification", func(m *nats.Msg) {
  101. fmt.Printf("Cold_User_verification message: %s\n", string(m.Data))
  102. type T_R struct {
  103. Code int16 `xml:"Code"`
  104. Msg string `xml:"Msg"`
  105. Pid int `xml:"Pid"` // 公司id
  106. Data Account.Admin_R `xml:"Data"` // 泛型
  107. }
  108. var t_R T_R
  109. // 验证登录
  110. b_, admin_r := Account.Verification(string(m.Data), "")
  111. if !b_ {
  112. fmt.Println("Unmarshal error")
  113. t_R.Code = 201
  114. t_R.Msg = "User_tokey Err!"
  115. b, _ := msgpack.Marshal(&t_R)
  116. _ = lib.Nats.Publish(m.Reply, b)
  117. return
  118. }
  119. T_pid, is := Account.Redis_Tokey_T_pid_Get(string(m.Data))
  120. if !is {
  121. t_R.Code = 201
  122. t_R.Msg = "User_tokey Err!"
  123. b, _ := msgpack.Marshal(&t_R)
  124. _ = lib.Nats.Publish(m.Reply, b)
  125. return
  126. }
  127. t_R.Pid = T_pid
  128. t_R.Code = 200
  129. t_R.Msg = "ok"
  130. t_R.Data = Account.AdminToAdmin_R(admin_r)
  131. b, _ := msgpack.Marshal(&t_R)
  132. _ = lib.Nats.Publish(m.Reply, b)
  133. })
  134. // 请求-响应 通过公司名获取所有公司信息
  135. _, _ = lib.Nats.Subscribe("Cold_User_CompanyListAllByT_name", func(m *nats.Msg) {
  136. fmt.Printf("CompanyListAll message: %s\n", string(m.Data))
  137. type T_R struct {
  138. Code int16 `xml:"Code"`
  139. Msg string `xml:"Msg"`
  140. Data []Account.Company `xml:"Data"` // 泛型
  141. }
  142. var t_R T_R
  143. t_R.Code = 200
  144. t_R.Msg = "ok"
  145. t_R.Data = Account.Read_Company_List_All_ByT_name(string(m.Data))
  146. b, _ := msgpack.Marshal(&t_R)
  147. _ = lib.Nats.Publish(m.Reply, b)
  148. })
  149. // 请求-响应 获取公司
  150. _, _ = lib.Nats.Subscribe("Cold_ReadCompanyByT_id", func(m *nats.Msg) {
  151. fmt.Printf("Cold_ReadCompanyByT_id message: %s\n", string(m.Data))
  152. type T_R struct {
  153. Code int16 `xml:"Code"`
  154. Msg string `xml:"Msg"`
  155. Data Account.Company `xml:"Data"` // 泛型
  156. }
  157. var t_R T_R
  158. id, _ := strconv.Atoi(string(m.Data))
  159. company, err := Account.Read_Company_ById(id)
  160. if err != nil {
  161. t_R.Code = 202
  162. t_R.Msg = err.Error()
  163. b, _ := msgpack.Marshal(&t_R)
  164. _ = lib.Nats.Publish(m.Reply, b)
  165. }
  166. t_R.Code = 200
  167. t_R.Msg = "ok"
  168. t_R.Data = company
  169. b, _ := msgpack.Marshal(&t_R)
  170. _ = lib.Nats.Publish(m.Reply, b)
  171. })
  172. // 请求-响应 获取所有用户列表
  173. _, _ = lib.Nats.Subscribe("Cold_User_UserListAll", func(m *nats.Msg) {
  174. type T_R struct {
  175. Code int16 `xml:"Code"`
  176. Msg string `xml:"Msg"`
  177. Data []Account.Admin_R `xml:"Data"` // 泛型
  178. }
  179. var t_R T_R
  180. t_R.Code = 200
  181. t_R.Msg = "ok"
  182. t_R.Data = Account.Read_Admin_List_All()
  183. b, _ := msgpack.Marshal(&t_R)
  184. _ = lib.Nats.Publish(m.Reply, b)
  185. })
  186. // 请求-响应 检查用户权限
  187. _, _ = lib.Nats.Subscribe("Cold_User_CheckUserPermissions", func(m *nats.Msg) {
  188. fmt.Printf("Cold_User_CheckUserPermissions message: %s\n", string(m.Data))
  189. type T_Req struct {
  190. Power_Id int `xml:"Power_Id"` // 权限id
  191. Req_Url string `xml:"Req_Url"` // 请求url
  192. }
  193. type T_R struct {
  194. Code int16 `xml:"Code"`
  195. Msg string `xml:"Msg"`
  196. Pass bool `xml:"Pass"` // 泛型
  197. }
  198. var t_Req T_Req
  199. var t_R T_R
  200. err := msgpack.Unmarshal(m.Data, &t_Req)
  201. if err != nil {
  202. t_R.Code = 201
  203. t_R.Msg = "Unmarshal error"
  204. b, _ := msgpack.Marshal(&t_R)
  205. _ = lib.Nats.Publish(m.Reply, b)
  206. return
  207. }
  208. power, err := Account.Read_Power_ById(t_Req.Power_Id)
  209. if err != nil {
  210. t_R.Code = 202
  211. t_R.Msg = err.Error()
  212. b, _ := msgpack.Marshal(&t_R)
  213. _ = lib.Nats.Publish(m.Reply, b)
  214. }
  215. flag := false
  216. if power.T_menu == "*" {
  217. flag = true
  218. }
  219. if !flag {
  220. api := Account.Read_API_List_ByPower_Id(power.Id, power.T_menu)
  221. for _, v := range api {
  222. if conf.Version+v.T_uri == t_Req.Req_Url {
  223. flag = true
  224. break
  225. }
  226. }
  227. }
  228. t_R.Code = 200
  229. t_R.Msg = "ok"
  230. t_R.Pass = flag
  231. b, _ := msgpack.Marshal(&t_R)
  232. _ = lib.Nats.Publish(m.Reply, b)
  233. })
  234. // 发布-订阅 模式,异步订阅 AddSysLogs
  235. _, _ = lib.Nats.Subscribe("Cold_User_AddSysLogs", func(m *nats.Msg) {
  236. fmt.Printf("AddSysLogs message: %s\n", string(m.Data))
  237. type T_S struct {
  238. T_class string
  239. T_title string
  240. T_txt interface{}
  241. }
  242. var t_S T_S
  243. err := msgpack.Unmarshal(m.Data, &t_S)
  244. if err != nil {
  245. System.Add_Logs("Nats", "Nats AddSysLogs 解析失败", string(m.Data))
  246. return
  247. }
  248. System.Add_Logs_T(t_S.T_class, t_S.T_title, t_S.T_txt)
  249. })
  250. // 发布-订阅 模式,异步订阅 AddUserLogs
  251. _, _ = lib.Nats.Subscribe("Cold_User_AddUserLogs", func(m *nats.Msg) {
  252. fmt.Printf("AddUserLogs message: %s\n", string(m.Data))
  253. type T_S struct {
  254. T_uuid string
  255. T_class string
  256. T_title string
  257. T_txt interface{}
  258. }
  259. var t_S T_S
  260. err := msgpack.Unmarshal(m.Data, &t_S)
  261. if err != nil {
  262. System.Add_Logs("Nats", "Nats AddUserLogs 解析失败", string(m.Data))
  263. return
  264. }
  265. System.Add_UserLogs_T(t_S.T_uuid, t_S.T_class, t_S.T_title, t_S.T_txt)
  266. })
  267. // 请求-响应 获取设备
  268. _, _ = lib.Nats.Subscribe("Cold_ReadDeviceByT_sn", func(m *nats.Msg) {
  269. fmt.Printf("CheckUserPermissions message: %s\n", string(m.Data))
  270. type T_R struct {
  271. Code int16 `xml:"Code"`
  272. Msg string `xml:"Msg"`
  273. Data Device.Device `xml:"Data"` // 泛型
  274. }
  275. var t_R T_R
  276. device, err := Device.Read_Device_ByT_sn(string(m.Data))
  277. if err != nil {
  278. t_R.Code = 202
  279. t_R.Msg = "查询失败"
  280. b, _ := msgpack.Marshal(&t_R)
  281. _ = lib.Nats.Publish(m.Reply, b)
  282. }
  283. t_R.Code = 200
  284. t_R.Msg = "ok"
  285. t_R.Data = device
  286. b, _ := msgpack.Marshal(&t_R)
  287. _ = lib.Nats.Publish(m.Reply, b)
  288. })
  289. // 请求-响应 获取传感器列表
  290. _, _ = lib.Nats.Subscribe("Cold_ReadDeviceSensorALLByT_sn", func(m *nats.Msg) {
  291. fmt.Printf("CheckUserPermissions message: %s\n", string(m.Data))
  292. type T_R struct {
  293. Code int16 `xml:"Code"`
  294. Msg string `xml:"Msg"`
  295. Data []Device.DeviceSensor `xml:"Data"` // 泛型
  296. }
  297. var t_R T_R
  298. device := Device.Read_DeviceSensor_ALL_List_T_sn(string(m.Data))
  299. t_R.Code = 200
  300. t_R.Msg = "ok"
  301. t_R.Data = device
  302. b, _ := msgpack.Marshal(&t_R)
  303. _ = lib.Nats.Publish(m.Reply, b)
  304. })
  305. // 请求-响应 获取设备数据列表
  306. _, _ = lib.Nats.Subscribe("Cold_ReadDeviceDataListBy_T_snid", func(m *nats.Msg) {
  307. fmt.Printf("Cold_ReadDeviceDataListBy_T_snid message: %s\n", string(m.Data))
  308. type T_Req struct {
  309. T_snid string `xml:"T_snid"`
  310. Time_start string `xml:"Time_start"`
  311. Time_end string `xml:"Time_end"`
  312. Page int `xml:"Page"`
  313. Page_z int `xml:"Page_z"`
  314. }
  315. type T_R struct {
  316. Code int16 `xml:"Code"`
  317. Msg string `xml:"Msg"`
  318. Count int64 `xml:"Count"`
  319. Data []Device.DeviceData_R `xml:"Data"` // 泛型
  320. }
  321. var t_Req T_Req
  322. var t_R T_R
  323. err := msgpack.Unmarshal(m.Data, &t_Req)
  324. if err != nil {
  325. t_R.Code = 201
  326. t_R.Msg = "Unmarshal error"
  327. b, _ := msgpack.Marshal(&t_R)
  328. _ = lib.Nats.Publish(m.Reply, b)
  329. return
  330. }
  331. deviceData, cnt := Device.Read_DeviceData_By_T_snid_List(t_Req.T_snid, t_Req.Time_start, t_Req.Time_end, t_Req.Page, t_Req.Page_z)
  332. t_R.Code = 200
  333. t_R.Msg = "ok"
  334. t_R.Count = cnt
  335. t_R.Data = deviceData
  336. b, _ := msgpack.Marshal(&t_R)
  337. _ = lib.Nats.Publish(m.Reply, b)
  338. })
  339. }