Nats.go 14 KB


  1. package Nats
  2. import (
  3. "Cold_Api/conf"
  4. "Cold_Api/controllers/lib"
  5. "Cold_Api/logs"
  6. "Cold_Api/models/Account"
  7. "Cold_Api/models/Company"
  8. "Cold_Api/models/Device"
  9. "Cold_Api/models/System"
  10. "encoding/xml"
  11. "fmt"
  12. "github.com/astaxie/beego/cache"
  13. "github.com/nats-io/nats.go"
  14. "github.com/vmihailenco/msgpack/v5"
  15. "strconv"
  16. "strings"
  17. "time"
  18. )
  19. var redisCache_NatsServer cache.Cache
  20. func init() {
  21. logs.Println("============Nats init============")
  22. var err error
  23. // 连接Nats服务器
  24. lib.Nats, err = nats.Connect("nats://" + conf.NatsServer_Url)
  25. if err != nil {
  26. fmt.Println("nats 连接失败!")
  27. panic(err)
  28. }
  29. logs.Println("nats OK!")
  30. go NatsInit()
  31. config := fmt.Sprintf(`{"key":"%s","conn":"%s","dbNum":"%s","password":"%s"}`,
  32. "redisCache_NatsServer", conf.Redis_address, conf.Redis_dbNum, conf.Redis_password)
  33. fmt.Println(config)
  34. redisCache_NatsServer, err = cache.NewCache("redis", config)
  35. if err != nil || redisCache_NatsServer == nil {
  36. errMsg := "failed to init redis"
  37. fmt.Println(errMsg, err)
  38. panic(errMsg)
  39. }
  40. }
  41. func NatsInit() {
  42. // 请求-响应, 响应 test3 消息。
  43. _, _ = lib.Nats.Subscribe("Wx_BasicMessage_Event_QRCode", func(m *nats.Msg) {
  44. fmt.Printf(" => Nats Wx_BasicMessage_Event_QRCode message: %s\n", string(m.Data))
  45. type Person_QRCode struct {
  46. ToUserName string `xml:"ToUserName"` //注意这里有个反引号
  47. FromUserName string `xml:"FromUserName"` //注意这里有个反引号
  48. CreateTime string `xml:"CreateTime"` //注意这里有个反引号
  49. EventKey string `xml:"EventKey"`
  50. }
  51. var person_QRCode Person_QRCode
  52. err1 := xml.Unmarshal(m.Data, &person_QRCode)
  53. if err1 != nil {
  54. fmt.Println("Unmarshal error")
  55. _ = lib.Nats.Publish(m.Reply, []byte(""))
  56. return
  57. }
  58. // 进入 二维码配对
  59. fmt.Println("FromUserName-", person_QRCode.FromUserName)
  60. fmt.Println("EventKey-", person_QRCode.EventKey)
  61. // 开始 处理消息
  62. if strings.Contains(person_QRCode.EventKey, "@宝智达 微信公众号通知") {
  63. //"请将本内容发送到 深圳市宝智达科技有限公司 微信公众号-|"+lib.AesEncryptCBC(T_calss_id, "0123456789012345")+"|-",
  64. Content_r := lib.GetBetweenStr(person_QRCode.EventKey, "-|", "|-")
  65. decryptCode := lib.AesDecryptCBC(Content_r, "0123456789012345")
  66. decryptCode_int, err := strconv.Atoi(decryptCode)
  67. fmt.Println("解密结果:", decryptCode, " decryptCode_int", decryptCode_int, " Content_r:", Content_r)
  68. R_DeviceNotice, err := Company.Read_CompanyNotice_ById(decryptCode_int)
  69. if err != nil {
  70. _ = lib.Nats.Publish(m.Reply, []byte(""))
  71. return
  72. }
  73. if strings.Contains(R_DeviceNotice.T_Notice_wx, person_QRCode.FromUserName) {
  74. _ = lib.Nats.Publish(m.Reply, []byte(R_DeviceNotice.T_name+" 已绑定,无需重复绑定!"))
  75. return
  76. }
  77. R_DeviceNotice.T_Notice_wx = R_DeviceNotice.T_Notice_wx + person_QRCode.FromUserName + "/重令名|"
  78. Company.Update_CompanyNotice(R_DeviceNotice, "T_Notice_wx")
  79. _ = lib.Nats.Publish(m.Reply, []byte(R_DeviceNotice.T_name+" 绑定成功!"))
  80. return
  81. }
  82. _ = lib.Nats.Publish(m.Reply, []byte(""))
  83. })
  84. // 请求-响应, 响应 test3 消息。
  85. _, _ = lib.Nats.Subscribe("Wx2_BasicMessage_Event_QRCode", func(m *nats.Msg) {
  86. fmt.Printf(" => Nats Wx2_BasicMessage_Event_QRCode message: %s\n", string(m.Data))
  87. type Person_QRCode struct {
  88. ToUserName string `xml:"ToUserName"` //注意这里有个反引号
  89. FromUserName string `xml:"FromUserName"` //注意这里有个反引号
  90. CreateTime string `xml:"CreateTime"` //注意这里有个反引号
  91. EventKey string `xml:"EventKey"`
  92. }
  93. var person_QRCode Person_QRCode
  94. err1 := xml.Unmarshal(m.Data, &person_QRCode)
  95. if err1 != nil {
  96. fmt.Println("Unmarshal error")
  97. _ = lib.Nats.Publish(m.Reply, []byte(""))
  98. return
  99. }
  100. // 进入 二维码配对
  101. fmt.Println("FromUserName-", person_QRCode.FromUserName)
  102. fmt.Println("EventKey-", person_QRCode.EventKey)
  103. // 开始 处理消息
  104. if strings.Contains(person_QRCode.EventKey, "@宝智达 微信公众号通知") {
  105. //"请将本内容发送到 深圳市宝智达科技有限公司 微信公众号-|"+lib.AesEncryptCBC(T_calss_id, "0123456789012345")+"|-",
  106. Content_r := lib.GetBetweenStr(person_QRCode.EventKey, "-|", "|-")
  107. decryptCode := lib.AesDecryptCBC(Content_r, "0123456789012345")
  108. decryptCode_int, err := strconv.Atoi(decryptCode)
  109. fmt.Println("解密结果:", decryptCode, " decryptCode_int", decryptCode_int, " Content_r:", Content_r)
  110. R_DeviceNotice, err := Company.Read_CompanyNotice_ById(decryptCode_int)
  111. if err != nil {
  112. _ = lib.Nats.Publish(m.Reply, []byte(""))
  113. return
  114. }
  115. if strings.Contains(R_DeviceNotice.T_Notice_wx2, person_QRCode.FromUserName+"/重令名|") {
  116. _ = lib.Nats.Publish(m.Reply, []byte(R_DeviceNotice.T_name+" 重复扫码!请在 5分钟内 回复您的名字,否则将 无法收到报警消息!如回复:张三"))
  117. return
  118. }
  119. if strings.Contains(R_DeviceNotice.T_Notice_wx2, person_QRCode.FromUserName) {
  120. _ = lib.Nats.Publish(m.Reply, []byte(R_DeviceNotice.T_name+" 已绑定,无需重复绑定!"))
  121. return
  122. }
  123. R_DeviceNotice.T_Notice_wx2 = R_DeviceNotice.T_Notice_wx2 + person_QRCode.FromUserName + "/重令名|"
  124. Company.Update_CompanyNotice(R_DeviceNotice, "T_Notice_wx2")
  125. redisCache_NatsServer.Put(person_QRCode.FromUserName, decryptCode_int, 5*time.Minute)
  126. _ = lib.Nats.Publish(m.Reply, []byte(R_DeviceNotice.T_name+" 扫码成功!请在 5分钟内 回复您的名字,否则将 无法收到报警消息!如回复:张三"))
  127. return
  128. }
  129. _ = lib.Nats.Publish(m.Reply, []byte(""))
  130. })
  131. // 请求-响应, 响应 test3 消息。
  132. _, _ = lib.Nats.Subscribe("Wx2_BasicMessage_Text", func(m *nats.Msg) {
  133. logs.Println(" => Nats Wx2_BasicMessage_Text message: %s\n", string(m.Data))
  134. type Person_Text struct {
  135. ToUserName string `xml:"ToUserName"` //注意这里有个反引号
  136. FromUserName string `xml:"FromUserName"` //注意这里有个反引号
  137. CreateTime string `xml:"CreateTime"` //注意这里有个反引号
  138. MsgType string `xml:"MsgType"`
  139. Content string `xml:"Content"`
  140. }
  141. var person_Text Person_Text
  142. err1 := xml.Unmarshal(m.Data, &person_Text)
  143. if err1 != nil {
  144. fmt.Println("Unmarshal error")
  145. _ = lib.Nats.Publish(m.Reply, []byte(""))
  146. return
  147. }
  148. // 进入 二维码配对
  149. fmt.Println("FromUserName-", person_Text.FromUserName)
  150. fmt.Println("Content-", person_Text.Content)
  151. if len(person_Text.Content) > 8*3 || len(person_Text.Content) <= 1*3 {
  152. _ = lib.Nats.Publish(m.Reply, []byte("请正确输入您的名字!"))
  153. return
  154. }
  155. if !redisCache_NatsServer.IsExist(person_Text.FromUserName) {
  156. return
  157. }
  158. Class_ById := lib.To_int(redisCache_NatsServer.Get(person_Text.FromUserName))
  159. R_DeviceNotice, err := Company.Read_CompanyNotice_ById(Class_ById)
  160. if err != nil {
  161. _ = lib.Nats.Publish(m.Reply, []byte(""))
  162. return
  163. }
  164. R_DeviceNotice.T_Notice_wx2 = strings.Replace(R_DeviceNotice.T_Notice_wx2,
  165. person_Text.FromUserName+"/重令名|",
  166. person_Text.FromUserName+"/"+person_Text.Content+"|",
  167. -1)
  168. Company.Update_CompanyNotice(R_DeviceNotice, "T_Notice_wx2")
  169. // 删除 缓存
  170. redisCache_NatsServer.Delete(person_Text.FromUserName)
  171. _ = lib.Nats.Publish(m.Reply, []byte("尊敬的 "+person_Text.Content+",您以成功绑定 "+R_DeviceNotice.T_name))
  172. })
  173. //// 发布-订阅 模式,异步订阅 test1
  174. //_, _ = Nats.Subscribe("test1", func(m *nats.Msg) {
  175. // fmt.Printf("Received a message: %s\n", string(m.Data))
  176. //})
  177. //// 队列 模式,订阅 test2, 队列为queue, test2 发向所有队列,同一队列只有一个能收到消息
  178. //_, _ = Nats.QueueSubscribe("test2", "queue", func(msg *nats.Msg) {
  179. // fmt.Printf("Queue a message: %s\n", string(msg.Data))
  180. //})
  181. //// 请求-响应, 响应 test3 消息。
  182. //_, _ = Nats.Subscribe("test3", func(m *nats.Msg) {
  183. // fmt.Printf("Reply a message: %s\n", string(m.Data))
  184. // _ = Nats.Publish(m.Reply, []byte("I can help!!"))
  185. //})
  186. // 请求-响应 验证登录
  187. _, _ = lib.Nats.Subscribe("Cold_User_verification", func(m *nats.Msg) {
  188. fmt.Printf("Cold_User_verification message: %s\n", string(m.Data))
  189. type T_R struct {
  190. Code int16 `xml:"Code"`
  191. Msg string `xml:"Msg"`
  192. Data Account.Admin_R `xml:"Data"` // 泛型
  193. }
  194. var t_R T_R
  195. // 验证登录
  196. b_, admin_r := lib.Verification(string(m.Data), "")
  197. if !b_ {
  198. fmt.Println("Unmarshal error")
  199. t_R.Code = 201
  200. t_R.Msg = "User_tokey Err!"
  201. b, _ := msgpack.Marshal(&t_R)
  202. _ = lib.Nats.Publish(m.Reply, b)
  203. return
  204. }
  205. t_R.Code = 200
  206. t_R.Msg = "ok"
  207. t_R.Data = Account.AdminToAdmin_R(admin_r)
  208. b, _ := msgpack.Marshal(&t_R)
  209. _ = lib.Nats.Publish(m.Reply, b)
  210. })
  211. // 请求-响应 通过公司名获取所有公司信息
  212. _, _ = lib.Nats.Subscribe("Cold_User_CompanyListAllByT_name", func(m *nats.Msg) {
  213. fmt.Printf("CompanyListAll message: %s\n", string(m.Data))
  214. type T_R struct {
  215. Code int16 `xml:"Code"`
  216. Msg string `xml:"Msg"`
  217. Data []Account.Company `xml:"Data"` // 泛型
  218. }
  219. var t_R T_R
  220. t_R.Code = 200
  221. t_R.Msg = "ok"
  222. t_R.Data = Account.Read_Company_List_All_ByT_name(string(m.Data))
  223. b, _ := msgpack.Marshal(&t_R)
  224. _ = lib.Nats.Publish(m.Reply, b)
  225. })
  226. // 请求-响应 获取所有用户列表
  227. _, _ = lib.Nats.Subscribe("Cold_User_UserListAll", func(m *nats.Msg) {
  228. type T_R struct {
  229. Code int16 `xml:"Code"`
  230. Msg string `xml:"Msg"`
  231. Data []Account.Admin_R `xml:"Data"` // 泛型
  232. }
  233. var t_R T_R
  234. t_R.Code = 200
  235. t_R.Msg = "ok"
  236. t_R.Data = Account.Read_Admin_List_All()
  237. b, _ := msgpack.Marshal(&t_R)
  238. _ = lib.Nats.Publish(m.Reply, b)
  239. })
  240. // 请求-响应 检查用户权限
  241. _, _ = lib.Nats.Subscribe("Cold_User_CheckUserPermissions", func(m *nats.Msg) {
  242. fmt.Printf("Cold_User_CheckUserPermissions message: %s\n", string(m.Data))
  243. type T_Req struct {
  244. Power_Id int `xml:"Power_Id"` // 权限id
  245. Req_Url string `xml:"Req_Url"` // 请求url
  246. }
  247. type T_R struct {
  248. Code int16 `xml:"Code"`
  249. Msg string `xml:"Msg"`
  250. Pass bool `xml:"Pass"` // 泛型
  251. }
  252. var t_Req T_Req
  253. var t_R T_R
  254. err := msgpack.Unmarshal(m.Data, &t_Req)
  255. if err != nil {
  256. t_R.Code = 201
  257. t_R.Msg = "Unmarshal error"
  258. b, _ := msgpack.Marshal(&t_R)
  259. _ = lib.Nats.Publish(m.Reply, b)
  260. return
  261. }
  262. power, _ := Account.Read_Power_ById(t_Req.Power_Id)
  263. flag := false
  264. if power.T_Menu_Bind == "*" {
  265. flag = true
  266. }
  267. if !flag {
  268. api := Account.Read_API_List_ByPower_Id(power.Id, power.T_Menu_Bind)
  269. for _, v := range api {
  270. if conf.Version+v.T_uri == t_Req.Req_Url {
  271. flag = true
  272. break
  273. }
  274. }
  275. }
  276. t_R.Code = 200
  277. t_R.Msg = "ok"
  278. t_R.Pass = flag
  279. b, _ := msgpack.Marshal(&t_R)
  280. _ = lib.Nats.Publish(m.Reply, b)
  281. })
  282. // 发布-订阅 模式,异步订阅 AddSysLogs
  283. _, _ = lib.Nats.Subscribe("Cold_User_AddSysLogs", func(m *nats.Msg) {
  284. fmt.Printf("AddSysLogs message: %s\n", string(m.Data))
  285. type T_S struct {
  286. T_class string
  287. T_title string
  288. T_txt interface{}
  289. }
  290. var t_S T_S
  291. err := msgpack.Unmarshal(m.Data, &t_S)
  292. if err != nil {
  293. System.Add_Logs("Nats", "Nats AddSysLogs 解析失败", string(m.Data))
  294. return
  295. }
  296. System.Add_Logs_T(t_S.T_class, t_S.T_title, t_S.T_txt)
  297. })
  298. // 发布-订阅 模式,异步订阅 AddUserLogs
  299. _, _ = lib.Nats.Subscribe("Cold_User_AddUserLogs", func(m *nats.Msg) {
  300. fmt.Printf("AddUserLogs message: %s\n", string(m.Data))
  301. type T_S struct {
  302. T_uuid string
  303. T_class string
  304. T_title string
  305. T_txt interface{}
  306. }
  307. var t_S T_S
  308. err := msgpack.Unmarshal(m.Data, &t_S)
  309. if err != nil {
  310. System.Add_Logs("Nats", "Nats AddUserLogs 解析失败", string(m.Data))
  311. return
  312. }
  313. System.Add_UserLogs_T(t_S.T_uuid, t_S.T_class, t_S.T_title, t_S.T_txt)
  314. })
  315. // 请求-响应 获取设备
  316. _, _ = lib.Nats.Subscribe("Cold_ReadDeviceByT_sn", func(m *nats.Msg) {
  317. fmt.Printf("CheckUserPermissions message: %s\n", string(m.Data))
  318. type T_R struct {
  319. Code int16 `xml:"Code"`
  320. Msg string `xml:"Msg"`
  321. Data Device.Device `xml:"Data"` // 泛型
  322. }
  323. var t_R T_R
  324. device, err := Device.Read_Device_ByT_sn(string(m.Data))
  325. if err != nil {
  326. t_R.Code = 202
  327. t_R.Msg = "查询失败"
  328. b, _ := msgpack.Marshal(&t_R)
  329. _ = lib.Nats.Publish(m.Reply, b)
  330. }
  331. t_R.Code = 200
  332. t_R.Msg = "ok"
  333. t_R.Data = device
  334. b, _ := msgpack.Marshal(&t_R)
  335. _ = lib.Nats.Publish(m.Reply, b)
  336. })
  337. // 请求-响应 获取传感器列表
  338. _, _ = lib.Nats.Subscribe("Cold_ReadDeviceSensorALLByT_sn", func(m *nats.Msg) {
  339. fmt.Printf("CheckUserPermissions message: %s\n", string(m.Data))
  340. type T_R struct {
  341. Code int16 `xml:"Code"`
  342. Msg string `xml:"Msg"`
  343. Data []Device.DeviceSensor `xml:"Data"` // 泛型
  344. }
  345. var t_R T_R
  346. device := Device.Read_DeviceSensor_ALL_List_T_sn(string(m.Data))
  347. t_R.Code = 200
  348. t_R.Msg = "ok"
  349. t_R.Data = device
  350. b, _ := msgpack.Marshal(&t_R)
  351. _ = lib.Nats.Publish(m.Reply, b)
  352. })
  353. // 请求-响应 获取设备数据列表
  354. _, _ = lib.Nats.Subscribe("Cold_ReadDeviceDataListBy_T_snid", func(m *nats.Msg) {
  355. fmt.Printf("Cold_ReadDeviceDataListBy_T_snid message: %s\n", string(m.Data))
  356. type T_Req struct {
  357. T_snid string `xml:"T_snid"`
  358. Time_start string `xml:"Time_start"`
  359. Time_end string `xml:"Time_end"`
  360. Page int `xml:"Page"`
  361. Page_z int `xml:"Page_z"`
  362. }
  363. type T_R struct {
  364. Code int16 `xml:"Code"`
  365. Msg string `xml:"Msg"`
  366. Count int64 `xml:"Count"`
  367. Data []Device.DeviceData_R `xml:"Data"` // 泛型
  368. }
  369. var t_Req T_Req
  370. var t_R T_R
  371. err := msgpack.Unmarshal(m.Data, &t_Req)
  372. if err != nil {
  373. t_R.Code = 201
  374. t_R.Msg = "Unmarshal error"
  375. b, _ := msgpack.Marshal(&t_R)
  376. _ = lib.Nats.Publish(m.Reply, b)
  377. return
  378. }
  379. deviceData, cnt := Device.Read_DeviceData_By_T_snid_List(t_Req.T_snid, t_Req.Time_start, t_Req.Time_end, t_Req.Page, t_Req.Page_z)
  380. t_R.Code = 200
  381. t_R.Msg = "ok"
  382. t_R.Count = cnt
  383. t_R.Data = deviceData
  384. b, _ := msgpack.Marshal(&t_R)
  385. _ = lib.Nats.Publish(m.Reply, b)
  386. })
  387. }