Nats.go 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268
  1. package Nats
  2. import (
  3. "ERP_storage/conf"
  4. "ERP_storage/models/Account"
  5. "ERP_storage/models/Contract"
  6. "fmt"
  7. menulibs "git.baozhida.cn/ERP_libs/Menu"
  8. powerlibs "git.baozhida.cn/ERP_libs/Power"
  9. "git.baozhida.cn/ERP_libs/lib"
  10. "github.com/astaxie/beego/logs"
  11. "github.com/beego/beego/v2/adapter/orm"
  12. "github.com/nats-io/nats.go"
  13. "github.com/vmihailenco/msgpack/v5"
  14. )
  15. var Nats *nats.Conn
  16. func init() {
  17. fmt.Println("============Nats init============")
  18. var err error
  19. // 连接Nats服务器
  20. Nats, err = nats.Connect("nats://" + conf.NatsServer_Url)
  21. if err != nil {
  22. fmt.Println("nats 连接失败!")
  23. panic(err)
  24. }
  25. fmt.Println("nats OK!")
  26. go NatsInit()
  27. }
  28. func NatsInit() {
  29. // 发布-订阅 模式,异步订阅 test1
  30. _, _ = Nats.Subscribe(conf.NatsSubj_Prefix+conf.Sys_Name+"_Read_Menu_List", func(m *nats.Msg) {
  31. var t_R lib.JSONS
  32. o := orm.NewOrm()
  33. MenuDao := menulibs.NewMenu(o)
  34. menu, err := MenuDao.Read_Menu_List()
  35. if err != nil {
  36. logs.Error("Mats", lib.FuncName(), err)
  37. t_R.Code = 202
  38. t_R.Msg = "获取失败!"
  39. b, _ := msgpack.Marshal(&t_R)
  40. _ = Nats.Publish(m.Reply, b)
  41. return
  42. }
  43. t_R.Code = 200
  44. t_R.Msg = "ok"
  45. t_R.Data = menu
  46. b, _ := msgpack.Marshal(&t_R)
  47. _ = Nats.Publish(m.Reply, b)
  48. })
  49. _, _ = Nats.Subscribe(conf.NatsSubj_Prefix+conf.Sys_Name+"_Read_User_Bind_Menu_List", func(m *nats.Msg) {
  50. var t_R lib.JSONS
  51. o := orm.NewOrm()
  52. powerDao := powerlibs.NewPower(o)
  53. power, err := powerDao.Read_Power_ByT_id(string(m.Data))
  54. if err != nil {
  55. logs.Error("Mats", lib.FuncName(), err)
  56. t_R.Code = 202
  57. t_R.Msg = err.Error()
  58. b, _ := msgpack.Marshal(&t_R)
  59. _ = Nats.Publish(m.Reply, b)
  60. return
  61. }
  62. MenuDao := menulibs.NewMenu(o)
  63. menu, err := MenuDao.Read_Menu_List_ByPower_T_Menu(power.T_menu)
  64. if err != nil {
  65. logs.Error("Mats", lib.FuncName(), err)
  66. t_R.Code = 202
  67. t_R.Msg = err.Error()
  68. b, _ := msgpack.Marshal(&t_R)
  69. _ = Nats.Publish(m.Reply, b)
  70. return
  71. }
  72. t_R.Code = 200
  73. t_R.Msg = "ok"
  74. t_R.Data = menu
  75. b, _ := msgpack.Marshal(&t_R)
  76. _ = Nats.Publish(m.Reply, b)
  77. })
  78. _, _ = Nats.Subscribe(conf.NatsSubj_Prefix+conf.Sys_Name+"_Add_Power", func(m *nats.Msg) {
  79. var t_Req powerlibs.Power
  80. var t_R lib.JSONS
  81. err := msgpack.Unmarshal(m.Data, &t_Req)
  82. if err != nil {
  83. logs.Error("Mats", lib.FuncName(), err)
  84. t_R.Code = 202
  85. t_R.Msg = err.Error()
  86. b, _ := msgpack.Marshal(&t_R)
  87. _ = Nats.Publish(m.Reply, b)
  88. return
  89. }
  90. fmt.Printf("ERP_storage_Add_Power message: %+v\n", t_Req)
  91. o := orm.NewOrm()
  92. powerDao := powerlibs.NewPower(o)
  93. id, err := powerDao.Add_Power(t_Req)
  94. if err != nil {
  95. logs.Error("Mats", lib.FuncName(), err)
  96. t_R.Code = 202
  97. t_R.Msg = err.Error()
  98. b, _ := msgpack.Marshal(&t_R)
  99. _ = Nats.Publish(m.Reply, b)
  100. return
  101. }
  102. t_R.Code = 200
  103. t_R.Msg = "ok"
  104. t_R.Data = id
  105. b, _ := msgpack.Marshal(&t_R)
  106. _ = Nats.Publish(m.Reply, b)
  107. })
  108. _, _ = Nats.Subscribe(conf.NatsSubj_Prefix+conf.Sys_Name+"_Read_Power_ByT_id", func(m *nats.Msg) {
  109. fmt.Printf("ERP_storage_Read_Power_ByT_id message: %+v\n", string(m.Data))
  110. var t_R lib.JSONS
  111. o := orm.NewOrm()
  112. powerDao := powerlibs.NewPower(o)
  113. power, err := powerDao.Read_Power_ByT_id(string(m.Data))
  114. if err != nil {
  115. logs.Error("Mats", lib.FuncName(), err)
  116. t_R.Code = 202
  117. t_R.Msg = err.Error()
  118. b, _ := msgpack.Marshal(&t_R)
  119. _ = Nats.Publish(m.Reply, b)
  120. return
  121. }
  122. t_R.Code = 200
  123. t_R.Msg = "ok"
  124. t_R.Data = power
  125. b, _ := msgpack.Marshal(&t_R)
  126. _ = Nats.Publish(m.Reply, b)
  127. })
  128. _, _ = Nats.Subscribe(conf.NatsSubj_Prefix+conf.Sys_Name+"_Update_Power", func(m *nats.Msg) {
  129. var t_Req powerlibs.Power
  130. var t_R lib.JSONS
  131. err := msgpack.Unmarshal(m.Data, &t_Req)
  132. if err != nil {
  133. logs.Error("Mats", lib.FuncName(), err)
  134. t_R.Code = 202
  135. t_R.Msg = err.Error()
  136. b, _ := msgpack.Marshal(&t_R)
  137. _ = Nats.Publish(m.Reply, b)
  138. return
  139. }
  140. fmt.Printf("ERP_storage_Update_Power message: %+v\n", t_Req)
  141. o := orm.NewOrm()
  142. powerDao := powerlibs.NewPower(o)
  143. id, err := powerDao.Update_Power(t_Req)
  144. if err != nil {
  145. logs.Error("Mats", lib.FuncName(), err)
  146. t_R.Code = 202
  147. t_R.Msg = err.Error()
  148. b, _ := msgpack.Marshal(&t_R)
  149. _ = Nats.Publish(m.Reply, b)
  150. return
  151. }
  152. APIDao := menulibs.NewAPI(o, Account.RedisCache_API)
  153. APIDao.Redis_API_DelK(t_Req.T_id)
  154. t_R.Code = 200
  155. t_R.Msg = "ok"
  156. t_R.Data = id
  157. b, _ := msgpack.Marshal(&t_R)
  158. _ = Nats.Publish(m.Reply, b)
  159. })
  160. _, _ = Nats.Subscribe(conf.NatsSubj_Prefix+conf.Sys_Name+"_Delete_Power", func(m *nats.Msg) {
  161. var t_Req powerlibs.Power
  162. var t_R lib.JSONS
  163. err := msgpack.Unmarshal(m.Data, &t_Req)
  164. if err != nil {
  165. logs.Error("Mats", lib.FuncName(), err)
  166. t_R.Code = 202
  167. t_R.Msg = err.Error()
  168. b, _ := msgpack.Marshal(&t_R)
  169. _ = Nats.Publish(m.Reply, b)
  170. return
  171. }
  172. fmt.Printf("ERP_storage_Delete_Power message: %+v\n", t_Req)
  173. o := orm.NewOrm()
  174. powerDao := powerlibs.NewPower(o)
  175. id, err := powerDao.Delete_Power(t_Req)
  176. if err != nil {
  177. logs.Error("Mats", lib.FuncName(), err)
  178. t_R.Code = 202
  179. t_R.Msg = err.Error()
  180. b, _ := msgpack.Marshal(&t_R)
  181. _ = Nats.Publish(m.Reply, b)
  182. return
  183. }
  184. APIDao := menulibs.NewAPI(o, Account.RedisCache_API)
  185. APIDao.Redis_API_DelK(t_Req.T_id)
  186. t_R.Code = 200
  187. t_R.Msg = "ok"
  188. t_R.Data = id
  189. b, _ := msgpack.Marshal(&t_R)
  190. _ = Nats.Publish(m.Reply, b)
  191. })
  192. // 更新合同状态
  193. _, _ = Nats.Subscribe(conf.NatsSubj_Prefix+conf.Sys_Name+"_Update_Contract_T_out", func(m *nats.Msg) {
  194. fmt.Printf(conf.Sys_Name+"_Update_Contract_T_out message: %v\n", string(m.Data))
  195. T_number := string(m.Data)
  196. o := orm.NewOrm()
  197. o.Begin()
  198. ContractDao := Contract.NewContract(o)
  199. ContractProductDao := Contract.NewContractProduct(o)
  200. // 1、添加出库单
  201. contract, _ := ContractDao.Read_Contract_ByT_number(T_number)
  202. T_out := 2
  203. // 查询合同产品清单是否全部都为已出库
  204. state := ContractProductDao.Read_ContractProduct_T_State_List(T_number)
  205. if state == 1 {
  206. T_out = 1
  207. }
  208. if state == 3 {
  209. T_out = 3
  210. }
  211. if T_out != contract.T_out {
  212. err := ContractDao.Update_Contract(contract, "T_out")
  213. if err != nil {
  214. o.Rollback()
  215. }
  216. }
  217. })
  218. }