123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566 |
- package config
- import (
- "github.com/go-redis/redis/v7"
- "github.com/robinjoseph08/redisqueue/v2"
- "gogs.baozhida.cn/zoie/OAuth-core/storage"
- "gogs.baozhida.cn/zoie/OAuth-core/storage/queue"
- "time"
- )
- type Queue struct {
- Redis *QueueRedis
- Memory *QueueMemory
- NSQ *QueueNSQ `json:"nsq" yaml:"nsq"`
- }
- type QueueRedis struct {
- RedisConnectOptions
- Producer *redisqueue.ProducerOptions
- Consumer *redisqueue.ConsumerOptions
- }
- type QueueMemory struct {
- PoolSize uint
- }
- type QueueNSQ struct {
- NSQOptions
- ChannelPrefix string
- }
- var QueueConfig = new(Queue)
- // Empty 空设置
- func (e Queue) Empty() bool {
- return e.Memory == nil && e.Redis == nil && e.NSQ == nil
- }
- // Setup 启用顺序 redis > 其他 > memory
- func (e Queue) Setup() (storage.AdapterQueue, error) {
- if e.Redis != nil {
- e.Redis.Consumer.ReclaimInterval = e.Redis.Consumer.ReclaimInterval * time.Second
- e.Redis.Consumer.BlockingTimeout = e.Redis.Consumer.BlockingTimeout * time.Second
- e.Redis.Consumer.VisibilityTimeout = e.Redis.Consumer.VisibilityTimeout * time.Second
- client := GetRedisClient()
- if client == nil {
- options, err := e.Redis.RedisConnectOptions.GetRedisOptions()
- if err != nil {
- return nil, err
- }
- client = redis.NewClient(options)
- _redis = client
- }
- e.Redis.Producer.RedisClient = client
- e.Redis.Consumer.RedisClient = client
- return queue.NewRedis(e.Redis.Producer, e.Redis.Consumer)
- }
- if e.NSQ != nil {
- cfg, err := e.NSQ.GetNSQOptions()
- if err != nil {
- return nil, err
- }
- return queue.NewNSQ(e.NSQ.Addresses, cfg, e.NSQ.ChannelPrefix)
- }
- return queue.NewMemory(e.Memory.PoolSize), nil
- }
|