queue.go 906 B

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647
  1. package runtime
  2. import "gogs.baozhida.cn/zoie/OAuth-core/storage"
  3. // NewQueue 创建对应上下文队列
  4. func NewQueue(prefix string, queue storage.AdapterQueue) storage.AdapterQueue {
  5. return &Queue{
  6. prefix: prefix,
  7. queue: queue,
  8. }
  9. }
  10. type Queue struct {
  11. prefix string
  12. queue storage.AdapterQueue
  13. }
  14. func (e *Queue) String() string {
  15. return e.queue.String()
  16. }
  17. // Register 注册消费者
  18. func (e *Queue) Register(name string, f storage.ConsumerFunc) {
  19. e.queue.Register(name, f)
  20. }
  21. // Append 增加数据到生产者
  22. func (e *Queue) Append(message storage.Messager) error {
  23. values := message.GetValues()
  24. if values == nil {
  25. values = make(map[string]interface{})
  26. }
  27. values[storage.PrefixKey] = e.prefix
  28. return e.queue.Append(message)
  29. }
  30. // Run 运行
  31. func (e *Queue) Run() {
  32. e.queue.Run()
  33. }
  34. // Shutdown 停止
  35. func (e *Queue) Shutdown() {
  36. if e.queue != nil {
  37. e.queue.Shutdown()
  38. }
  39. }