Nats.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494
  1. package Nats
  2. import (
  3. "ERP_storage/conf"
  4. "ERP_storage/models/Account"
  5. "ERP_storage/models/Contract"
  6. "encoding/json"
  7. "fmt"
  8. "github.com/astaxie/beego/logs"
  9. "github.com/beego/beego/v2/adapter/orm"
  10. "github.com/nats-io/nats.go"
  11. "github.com/vmihailenco/msgpack/v5"
  12. menulibs "gogs.baozhida.cn/zoie/ERP_libs/Menu"
  13. powerlibs "gogs.baozhida.cn/zoie/ERP_libs/Power"
  14. "gogs.baozhida.cn/zoie/ERP_libs/lib"
  15. )
// Nats is the package-wide NATS connection. It is assigned in init and is
// used by NatsInit to register subscriptions and to publish replies.
var Nats *nats.Conn
  17. func init() {
  18. fmt.Println("============Nats init============")
  19. var err error
  20. // 连接Nats服务器
  21. Nats, err = nats.Connect("nats://" + conf.NatsServer_Url)
  22. if err != nil {
  23. fmt.Println("nats 连接失败!")
  24. panic(err)
  25. }
  26. fmt.Println("nats OK!")
  27. go NatsInit()
  28. }
  29. func NatsInit() {
  30. // 发布-订阅 模式,异步订阅 test1
  31. _, _ = Nats.Subscribe(conf.NatsSubj_Prefix+conf.Sys_Name+"_Read_Menu_List", func(m *nats.Msg) {
  32. var t_R lib.JSONS
  33. o := orm.NewOrm()
  34. MenuDao := menulibs.NewMenu(o)
  35. menu, err := MenuDao.Read_Menu_List()
  36. if err != nil {
  37. logs.Error("Mats", lib.FuncName(), err)
  38. t_R.Code = 202
  39. t_R.Msg = "获取失败!"
  40. b, _ := msgpack.Marshal(&t_R)
  41. _ = Nats.Publish(m.Reply, b)
  42. return
  43. }
  44. t_R.Code = 200
  45. t_R.Msg = "ok"
  46. t_R.Data = menu
  47. b, _ := msgpack.Marshal(&t_R)
  48. _ = Nats.Publish(m.Reply, b)
  49. })
  50. _, _ = Nats.Subscribe(conf.NatsSubj_Prefix+conf.Sys_Name+"_Read_User_Bind_Menu_List", func(m *nats.Msg) {
  51. var t_R lib.JSONS
  52. o := orm.NewOrm()
  53. powerDao := powerlibs.NewPower(o)
  54. power, err := powerDao.Read_Power_ByT_id(string(m.Data))
  55. if err != nil {
  56. logs.Error("Mats", lib.FuncName(), err)
  57. t_R.Code = 202
  58. t_R.Msg = err.Error()
  59. b, _ := msgpack.Marshal(&t_R)
  60. _ = Nats.Publish(m.Reply, b)
  61. return
  62. }
  63. MenuDao := menulibs.NewMenu(o)
  64. menu, err := MenuDao.Read_Menu_List_ByPower_T_Menu(power.T_menu)
  65. if err != nil {
  66. logs.Error("Mats", lib.FuncName(), err)
  67. t_R.Code = 202
  68. t_R.Msg = err.Error()
  69. b, _ := msgpack.Marshal(&t_R)
  70. _ = Nats.Publish(m.Reply, b)
  71. return
  72. }
  73. t_R.Code = 200
  74. t_R.Msg = "ok"
  75. t_R.Data = menu
  76. b, _ := msgpack.Marshal(&t_R)
  77. _ = Nats.Publish(m.Reply, b)
  78. })
  79. _, _ = Nats.Subscribe(conf.NatsSubj_Prefix+conf.Sys_Name+"_Add_Power", func(m *nats.Msg) {
  80. var t_Req powerlibs.Power
  81. var t_R lib.JSONS
  82. err := msgpack.Unmarshal(m.Data, &t_Req)
  83. if err != nil {
  84. logs.Error("Mats", lib.FuncName(), err)
  85. t_R.Code = 202
  86. t_R.Msg = err.Error()
  87. b, _ := msgpack.Marshal(&t_R)
  88. _ = Nats.Publish(m.Reply, b)
  89. return
  90. }
  91. fmt.Printf("ERP_storage_Add_Power message: %+v\n", t_Req)
  92. o := orm.NewOrm()
  93. powerDao := powerlibs.NewPower(o)
  94. id, err := powerDao.Add_Power(t_Req)
  95. if err != nil {
  96. logs.Error("Mats", lib.FuncName(), err)
  97. t_R.Code = 202
  98. t_R.Msg = err.Error()
  99. b, _ := msgpack.Marshal(&t_R)
  100. _ = Nats.Publish(m.Reply, b)
  101. return
  102. }
  103. t_R.Code = 200
  104. t_R.Msg = "ok"
  105. t_R.Data = id
  106. b, _ := msgpack.Marshal(&t_R)
  107. _ = Nats.Publish(m.Reply, b)
  108. })
  109. _, _ = Nats.Subscribe(conf.NatsSubj_Prefix+conf.Sys_Name+"_Read_Power_ByT_id", func(m *nats.Msg) {
  110. fmt.Printf("ERP_storage_Read_Power_ByT_id message: %+v\n", string(m.Data))
  111. var t_R lib.JSONS
  112. o := orm.NewOrm()
  113. powerDao := powerlibs.NewPower(o)
  114. power, err := powerDao.Read_Power_ByT_id(string(m.Data))
  115. if err != nil {
  116. logs.Error("Mats", lib.FuncName(), err)
  117. t_R.Code = 202
  118. t_R.Msg = err.Error()
  119. b, _ := msgpack.Marshal(&t_R)
  120. _ = Nats.Publish(m.Reply, b)
  121. return
  122. }
  123. t_R.Code = 200
  124. t_R.Msg = "ok"
  125. t_R.Data = power
  126. b, _ := msgpack.Marshal(&t_R)
  127. _ = Nats.Publish(m.Reply, b)
  128. })
  129. _, _ = Nats.Subscribe(conf.NatsSubj_Prefix+conf.Sys_Name+"_Update_Power", func(m *nats.Msg) {
  130. var t_Req powerlibs.Power
  131. var t_R lib.JSONS
  132. err := msgpack.Unmarshal(m.Data, &t_Req)
  133. if err != nil {
  134. logs.Error("Mats", lib.FuncName(), err)
  135. t_R.Code = 202
  136. t_R.Msg = err.Error()
  137. b, _ := msgpack.Marshal(&t_R)
  138. _ = Nats.Publish(m.Reply, b)
  139. return
  140. }
  141. fmt.Printf("ERP_storage_Update_Power message: %+v\n", t_Req)
  142. o := orm.NewOrm()
  143. powerDao := powerlibs.NewPower(o)
  144. id, err := powerDao.Update_Power(t_Req)
  145. if err != nil {
  146. logs.Error("Mats", lib.FuncName(), err)
  147. t_R.Code = 202
  148. t_R.Msg = err.Error()
  149. b, _ := msgpack.Marshal(&t_R)
  150. _ = Nats.Publish(m.Reply, b)
  151. return
  152. }
  153. APIDao := menulibs.NewAPI(o, Account.RedisCache_API)
  154. APIDao.Redis_API_DelK(t_Req.T_id)
  155. t_R.Code = 200
  156. t_R.Msg = "ok"
  157. t_R.Data = id
  158. b, _ := msgpack.Marshal(&t_R)
  159. _ = Nats.Publish(m.Reply, b)
  160. })
  161. _, _ = Nats.Subscribe(conf.NatsSubj_Prefix+conf.Sys_Name+"_Delete_Power", func(m *nats.Msg) {
  162. var t_Req powerlibs.Power
  163. var t_R lib.JSONS
  164. err := msgpack.Unmarshal(m.Data, &t_Req)
  165. if err != nil {
  166. logs.Error("Mats", lib.FuncName(), err)
  167. t_R.Code = 202
  168. t_R.Msg = err.Error()
  169. b, _ := msgpack.Marshal(&t_R)
  170. _ = Nats.Publish(m.Reply, b)
  171. return
  172. }
  173. fmt.Printf("ERP_storage_Delete_Power message: %+v\n", t_Req)
  174. o := orm.NewOrm()
  175. powerDao := powerlibs.NewPower(o)
  176. id, err := powerDao.Delete_Power(t_Req)
  177. if err != nil {
  178. logs.Error("Mats", lib.FuncName(), err)
  179. t_R.Code = 202
  180. t_R.Msg = err.Error()
  181. b, _ := msgpack.Marshal(&t_R)
  182. _ = Nats.Publish(m.Reply, b)
  183. return
  184. }
  185. APIDao := menulibs.NewAPI(o, Account.RedisCache_API)
  186. APIDao.Redis_API_DelK(t_Req.T_id)
  187. t_R.Code = 200
  188. t_R.Msg = "ok"
  189. t_R.Data = id
  190. b, _ := msgpack.Marshal(&t_R)
  191. _ = Nats.Publish(m.Reply, b)
  192. })
  193. // 更新合同状态
  194. _, _ = Nats.Subscribe(conf.NatsSubj_Prefix+conf.Sys_Name+"_Update_Contract_T_out", func(m *nats.Msg) {
  195. fmt.Printf(conf.Sys_Name+"_Update_Contract_T_out message: %v\n", string(m.Data))
  196. T_number := string(m.Data)
  197. o := orm.NewOrm()
  198. o.Begin()
  199. ContractDao := Contract.NewContract(o)
  200. ContractProductDao := Contract.NewContractProduct(o)
  201. // 1、添加出库单
  202. contract, _ := ContractDao.Read_Contract_ByT_number(T_number)
  203. T_out := 2
  204. // 查询合同产品清单是否全部都为已出库
  205. state := ContractProductDao.Read_ContractProduct_T_State_List(T_number)
  206. if state == 1 {
  207. T_out = 1
  208. }
  209. if state == 3 {
  210. T_out = 3
  211. }
  212. if T_out != contract.T_out {
  213. err := ContractDao.Update_Contract(contract, "T_out")
  214. if err != nil {
  215. o.Rollback()
  216. }
  217. }
  218. })
  219. // ========== 菜单相关NATS服务 ==========
  220. // 请求-响应 添加菜单
  221. _, _ = Nats.Subscribe(conf.NatsSubj_Prefix+conf.Sys_Name+"_Add_Menu", func(m *nats.Msg) {
  222. type T_S struct {
  223. Menu menulibs.Menu
  224. APIs string // API JSON字符串
  225. }
  226. type T_R struct {
  227. Code int16 `xml:"Code"`
  228. Msg string `xml:"Msg"`
  229. Data int `xml:"Data"`
  230. }
  231. var t_S T_S
  232. var t_R T_R
  233. err := msgpack.Unmarshal(m.Data, &t_S)
  234. if err != nil {
  235. t_R.Code = 202
  236. t_R.Msg = "msgpack unmarshal err!"
  237. b, _ := msgpack.Marshal(&t_R)
  238. _ = Nats.Publish(m.Reply, b)
  239. return
  240. }
  241. o := orm.NewOrm()
  242. MenuDao := menulibs.NewMenu(o)
  243. id64, err := MenuDao.Add_Menu(t_S.Menu)
  244. if err != nil {
  245. t_R.Code = 202
  246. t_R.Msg = "添加菜单失败!"
  247. b, _ := msgpack.Marshal(&t_R)
  248. _ = Nats.Publish(m.Reply, b)
  249. return
  250. }
  251. id := int(id64)
  252. // 处理API JSON字符串
  253. if len(t_S.APIs) > 0 {
  254. var apiList []struct {
  255. T_name string `json:"T_name"`
  256. T_uri string `json:"T_uri"`
  257. T_method string `json:"T_method"`
  258. }
  259. err = json.Unmarshal([]byte(t_S.APIs), &apiList)
  260. if err == nil && len(apiList) > 0 {
  261. var apis []menulibs.API
  262. for _, api := range apiList {
  263. if len(api.T_method) == 0 {
  264. api.T_method = "POST"
  265. }
  266. apis = append(apis, menulibs.API{
  267. T_Menu_Id: id,
  268. T_name: api.T_name,
  269. T_uri: api.T_uri,
  270. T_method: api.T_method,
  271. T_enable: 1,
  272. })
  273. }
  274. APIDao := menulibs.NewAPI(o, Account.RedisCache_API)
  275. _, _ = APIDao.InsertMulti_API(apis)
  276. }
  277. }
  278. t_R.Code = 200
  279. t_R.Msg = "ok"
  280. t_R.Data = id
  281. b, _ := msgpack.Marshal(&t_R)
  282. _ = Nats.Publish(m.Reply, b)
  283. })
  284. // 请求-响应 更新菜单
  285. _, _ = Nats.Subscribe(conf.NatsSubj_Prefix+conf.Sys_Name+"_Update_Menu", func(m *nats.Msg) {
  286. type T_S struct {
  287. Menu menulibs.Menu
  288. Cols []string // 需要更新的字段
  289. APIs string // API JSON字符串
  290. }
  291. type T_R struct {
  292. Code int16 `xml:"Code"`
  293. Msg string `xml:"Msg"`
  294. Data int `xml:"Data"`
  295. }
  296. var t_S T_S
  297. var t_R T_R
  298. err := msgpack.Unmarshal(m.Data, &t_S)
  299. if err != nil {
  300. t_R.Code = 202
  301. t_R.Msg = "msgpack unmarshal err!"
  302. b, _ := msgpack.Marshal(&t_R)
  303. _ = Nats.Publish(m.Reply, b)
  304. return
  305. }
  306. o := orm.NewOrm()
  307. MenuDao := menulibs.NewMenu(o)
  308. APIDao := menulibs.NewAPI(o, Account.RedisCache_API)
  309. // 更新菜单
  310. if len(t_S.Cols) > 0 {
  311. _, err = MenuDao.Update_Menu(t_S.Menu, t_S.Cols...)
  312. if err != nil {
  313. t_R.Code = 202
  314. t_R.Msg = "更新菜单失败!"
  315. b, _ := msgpack.Marshal(&t_R)
  316. _ = Nats.Publish(m.Reply, b)
  317. return
  318. }
  319. }
  320. // 处理API JSON字符串
  321. if len(t_S.APIs) > 0 {
  322. // 先删除旧的API(软删除)
  323. _, _ = APIDao.Delete_API_ByMenuId(t_S.Menu.Id)
  324. var apiList []struct {
  325. T_name string `json:"T_name"`
  326. T_uri string `json:"T_uri"`
  327. T_method string `json:"T_method"`
  328. }
  329. err = json.Unmarshal([]byte(t_S.APIs), &apiList)
  330. if err == nil && len(apiList) > 0 {
  331. var apis []menulibs.API
  332. for _, api := range apiList {
  333. if len(api.T_method) == 0 {
  334. api.T_method = "POST"
  335. }
  336. apis = append(apis, menulibs.API{
  337. T_Menu_Id: t_S.Menu.Id,
  338. T_name: api.T_name,
  339. T_uri: api.T_uri,
  340. T_method: api.T_method,
  341. T_enable: 1,
  342. })
  343. }
  344. _, _ = APIDao.InsertMulti_API(apis)
  345. }
  346. }
  347. t_R.Code = 200
  348. t_R.Msg = "ok"
  349. t_R.Data = t_S.Menu.Id
  350. b, _ := msgpack.Marshal(&t_R)
  351. _ = Nats.Publish(m.Reply, b)
  352. })
  353. // 请求-响应 删除菜单
  354. _, _ = Nats.Subscribe(conf.NatsSubj_Prefix+conf.Sys_Name+"_Delete_Menu", func(m *nats.Msg) {
  355. type T_R struct {
  356. Code int16 `xml:"Code"`
  357. Msg string `xml:"Msg"`
  358. Data int `xml:"Data"`
  359. }
  360. var menu menulibs.Menu
  361. var t_R T_R
  362. err := msgpack.Unmarshal(m.Data, &menu)
  363. if err != nil {
  364. t_R.Code = 202
  365. t_R.Msg = "msgpack unmarshal err!"
  366. b, _ := msgpack.Marshal(&t_R)
  367. _ = Nats.Publish(m.Reply, b)
  368. return
  369. }
  370. o := orm.NewOrm()
  371. MenuDao := menulibs.NewMenu(o)
  372. _, err = MenuDao.Delete_Menu(menu)
  373. if err != nil {
  374. t_R.Code = 202
  375. t_R.Msg = "删除菜单失败!"
  376. b, _ := msgpack.Marshal(&t_R)
  377. _ = Nats.Publish(m.Reply, b)
  378. return
  379. }
  380. t_R.Code = 200
  381. t_R.Msg = "ok"
  382. t_R.Data = menu.Id
  383. b, _ := msgpack.Marshal(&t_R)
  384. _ = Nats.Publish(m.Reply, b)
  385. })
  386. // 请求-响应 根据ID获取菜单
  387. _, _ = Nats.Subscribe(conf.NatsSubj_Prefix+conf.Sys_Name+"_Read_Menu_ById", func(m *nats.Msg) {
  388. type T_R struct {
  389. Code int16 `xml:"Code"`
  390. Msg string `xml:"Msg"`
  391. Data menulibs.Menu `xml:"Data"`
  392. }
  393. var id int
  394. var t_R T_R
  395. err := msgpack.Unmarshal(m.Data, &id)
  396. if err != nil {
  397. t_R.Code = 202
  398. t_R.Msg = "msgpack unmarshal err!"
  399. b, _ := msgpack.Marshal(&t_R)
  400. _ = Nats.Publish(m.Reply, b)
  401. return
  402. }
  403. o := orm.NewOrm()
  404. MenuDao := menulibs.NewMenu(o)
  405. menu, err := MenuDao.Read_Menu_ById(id)
  406. if err != nil {
  407. t_R.Code = 202
  408. t_R.Msg = "查询菜单失败!"
  409. b, _ := msgpack.Marshal(&t_R)
  410. _ = Nats.Publish(m.Reply, b)
  411. return
  412. }
  413. t_R.Code = 200
  414. t_R.Msg = "ok"
  415. t_R.Data = menu
  416. b, _ := msgpack.Marshal(&t_R)
  417. _ = Nats.Publish(m.Reply, b)
  418. })
  419. }