123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778 |
- package queue
- import (
- "github.com/go-redis/redis/v7"
- "github.com/robinjoseph08/redisqueue/v2"
- "gogs.baozhida.cn/zoie/OAuth-core/storage"
- )
- // NewRedis redis模式
- func NewRedis(
- producerOptions *redisqueue.ProducerOptions,
- consumerOptions *redisqueue.ConsumerOptions,
- ) (*Redis, error) {
- var err error
- r := &Redis{}
- r.producer, err = r.newProducer(producerOptions)
- if err != nil {
- return nil, err
- }
- r.consumer, err = r.newConsumer(consumerOptions)
- if err != nil {
- return nil, err
- }
- return r, nil
- }
- // Redis cache implement
- type Redis struct {
- client *redis.Client
- consumer *redisqueue.Consumer
- producer *redisqueue.Producer
- }
- func (Redis) String() string {
- return "redis"
- }
- func (r *Redis) newConsumer(options *redisqueue.ConsumerOptions) (*redisqueue.Consumer, error) {
- if options == nil {
- options = &redisqueue.ConsumerOptions{}
- }
- return redisqueue.NewConsumerWithOptions(options)
- }
- func (r *Redis) newProducer(options *redisqueue.ProducerOptions) (*redisqueue.Producer, error) {
- if options == nil {
- options = &redisqueue.ProducerOptions{}
- }
- return redisqueue.NewProducerWithOptions(options)
- }
- func (r *Redis) Append(message storage.Messager) error {
- err := r.producer.Enqueue(&redisqueue.Message{
- ID: message.GetID(),
- Stream: message.GetStream(),
- Values: message.GetValues(),
- })
- return err
- }
- func (r *Redis) Register(name string, f storage.ConsumerFunc) {
- r.consumer.Register(name, func(message *redisqueue.Message) error {
- m := new(Message)
- m.SetValues(message.Values)
- m.SetStream(message.Stream)
- m.SetID(message.ID)
- return f(m)
- })
- }
- func (r *Redis) Run() {
- r.consumer.Run()
- }
- func (r *Redis) Shutdown() {
- r.consumer.Shutdown()
- }
|