1
0

queue.go 1.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566
  1. package config
  2. import (
  3. "github.com/go-redis/redis/v7"
  4. "github.com/robinjoseph08/redisqueue/v2"
  5. "gogs.baozhida.cn/zoie/OAuth-core/storage"
  6. "gogs.baozhida.cn/zoie/OAuth-core/storage/queue"
  7. "time"
  8. )
  9. type Queue struct {
  10. Redis *QueueRedis
  11. Memory *QueueMemory
  12. NSQ *QueueNSQ `json:"nsq" yaml:"nsq"`
  13. }
  14. type QueueRedis struct {
  15. RedisConnectOptions
  16. Producer *redisqueue.ProducerOptions
  17. Consumer *redisqueue.ConsumerOptions
  18. }
  19. type QueueMemory struct {
  20. PoolSize uint
  21. }
  22. type QueueNSQ struct {
  23. NSQOptions
  24. ChannelPrefix string
  25. }
  26. var QueueConfig = new(Queue)
  27. // Empty 空设置
  28. func (e Queue) Empty() bool {
  29. return e.Memory == nil && e.Redis == nil && e.NSQ == nil
  30. }
  31. // Setup 启用顺序 redis > 其他 > memory
  32. func (e Queue) Setup() (storage.AdapterQueue, error) {
  33. if e.Redis != nil {
  34. e.Redis.Consumer.ReclaimInterval = e.Redis.Consumer.ReclaimInterval * time.Second
  35. e.Redis.Consumer.BlockingTimeout = e.Redis.Consumer.BlockingTimeout * time.Second
  36. e.Redis.Consumer.VisibilityTimeout = e.Redis.Consumer.VisibilityTimeout * time.Second
  37. client := GetRedisClient()
  38. if client == nil {
  39. options, err := e.Redis.RedisConnectOptions.GetRedisOptions()
  40. if err != nil {
  41. return nil, err
  42. }
  43. client = redis.NewClient(options)
  44. _redis = client
  45. }
  46. e.Redis.Producer.RedisClient = client
  47. e.Redis.Consumer.RedisClient = client
  48. return queue.NewRedis(e.Redis.Producer, e.Redis.Consumer)
  49. }
  50. if e.NSQ != nil {
  51. cfg, err := e.NSQ.GetNSQOptions()
  52. if err != nil {
  53. return nil, err
  54. }
  55. return queue.NewNSQ(e.NSQ.Addresses, cfg, e.NSQ.ChannelPrefix)
  56. }
  57. return queue.NewMemory(e.Memory.PoolSize), nil
  58. }