SubClinets.go 2.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. package lib
  2. import (
  3. "AIOTCOER/logs"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/beego/beego/v2/server/web/context"
  7. "github.com/gorilla/websocket"
  8. "sync"
  9. )
  10. var WsSubClinets sync.Map // 订阅Map
  11. var SseSubClinets sync.Map // 订阅Map
  12. var SseLogSubClinets sync.Map // 订阅Map
  13. // 清楚订阅
  14. func ClearSubscription(client *websocket.Conn) {
  15. logs.Println("清楚订阅:", fmt.Sprintf("%p", client))
  16. // 编列所有订阅号
  17. WsSubClinets.Range(func(key, value interface{}) bool {
  18. ConnList := value.([]*websocket.Conn)
  19. Sn := key.(string)
  20. logs.Println("SubClinets_数量|", Sn, len(ConnList))
  21. for i, v := range ConnList {
  22. if v == client {
  23. fmt.Println("删除订阅:", fmt.Sprintf("%p", v), Sn)
  24. ConnList = append(ConnList[:i], ConnList[i+1:]...)
  25. WsSubClinets.Store(Sn, ConnList)
  26. break
  27. }
  28. }
  29. return true
  30. })
  31. }
  32. // 发送 数据
  33. func WebsocketWriteJSON(client *websocket.Conn, msg interface{}) error {
  34. err := client.WriteJSON(msg)
  35. if err != nil {
  36. ClearSubscription(client) // 清楚订阅
  37. client.Close()
  38. }
  39. return nil
  40. }
  41. func WebsocketSubscribeSendAll(Sn string, json_r map[string]interface{}) {
  42. val, ok := WsSubClinets.Load(Sn)
  43. if ok {
  44. json_r_ws := CopyMap(json_r)
  45. json_r_ws["T_sn"] = Sn
  46. SubConn := val.([]*websocket.Conn)
  47. for _, v := range SubConn {
  48. WebsocketWriteJSON(v, json_r_ws)
  49. }
  50. }
  51. }
  52. // 发送 数据
  53. func SseWriteJSON(client *context.Response, msg []byte) error {
  54. // Send data to the client
  55. client.Write(append(append([]byte("data: "), msg...), []byte("\n\n")...))
  56. //client.Write([]byte("\n"))
  57. // Flush the response writer to send the data immediately
  58. client.Flush()
  59. return nil
  60. }
  61. func SseSubscribeSendAll(Sn string, json_r map[string]interface{}) {
  62. val, ok := SseSubClinets.Load(Sn)
  63. if ok {
  64. json_r_ws_data, _ := json.Marshal(json_r)
  65. SubConn := val.([]*context.Response)
  66. for _, v := range SubConn {
  67. SseWriteJSON(v, json_r_ws_data)
  68. }
  69. }
  70. }
  71. func SseLogSubscribeSendAll(Sn string, json_r []string) {
  72. val, ok := SseLogSubClinets.Load(Sn)
  73. if ok {
  74. json_r_ws_data, _ := json.Marshal(json_r)
  75. SubConn := val.([]*context.Response)
  76. for _, v := range SubConn {
  77. SseWriteJSON(v, json_r_ws_data)
  78. }
  79. }
  80. }