memory.go 1.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. package queue
  2. import (
  3. "sync"
  4. "github.com/google/uuid"
  5. "gogs.baozhida.cn/zoie/OAuth-core/storage"
  6. )
  // queue is the channel type on which storage.Messager values travel from
  // Append to the consumer goroutines started by Register.
  7. type queue chan storage.Messager
  8. // NewMemory 内存模式
  9. func NewMemory(poolNum uint) *Memory {
  10. return &Memory{
  11. queue: new(sync.Map),
  12. PoolNum: poolNum,
  13. }
  14. }
  // Memory is a queue backend that keeps every stream's messages in process
  // memory.
  //
  // Fields:
  //   - queue: maps a stream name to its channel (type queue); entries are
  //     created on demand by Append and Register.
  //   - wait: Run adds one to it and then waits on it; Shutdown calls Done.
  //   - mutex: Append and Register hold its read lock for their duration; no
  //     write lock is taken anywhere in the code visible here.
  //   - PoolNum: buffer capacity given to each newly created channel; zero
  //     means the channels are unbuffered.
  15. type Memory struct {
  16. queue *sync.Map
  17. wait sync.WaitGroup
  18. mutex sync.RWMutex
  19. PoolNum uint
  20. }
  21. func (*Memory) String() string {
  22. return "memory"
  23. }
  24. func (m *Memory) makeQueue() queue {
  25. if m.PoolNum <= 0 {
  26. return make(queue)
  27. }
  28. return make(queue, m.PoolNum)
  29. }
  30. func (m *Memory) Append(message storage.Messager) error {
  31. m.mutex.RLock()
  32. defer m.mutex.RUnlock()
  33. memoryMessage := new(Message)
  34. memoryMessage.SetID(message.GetID())
  35. memoryMessage.SetStream(message.GetStream())
  36. memoryMessage.SetValues(message.GetValues())
  37. v, ok := m.queue.Load(message.GetStream())
  38. if !ok {
  39. v = m.makeQueue()
  40. m.queue.Store(message.GetStream(), v)
  41. }
  42. var q queue
  43. switch v.(type) {
  44. case queue:
  45. q = v.(queue)
  46. default:
  47. q = m.makeQueue()
  48. m.queue.Store(message.GetStream(), q)
  49. }
  50. go func(gm storage.Messager, gq queue) {
  51. gm.SetID(uuid.New().String())
  52. gq <- gm
  53. }(memoryMessage, q)
  54. return nil
  55. }
  56. func (m *Memory) Register(name string, f storage.ConsumerFunc) {
  57. m.mutex.RLock()
  58. defer m.mutex.RUnlock()
  59. v, ok := m.queue.Load(name)
  60. if !ok {
  61. v = m.makeQueue()
  62. m.queue.Store(name, v)
  63. }
  64. var q queue
  65. switch v.(type) {
  66. case queue:
  67. q = v.(queue)
  68. default:
  69. q = m.makeQueue()
  70. m.queue.Store(name, q)
  71. }
  72. go func(out queue, gf storage.ConsumerFunc) {
  73. var err error
  74. for message := range q {
  75. err = gf(message)
  76. if err != nil {
  77. out <- message
  78. err = nil
  79. }
  80. }
  81. }(q, f)
  82. }
  83. func (m *Memory) Run() {
  84. m.wait.Add(1)
  85. m.wait.Wait()
  86. }
  87. func (m *Memory) Shutdown() {
  88. m.wait.Done()
  89. }