Nats.go 10 KB


  1. package Nats
  2. import (
  3. "ERP_project/conf"
  4. "ERP_project/models/Account"
  5. "encoding/json"
  6. "fmt"
  7. "github.com/astaxie/beego/logs"
  8. "github.com/beego/beego/v2/adapter/orm"
  9. "github.com/nats-io/nats.go"
  10. "github.com/vmihailenco/msgpack/v5"
  11. menulibs "gogs.baozhida.cn/zoie/ERP_libs/Menu"
  12. powerlibs "gogs.baozhida.cn/zoie/ERP_libs/Power"
  13. "gogs.baozhida.cn/zoie/ERP_libs/lib"
  14. )
  15. var Nats *nats.Conn
  16. func init() {
  17. fmt.Println("============Nats init============")
  18. var err error
  19. // 连接Nats服务器
  20. Nats, err = nats.Connect("nats://" + conf.NatsServer_Url)
  21. if err != nil {
  22. fmt.Println("nats 连接失败!")
  23. panic(err)
  24. }
  25. fmt.Println("nats OK!")
  26. go NatsInit()
  27. }
  28. func NatsInit() {
  29. // 发布-订阅 模式,异步订阅 test1
  30. _, _ = Nats.Subscribe(conf.NatsSubj_Prefix+conf.Sys_Name+"_Read_Menu_List", func(m *nats.Msg) {
  31. var t_R lib.JSONS
  32. o := orm.NewOrm()
  33. MenuDao := menulibs.NewMenu(o)
  34. menu, err := MenuDao.Read_Menu_List()
  35. if err != nil {
  36. logs.Error("Mats", lib.FuncName(), err)
  37. t_R.Code = 202
  38. t_R.Msg = "获取失败!"
  39. b, _ := msgpack.Marshal(&t_R)
  40. _ = Nats.Publish(m.Reply, b)
  41. return
  42. }
  43. t_R.Code = 200
  44. t_R.Msg = "ok"
  45. t_R.Data = menu
  46. b, _ := msgpack.Marshal(&t_R)
  47. _ = Nats.Publish(m.Reply, b)
  48. })
  49. _, _ = Nats.Subscribe(conf.NatsSubj_Prefix+conf.Sys_Name+"_Read_User_Bind_Menu_List", func(m *nats.Msg) {
  50. var t_R lib.JSONS
  51. o := orm.NewOrm()
  52. powerDao := powerlibs.NewPower(o)
  53. power, err := powerDao.Read_Power_ByT_id(string(m.Data))
  54. if err != nil {
  55. logs.Error("Mats", lib.FuncName(), err)
  56. t_R.Code = 202
  57. t_R.Msg = err.Error()
  58. b, _ := msgpack.Marshal(&t_R)
  59. _ = Nats.Publish(m.Reply, b)
  60. return
  61. }
  62. MenuDao := menulibs.NewMenu(o)
  63. menu, err := MenuDao.Read_Menu_List_ByPower_T_Menu(power.T_menu)
  64. if err != nil {
  65. logs.Error("Mats", lib.FuncName(), err)
  66. t_R.Code = 202
  67. t_R.Msg = err.Error()
  68. b, _ := msgpack.Marshal(&t_R)
  69. _ = Nats.Publish(m.Reply, b)
  70. return
  71. }
  72. t_R.Code = 200
  73. t_R.Msg = "ok"
  74. t_R.Data = menu
  75. b, _ := msgpack.Marshal(&t_R)
  76. _ = Nats.Publish(m.Reply, b)
  77. })
  78. _, _ = Nats.Subscribe(conf.NatsSubj_Prefix+conf.Sys_Name+"_Add_Power", func(m *nats.Msg) {
  79. var t_Req powerlibs.Power
  80. var t_R lib.JSONS
  81. err := msgpack.Unmarshal(m.Data, &t_Req)
  82. if err != nil {
  83. logs.Error("Mats", lib.FuncName(), err)
  84. t_R.Code = 202
  85. t_R.Msg = err.Error()
  86. b, _ := msgpack.Marshal(&t_R)
  87. _ = Nats.Publish(m.Reply, b)
  88. return
  89. }
  90. fmt.Printf("ERP_project_Add_Power message: %+v\n", t_Req)
  91. o := orm.NewOrm()
  92. powerDao := powerlibs.NewPower(o)
  93. id, err := powerDao.Add_Power(t_Req)
  94. if err != nil {
  95. logs.Error("Mats", lib.FuncName(), err)
  96. t_R.Code = 202
  97. t_R.Msg = err.Error()
  98. b, _ := msgpack.Marshal(&t_R)
  99. _ = Nats.Publish(m.Reply, b)
  100. return
  101. }
  102. t_R.Code = 200
  103. t_R.Msg = "ok"
  104. t_R.Data = id
  105. b, _ := msgpack.Marshal(&t_R)
  106. _ = Nats.Publish(m.Reply, b)
  107. })
  108. _, _ = Nats.Subscribe(conf.NatsSubj_Prefix+conf.Sys_Name+"_Read_Power_ByT_id", func(m *nats.Msg) {
  109. fmt.Printf("ERP_project_Read_Power_ByT_id message: %+v\n", string(m.Data))
  110. var t_R lib.JSONS
  111. o := orm.NewOrm()
  112. powerDao := powerlibs.NewPower(o)
  113. power, err := powerDao.Read_Power_ByT_id(string(m.Data))
  114. if err != nil {
  115. logs.Error("Mats", lib.FuncName(), err)
  116. t_R.Code = 202
  117. t_R.Msg = err.Error()
  118. b, _ := msgpack.Marshal(&t_R)
  119. _ = Nats.Publish(m.Reply, b)
  120. return
  121. }
  122. t_R.Code = 200
  123. t_R.Msg = "ok"
  124. t_R.Data = power
  125. b, _ := msgpack.Marshal(&t_R)
  126. _ = Nats.Publish(m.Reply, b)
  127. })
  128. _, _ = Nats.Subscribe(conf.NatsSubj_Prefix+conf.Sys_Name+"_Update_Power", func(m *nats.Msg) {
  129. var t_Req powerlibs.Power
  130. var t_R lib.JSONS
  131. err := msgpack.Unmarshal(m.Data, &t_Req)
  132. if err != nil {
  133. logs.Error("Mats", lib.FuncName(), err)
  134. t_R.Code = 202
  135. t_R.Msg = err.Error()
  136. b, _ := msgpack.Marshal(&t_R)
  137. _ = Nats.Publish(m.Reply, b)
  138. return
  139. }
  140. fmt.Printf("ERP_project_Update_Power message: %+v\n", t_Req)
  141. o := orm.NewOrm()
  142. powerDao := powerlibs.NewPower(o)
  143. id, err := powerDao.Update_Power(t_Req)
  144. if err != nil {
  145. logs.Error("Mats", lib.FuncName(), err)
  146. t_R.Code = 202
  147. t_R.Msg = err.Error()
  148. b, _ := msgpack.Marshal(&t_R)
  149. _ = Nats.Publish(m.Reply, b)
  150. return
  151. }
  152. APIDao := menulibs.NewAPI(o, Account.RedisCache_API)
  153. APIDao.Redis_API_DelK(t_Req.T_id)
  154. t_R.Code = 200
  155. t_R.Msg = "ok"
  156. t_R.Data = id
  157. b, _ := msgpack.Marshal(&t_R)
  158. _ = Nats.Publish(m.Reply, b)
  159. })
  160. _, _ = Nats.Subscribe(conf.NatsSubj_Prefix+conf.Sys_Name+"_Delete_Power", func(m *nats.Msg) {
  161. var t_Req powerlibs.Power
  162. var t_R lib.JSONS
  163. err := msgpack.Unmarshal(m.Data, &t_Req)
  164. if err != nil {
  165. logs.Error("Mats", lib.FuncName(), err)
  166. t_R.Code = 202
  167. t_R.Msg = err.Error()
  168. b, _ := msgpack.Marshal(&t_R)
  169. _ = Nats.Publish(m.Reply, b)
  170. return
  171. }
  172. fmt.Printf("ERP_project_Delete_Power message: %+v\n", t_Req)
  173. o := orm.NewOrm()
  174. powerDao := powerlibs.NewPower(o)
  175. id, err := powerDao.Delete_Power(t_Req)
  176. if err != nil {
  177. logs.Error("Mats", lib.FuncName(), err)
  178. t_R.Code = 202
  179. t_R.Msg = err.Error()
  180. b, _ := msgpack.Marshal(&t_R)
  181. _ = Nats.Publish(m.Reply, b)
  182. return
  183. }
  184. APIDao := menulibs.NewAPI(o, Account.RedisCache_API)
  185. APIDao.Redis_API_DelK(t_Req.T_id)
  186. t_R.Code = 200
  187. t_R.Msg = "ok"
  188. t_R.Data = id
  189. b, _ := msgpack.Marshal(&t_R)
  190. _ = Nats.Publish(m.Reply, b)
  191. })
  192. // ========== 菜单相关NATS服务 ==========
  193. // 请求-响应 添加菜单
  194. _, _ = Nats.Subscribe(conf.NatsSubj_Prefix+conf.Sys_Name+"_Add_Menu", func(m *nats.Msg) {
  195. type T_S struct {
  196. Menu menulibs.Menu
  197. APIs string // API JSON字符串
  198. }
  199. type T_R struct {
  200. Code int16 `xml:"Code"`
  201. Msg string `xml:"Msg"`
  202. Data int `xml:"Data"`
  203. }
  204. var t_S T_S
  205. var t_R T_R
  206. err := msgpack.Unmarshal(m.Data, &t_S)
  207. if err != nil {
  208. t_R.Code = 202
  209. t_R.Msg = "msgpack unmarshal err!"
  210. b, _ := msgpack.Marshal(&t_R)
  211. _ = Nats.Publish(m.Reply, b)
  212. return
  213. }
  214. o := orm.NewOrm()
  215. MenuDao := menulibs.NewMenu(o)
  216. id64, err := MenuDao.Add_Menu(t_S.Menu)
  217. if err != nil {
  218. t_R.Code = 202
  219. t_R.Msg = "添加菜单失败!"
  220. b, _ := msgpack.Marshal(&t_R)
  221. _ = Nats.Publish(m.Reply, b)
  222. return
  223. }
  224. id := int(id64)
  225. // 处理API JSON字符串
  226. if len(t_S.APIs) > 0 {
  227. var apiList []struct {
  228. T_name string `json:"T_name"`
  229. T_uri string `json:"T_uri"`
  230. T_method string `json:"T_method"`
  231. }
  232. err = json.Unmarshal([]byte(t_S.APIs), &apiList)
  233. if err == nil && len(apiList) > 0 {
  234. var apis []menulibs.API
  235. for _, api := range apiList {
  236. if len(api.T_method) == 0 {
  237. api.T_method = "POST"
  238. }
  239. apis = append(apis, menulibs.API{
  240. T_Menu_Id: id,
  241. T_name: api.T_name,
  242. T_uri: api.T_uri,
  243. T_method: api.T_method,
  244. T_enable: 1,
  245. })
  246. }
  247. APIDao := menulibs.NewAPI(o, Account.RedisCache_API)
  248. _, _ = APIDao.InsertMulti_API(apis)
  249. }
  250. }
  251. t_R.Code = 200
  252. t_R.Msg = "ok"
  253. t_R.Data = id
  254. b, _ := msgpack.Marshal(&t_R)
  255. _ = Nats.Publish(m.Reply, b)
  256. })
  257. // 请求-响应 更新菜单
  258. _, _ = Nats.Subscribe(conf.NatsSubj_Prefix+conf.Sys_Name+"_Update_Menu", func(m *nats.Msg) {
  259. type T_S struct {
  260. Menu menulibs.Menu
  261. Cols []string // 需要更新的字段
  262. APIs string // API JSON字符串
  263. }
  264. type T_R struct {
  265. Code int16 `xml:"Code"`
  266. Msg string `xml:"Msg"`
  267. Data int `xml:"Data"`
  268. }
  269. var t_S T_S
  270. var t_R T_R
  271. err := msgpack.Unmarshal(m.Data, &t_S)
  272. if err != nil {
  273. t_R.Code = 202
  274. t_R.Msg = "msgpack unmarshal err!"
  275. b, _ := msgpack.Marshal(&t_R)
  276. _ = Nats.Publish(m.Reply, b)
  277. return
  278. }
  279. o := orm.NewOrm()
  280. MenuDao := menulibs.NewMenu(o)
  281. APIDao := menulibs.NewAPI(o, Account.RedisCache_API)
  282. // 更新菜单
  283. if len(t_S.Cols) > 0 {
  284. _, err = MenuDao.Update_Menu(t_S.Menu, t_S.Cols...)
  285. if err != nil {
  286. t_R.Code = 202
  287. t_R.Msg = "更新菜单失败!"
  288. b, _ := msgpack.Marshal(&t_R)
  289. _ = Nats.Publish(m.Reply, b)
  290. return
  291. }
  292. }
  293. // 处理API JSON字符串
  294. if len(t_S.APIs) > 0 {
  295. // 先删除旧的API(软删除)
  296. _, _ = APIDao.Delete_API_ByMenuId(t_S.Menu.Id)
  297. var apiList []struct {
  298. T_name string `json:"T_name"`
  299. T_uri string `json:"T_uri"`
  300. T_method string `json:"T_method"`
  301. }
  302. err = json.Unmarshal([]byte(t_S.APIs), &apiList)
  303. if err == nil && len(apiList) > 0 {
  304. var apis []menulibs.API
  305. for _, api := range apiList {
  306. if len(api.T_method) == 0 {
  307. api.T_method = "POST"
  308. }
  309. apis = append(apis, menulibs.API{
  310. T_Menu_Id: t_S.Menu.Id,
  311. T_name: api.T_name,
  312. T_uri: api.T_uri,
  313. T_method: api.T_method,
  314. T_enable: 1,
  315. })
  316. }
  317. _, _ = APIDao.InsertMulti_API(apis)
  318. }
  319. }
  320. t_R.Code = 200
  321. t_R.Msg = "ok"
  322. t_R.Data = t_S.Menu.Id
  323. b, _ := msgpack.Marshal(&t_R)
  324. _ = Nats.Publish(m.Reply, b)
  325. })
  326. // 请求-响应 删除菜单
  327. _, _ = Nats.Subscribe(conf.NatsSubj_Prefix+conf.Sys_Name+"_Delete_Menu", func(m *nats.Msg) {
  328. type T_R struct {
  329. Code int16 `xml:"Code"`
  330. Msg string `xml:"Msg"`
  331. Data int `xml:"Data"`
  332. }
  333. var menu menulibs.Menu
  334. var t_R T_R
  335. err := msgpack.Unmarshal(m.Data, &menu)
  336. if err != nil {
  337. t_R.Code = 202
  338. t_R.Msg = "msgpack unmarshal err!"
  339. b, _ := msgpack.Marshal(&t_R)
  340. _ = Nats.Publish(m.Reply, b)
  341. return
  342. }
  343. o := orm.NewOrm()
  344. MenuDao := menulibs.NewMenu(o)
  345. _, err = MenuDao.Delete_Menu(menu)
  346. if err != nil {
  347. t_R.Code = 202
  348. t_R.Msg = "删除菜单失败!"
  349. b, _ := msgpack.Marshal(&t_R)
  350. _ = Nats.Publish(m.Reply, b)
  351. return
  352. }
  353. t_R.Code = 200
  354. t_R.Msg = "ok"
  355. t_R.Data = menu.Id
  356. b, _ := msgpack.Marshal(&t_R)
  357. _ = Nats.Publish(m.Reply, b)
  358. })
  359. // 请求-响应 根据ID获取菜单
  360. _, _ = Nats.Subscribe(conf.NatsSubj_Prefix+conf.Sys_Name+"_Read_Menu_ById", func(m *nats.Msg) {
  361. type T_R struct {
  362. Code int16 `xml:"Code"`
  363. Msg string `xml:"Msg"`
  364. Data menulibs.Menu `xml:"Data"`
  365. }
  366. var id int
  367. var t_R T_R
  368. err := msgpack.Unmarshal(m.Data, &id)
  369. if err != nil {
  370. t_R.Code = 202
  371. t_R.Msg = "msgpack unmarshal err!"
  372. b, _ := msgpack.Marshal(&t_R)
  373. _ = Nats.Publish(m.Reply, b)
  374. return
  375. }
  376. o := orm.NewOrm()
  377. MenuDao := menulibs.NewMenu(o)
  378. menu, err := MenuDao.Read_Menu_ById(id)
  379. if err != nil {
  380. t_R.Code = 202
  381. t_R.Msg = "查询菜单失败!"
  382. b, _ := msgpack.Marshal(&t_R)
  383. _ = Nats.Publish(m.Reply, b)
  384. return
  385. }
  386. t_R.Code = 200
  387. t_R.Msg = "ok"
  388. t_R.Data = menu
  389. b, _ := msgpack.Marshal(&t_R)
  390. _ = Nats.Publish(m.Reply, b)
  391. })
  392. }