123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101 |
- package queue
- import (
- "sync"
- "github.com/google/uuid"
- "gogs.baozhida.cn/zoie/OAuth-core/storage"
- )
- type queue chan storage.Messager
- // NewMemory 内存模式
- func NewMemory(poolNum uint) *Memory {
- return &Memory{
- queue: new(sync.Map),
- PoolNum: poolNum,
- }
- }
- type Memory struct {
- queue *sync.Map
- wait sync.WaitGroup
- mutex sync.RWMutex
- PoolNum uint
- }
- func (*Memory) String() string {
- return "memory"
- }
- func (m *Memory) makeQueue() queue {
- if m.PoolNum <= 0 {
- return make(queue)
- }
- return make(queue, m.PoolNum)
- }
- func (m *Memory) Append(message storage.Messager) error {
- m.mutex.RLock()
- defer m.mutex.RUnlock()
- memoryMessage := new(Message)
- memoryMessage.SetID(message.GetID())
- memoryMessage.SetStream(message.GetStream())
- memoryMessage.SetValues(message.GetValues())
- v, ok := m.queue.Load(message.GetStream())
- if !ok {
- v = m.makeQueue()
- m.queue.Store(message.GetStream(), v)
- }
- var q queue
- switch v.(type) {
- case queue:
- q = v.(queue)
- default:
- q = m.makeQueue()
- m.queue.Store(message.GetStream(), q)
- }
- go func(gm storage.Messager, gq queue) {
- gm.SetID(uuid.New().String())
- gq <- gm
- }(memoryMessage, q)
- return nil
- }
- func (m *Memory) Register(name string, f storage.ConsumerFunc) {
- m.mutex.RLock()
- defer m.mutex.RUnlock()
- v, ok := m.queue.Load(name)
- if !ok {
- v = m.makeQueue()
- m.queue.Store(name, v)
- }
- var q queue
- switch v.(type) {
- case queue:
- q = v.(queue)
- default:
- q = m.makeQueue()
- m.queue.Store(name, q)
- }
- go func(out queue, gf storage.ConsumerFunc) {
- var err error
- for message := range q {
- err = gf(message)
- if err != nil {
- out <- message
- err = nil
- }
- }
- }(q, f)
- }
- func (m *Memory) Run() {
- m.wait.Add(1)
- m.wait.Wait()
- }
- func (m *Memory) Shutdown() {
- m.wait.Done()
- }
|