WebSocket.go 5.1 KB


  1. package WebSocket
  2. import (
  3. "Cold_Api/Nats/NatsServer"
  4. "Cold_Api/controllers/lib"
  5. "Cold_Api/logs"
  6. "Cold_Api/models/System"
  7. "container/list"
  8. "encoding/json"
  9. "fmt"
  10. beego "github.com/beego/beego/v2/server/web"
  11. "github.com/gorilla/websocket"
  12. "net/http"
  13. "sync"
  14. "time"
  15. )
  16. // WebSocketController handles WebSocket requests.
  17. type WebSocketController struct {
  18. beego.Controller
  19. }
  20. // 网关 -》服务端 json 通用模板
  21. type Event_w struct {
  22. Type int // JOIN, LEAVE, MESSAGE
  23. Timestamp int // Unix timestamp (secs)
  24. Content interface{}
  25. }
  26. type Ms_Project struct {
  27. Cmd string `json:"Cmd"`
  28. Sn string `json:"Sn"`
  29. }
  30. // Event archives.
  31. var archive = list.New()
  32. type WsConn struct {
  33. Conn *websocket.Conn
  34. Mux sync.RWMutex
  35. }
  36. var (
  37. countryCapitalMap map[string]WsConn /*创建集合 */
  38. )
  39. func Join_wc(user string, ws *websocket.Conn) bool {
  40. // 有先加入 给全部人发消息
  41. _, ok := countryCapitalMap[user] /*如果确定是真实的,则存在,否则不存在 */
  42. if ok {
  43. fmt.Println(user + " 重复")
  44. countryCapitalMap[user] = WsConn{Conn: ws}
  45. return true
  46. } else {
  47. fmt.Println(user + " 注册成功")
  48. countryCapitalMap[user] = WsConn{Conn: ws}
  49. return false
  50. }
  51. }
  52. func Leave(user string) {
  53. fmt.Println("注销:" + user)
  54. for k, _ := range lib.CountrySnMap {
  55. _, ok := lib.CountrySnMap[k].Uuid_list[user]
  56. if ok {
  57. fmt.Println("清楚成功 用户! KEY:", k, " Uuid:", lib.CountrySnMap[k].Uuid_list[user])
  58. delete(lib.CountrySnMap[k].Uuid_list, user)
  59. if len(lib.CountrySnMap[k].Uuid_list) == 0 {
  60. fmt.Println("清楚成功 SN! KEY:", k)
  61. delete(lib.CountrySnMap, k)
  62. }
  63. }
  64. }
  65. delete(countryCapitalMap, user)
  66. }
  67. // This function handles all incoming chan messages.
  68. func chatroom() {
  69. countryCapitalMap = make(map[string]WsConn)
  70. }
  71. func init() {
  72. go chatroom()
  73. go Realtime()
  74. }
  75. // 连接 注册 Join method handles WebSocket requests for WebSocketController.
  76. func (this *WebSocketController) Join() {
  77. // 验证登录
  78. b_, admin_r := lib.Verification(this.Ctx.GetCookie("User_tokey"), this.GetString("User_tokey"))
  79. if !b_ {
  80. this.Redirect("/", 302)
  81. return
  82. }
  83. // Upgrade from http request to WebSocket.
  84. ws, err := websocket.Upgrade(this.Ctx.ResponseWriter, this.Ctx.Request, nil, 1024, 1024)
  85. if _, ok := err.(websocket.HandshakeError); ok {
  86. http.Error(this.Ctx.ResponseWriter, "Not a websocket handshake", 400)
  87. return
  88. } else if err != nil {
  89. fmt.Println("无法设置WebSocket连接:", err)
  90. return
  91. }
  92. // Join chat room.
  93. is := Join_wc(admin_r.Admin_uuid, ws)
  94. if !is {
  95. defer Leave(admin_r.Admin_uuid) // 退后 会自动执行
  96. time.Sleep(3 * time.Second)
  97. for {
  98. _, p, err := ws.ReadMessage()
  99. if err != nil {
  100. return
  101. }
  102. fmt.Println("============= WebSocket JSON =============")
  103. fmt.Println(admin_r.Admin_uuid, "收到信息:", string(p))
  104. var Ms_project Ms_Project
  105. err = json.Unmarshal(p, &Ms_project)
  106. if err != nil {
  107. System.Add_Logs("WebSocket", "JSON反序列化失败[Ms_Project]", string(p))
  108. fmt.Println("JSON反序列化失败[Ms_Project],err=", err)
  109. return
  110. }
  111. //fmt.Println("Cmd:", Ms_project.Cmd)
  112. fmt.Println("Sn:", Ms_project.Sn)
  113. //Parameter.Read_DeviceParameter(admin_r.Admin_uuid,Ms_project.Sn)
  114. _, ok := lib.CountrySnMap[Ms_project.Sn] /*如果确定是真实的,则存在,否则不存在 */
  115. if ok {
  116. } else {
  117. fmt.Println("CountrySnMap 没有,新建", Ms_project.Sn)
  118. lib.CountrySnMap[Ms_project.Sn] = lib.Cl_{
  119. Uuid_list: make(map[string]string),
  120. }
  121. }
  122. // 是否 有相同 用户
  123. _, ok = lib.CountrySnMap[Ms_project.Sn].Uuid_list[admin_r.Admin_uuid]
  124. if ok {
  125. fmt.Println("用户重复 ", admin_r.Admin_uuid)
  126. data, _ := json.Marshal(lib.JSONS{Code: 201, Msg: "用户重复!", Data: admin_r.Admin_uuid})
  127. Send_WebSocket(admin_r.Admin_uuid, string(data))
  128. } else {
  129. fmt.Println("用户新建 ", admin_r.Admin_uuid)
  130. lib.CountrySnMap[Ms_project.Sn].Uuid_list[admin_r.Admin_uuid] = admin_r.Admin_uuid
  131. data, _ := json.Marshal(lib.JSONS{Code: 200, Msg: "ok!", Data: admin_r.Admin_uuid})
  132. Send_WebSocket(admin_r.Admin_uuid, string(data))
  133. }
  134. }
  135. } else {
  136. this.Redirect("/", 302)
  137. return
  138. }
  139. }
  140. /// ------------- ---------------------------------------------
  141. func Send_WebSocket(Admin_uuid string, T_json string) {
  142. defer func() {
  143. if err := recover(); err != nil {
  144. fmt.Println(err)
  145. logs.Println("Send_WebSocket ok err:!", err)
  146. System.Add_Logs("WebSocket", "Send_WebSocket Err", Admin_uuid+T_json)
  147. }
  148. }()
  149. ws, ok := countryCapitalMap[Admin_uuid] /*如果确定是真实的,则存在,否则不存在 */
  150. if ok && ws.Conn != nil {
  151. ws.Mux.Lock()
  152. if ws.Conn.WriteMessage(websocket.TextMessage, []byte(T_json)) != nil {
  153. println("ok!")
  154. }
  155. ws.Mux.Unlock()
  156. }
  157. }
  158. // 循环刷新
  159. func Realtime() {
  160. fmt.Println("=====================Realtime GO===============")
  161. time.Sleep(time.Second * 10)
  162. for true {
  163. //fmt.Println("=====================Realtime GO===============")
  164. //fmt.Println("CountrySnMap_z:", len(lib.CountrySnMap))
  165. for k, _ := range lib.CountrySnMap {
  166. fmt.Println("CountrySnMap:", k)
  167. NatsServer.Get_Device_Realtime(k)
  168. time.Sleep(time.Millisecond * 100)
  169. }
  170. time.Sleep(time.Second * 10)
  171. }
  172. }