ws.go 8.3 KB


  1. package ws
  2. import (
  3. "context"
  4. "fmt"
  5. "log"
  6. "net/http"
  7. "sync"
  8. "time"
  9. "github.com/gin-gonic/gin"
  10. "github.com/gorilla/websocket"
  11. "gogs.baozhida.cn/zoie/OAuth-core/pkg"
  12. )
  13. // Manager 所有 websocket 信息
  14. type Manager struct {
  15. Group map[string]map[string]*Client
  16. groupCount, clientCount uint
  17. Lock sync.Mutex
  18. Register, UnRegister chan *Client
  19. Message chan *MessageData
  20. GroupMessage chan *GroupMessageData
  21. BroadCastMessage chan *BroadCastMessageData
  22. }
  23. // Client 单个 websocket 信息
  24. type Client struct {
  25. Id, Group string
  26. Context context.Context
  27. CancelFunc context.CancelFunc
  28. Socket *websocket.Conn
  29. Message chan []byte
  30. }
  31. // messageData 单个发送数据信息
  32. type MessageData struct {
  33. Id, Group string
  34. Context context.Context
  35. Message []byte
  36. }
  37. // groupMessageData 组广播数据信息
  38. type GroupMessageData struct {
  39. Group string
  40. Message []byte
  41. }
  42. // 广播发送数据信息
  43. type BroadCastMessageData struct {
  44. Message []byte
  45. }
  46. // 读信息,从 websocket 连接直接读取数据
  47. func (c *Client) Read(cxt context.Context) {
  48. defer func(cxt context.Context) {
  49. WebsocketManager.UnRegister <- c
  50. log.Printf("client [%s] disconnect", c.Id)
  51. if err := c.Socket.Close(); err != nil {
  52. log.Printf("client [%s] disconnect err: %s", c.Id, err)
  53. }
  54. }(cxt)
  55. for {
  56. if cxt.Err() != nil {
  57. break
  58. }
  59. messageType, message, err := c.Socket.ReadMessage()
  60. if err != nil || messageType == websocket.CloseMessage {
  61. break
  62. }
  63. log.Printf("client [%s] receive message: %s", c.Id, string(message))
  64. c.Message <- message
  65. }
  66. }
  67. // 写信息,从 channel 变量 Send 中读取数据写入 websocket 连接
  68. func (c *Client) Write(cxt context.Context) {
  69. defer func(cxt context.Context) {
  70. log.Printf("client [%s] disconnect", c.Id)
  71. if err := c.Socket.Close(); err != nil {
  72. log.Printf("client [%s] disconnect err: %s", c.Id, err)
  73. }
  74. }(cxt)
  75. for {
  76. if cxt.Err() != nil {
  77. break
  78. }
  79. select {
  80. case message, ok := <-c.Message:
  81. if !ok {
  82. _ = c.Socket.WriteMessage(websocket.CloseMessage, []byte{})
  83. return
  84. }
  85. log.Printf("client [%s] write message: %s", c.Id, string(message))
  86. err := c.Socket.WriteMessage(websocket.TextMessage, message)
  87. if err != nil {
  88. log.Printf("client [%s] writemessage err: %s", c.Id, err)
  89. }
  90. case _ = <-c.Context.Done():
  91. break
  92. }
  93. }
  94. }
  95. // 启动 websocket 管理器
  96. func (manager *Manager) Start() {
  97. log.Printf("websocket manage start")
  98. for {
  99. select {
  100. // 注册
  101. case client := <-manager.Register:
  102. log.Printf("client [%s] connect", client.Id)
  103. log.Printf("register client [%s] to group [%s]", client.Id, client.Group)
  104. manager.Lock.Lock()
  105. if manager.Group[client.Group] == nil {
  106. manager.Group[client.Group] = make(map[string]*Client)
  107. manager.groupCount += 1
  108. }
  109. manager.Group[client.Group][client.Id] = client
  110. manager.clientCount += 1
  111. manager.Lock.Unlock()
  112. // 注销
  113. case client := <-manager.UnRegister:
  114. log.Printf("unregister client [%s] from group [%s]", client.Id, client.Group)
  115. manager.Lock.Lock()
  116. if mGroup, ok := manager.Group[client.Group]; ok {
  117. if mClient, ok := mGroup[client.Id]; ok {
  118. close(mClient.Message)
  119. delete(mGroup, client.Id)
  120. manager.clientCount -= 1
  121. if len(mGroup) == 0 {
  122. //log.Printf("delete empty group [%s]", client.Group)
  123. delete(manager.Group, client.Group)
  124. manager.groupCount -= 1
  125. }
  126. mClient.CancelFunc()
  127. }
  128. }
  129. manager.Lock.Unlock()
  130. // 发送广播数据到某个组的 channel 变量 Send 中
  131. //case data := <-manager.boardCast:
  132. // if groupMap, ok := manager.wsGroup[data.GroupId]; ok {
  133. // for _, conn := range groupMap {
  134. // conn.Send <- data.Data
  135. // }
  136. // }
  137. }
  138. }
  139. }
  140. // 处理单个 client 发送数据
  141. func (manager *Manager) SendService() {
  142. for {
  143. select {
  144. case data := <-manager.Message:
  145. if groupMap, ok := manager.Group[data.Group]; ok {
  146. if conn, ok := groupMap[data.Id]; ok {
  147. conn.Message <- data.Message
  148. }
  149. }
  150. }
  151. }
  152. }
  153. // 处理 group 广播数据
  154. func (manager *Manager) SendGroupService() {
  155. for {
  156. select {
  157. // 发送广播数据到某个组的 channel 变量 Send 中
  158. case data := <-manager.GroupMessage:
  159. if groupMap, ok := manager.Group[data.Group]; ok {
  160. for _, conn := range groupMap {
  161. conn.Message <- data.Message
  162. }
  163. }
  164. }
  165. }
  166. }
  167. // 处理广播数据
  168. func (manager *Manager) SendAllService() {
  169. for {
  170. select {
  171. case data := <-manager.BroadCastMessage:
  172. for _, v := range manager.Group {
  173. for _, conn := range v {
  174. conn.Message <- data.Message
  175. }
  176. }
  177. }
  178. }
  179. }
  180. // 向指定的 client 发送数据
  181. func (manager *Manager) Send(cxt context.Context, id string, group string, message []byte) {
  182. data := &MessageData{
  183. Id: id,
  184. Context: cxt,
  185. Group: group,
  186. Message: message,
  187. }
  188. manager.Message <- data
  189. }
  190. // 向指定的 Group 广播
  191. func (manager *Manager) SendGroup(group string, message []byte) {
  192. data := &GroupMessageData{
  193. Group: group,
  194. Message: message,
  195. }
  196. manager.GroupMessage <- data
  197. }
  198. // 广播
  199. func (manager *Manager) SendAll(message []byte) {
  200. data := &BroadCastMessageData{
  201. Message: message,
  202. }
  203. manager.BroadCastMessage <- data
  204. }
  205. // 注册
  206. func (manager *Manager) RegisterClient(client *Client) {
  207. manager.Register <- client
  208. }
  209. // 注销
  210. func (manager *Manager) UnRegisterClient(client *Client) {
  211. manager.UnRegister <- client
  212. }
  213. // 当前组个数
  214. func (manager *Manager) LenGroup() uint {
  215. return manager.groupCount
  216. }
  217. // 当前连接个数
  218. func (manager *Manager) LenClient() uint {
  219. return manager.clientCount
  220. }
  221. // 获取 wsManager 管理器信息
  222. func (manager *Manager) Info() map[string]interface{} {
  223. managerInfo := make(map[string]interface{})
  224. managerInfo["groupLen"] = manager.LenGroup()
  225. managerInfo["clientLen"] = manager.LenClient()
  226. managerInfo["chanRegisterLen"] = len(manager.Register)
  227. managerInfo["chanUnregisterLen"] = len(manager.UnRegister)
  228. managerInfo["chanMessageLen"] = len(manager.Message)
  229. managerInfo["chanGroupMessageLen"] = len(manager.GroupMessage)
  230. managerInfo["chanBroadCastMessageLen"] = len(manager.BroadCastMessage)
  231. return managerInfo
  232. }
  233. // 初始化 wsManager 管理器
  234. var WebsocketManager = Manager{
  235. Group: make(map[string]map[string]*Client),
  236. Register: make(chan *Client, 128),
  237. UnRegister: make(chan *Client, 128),
  238. GroupMessage: make(chan *GroupMessageData, 128),
  239. Message: make(chan *MessageData, 128),
  240. BroadCastMessage: make(chan *BroadCastMessageData, 128),
  241. groupCount: 0,
  242. clientCount: 0,
  243. }
  244. // gin 处理 websocket handler
  245. func (manager *Manager) WsClient(c *gin.Context) {
  246. ctx, cancel := context.WithCancel(context.Background())
  247. upGrader := websocket.Upgrader{
  248. // cross origin domain
  249. CheckOrigin: func(r *http.Request) bool {
  250. return true
  251. },
  252. // 处理 Sec-WebSocket-Protocol Header
  253. Subprotocols: []string{c.GetHeader("Sec-WebSocket-Protocol")},
  254. }
  255. conn, err := upGrader.Upgrade(c.Writer, c.Request, nil)
  256. if err != nil {
  257. log.Printf("websocket connect error: %s", c.Param("channel"))
  258. return
  259. }
  260. fmt.Println("token: ", c.Query("token"))
  261. client := &Client{
  262. Id: c.Param("id"),
  263. Group: c.Param("channel"),
  264. Context: ctx,
  265. CancelFunc: cancel,
  266. Socket: conn,
  267. Message: make(chan []byte, 1024),
  268. }
  269. manager.RegisterClient(client)
  270. go client.Read(ctx)
  271. go client.Write(ctx)
  272. time.Sleep(time.Second * 15)
  273. pkg.FileMonitoringById(ctx, "temp/logs/job/db-20200820.log", c.Param("id"), c.Param("channel"), SendOne)
  274. }
  275. func (manager *Manager) UnWsClient(c *gin.Context) {
  276. id := c.Param("id")
  277. group := c.Param("channel")
  278. WsLogout(id, group)
  279. c.Set("result", "ws close success")
  280. c.JSON(http.StatusOK, gin.H{
  281. "code": http.StatusOK,
  282. "data": "ws close success",
  283. "msg": "success",
  284. })
  285. }
  286. func SendGroup(msg []byte) {
  287. WebsocketManager.SendGroup("leffss", []byte("{\"code\":200,\"data\":"+string(msg)+"}"))
  288. fmt.Println(WebsocketManager.Info())
  289. }
  290. func SendAll(msg []byte) {
  291. WebsocketManager.SendAll([]byte("{\"code\":200,\"data\":" + string(msg) + "}"))
  292. fmt.Println(WebsocketManager.Info())
  293. }
  294. func SendOne(ctx context.Context, id string, group string, msg []byte) {
  295. WebsocketManager.Send(ctx, id, group, []byte("{\"code\":200,\"data\":"+string(msg)+"}"))
  296. fmt.Println(WebsocketManager.Info())
  297. }
  298. func WsLogout(id string, group string) {
  299. WebsocketManager.UnRegisterClient(&Client{Id: id, Group: group})
  300. fmt.Println(WebsocketManager.Info())
  301. }