redis.go 1.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778
  1. package queue
  2. import (
  3. "github.com/go-redis/redis/v7"
  4. "github.com/robinjoseph08/redisqueue/v2"
  5. "gogs.baozhida.cn/zoie/OAuth-core/storage"
  6. )
  7. // NewRedis redis模式
  8. func NewRedis(
  9. producerOptions *redisqueue.ProducerOptions,
  10. consumerOptions *redisqueue.ConsumerOptions,
  11. ) (*Redis, error) {
  12. var err error
  13. r := &Redis{}
  14. r.producer, err = r.newProducer(producerOptions)
  15. if err != nil {
  16. return nil, err
  17. }
  18. r.consumer, err = r.newConsumer(consumerOptions)
  19. if err != nil {
  20. return nil, err
  21. }
  22. return r, nil
  23. }
  24. // Redis cache implement
  25. type Redis struct {
  26. client *redis.Client
  27. consumer *redisqueue.Consumer
  28. producer *redisqueue.Producer
  29. }
  30. func (Redis) String() string {
  31. return "redis"
  32. }
  33. func (r *Redis) newConsumer(options *redisqueue.ConsumerOptions) (*redisqueue.Consumer, error) {
  34. if options == nil {
  35. options = &redisqueue.ConsumerOptions{}
  36. }
  37. return redisqueue.NewConsumerWithOptions(options)
  38. }
  39. func (r *Redis) newProducer(options *redisqueue.ProducerOptions) (*redisqueue.Producer, error) {
  40. if options == nil {
  41. options = &redisqueue.ProducerOptions{}
  42. }
  43. return redisqueue.NewProducerWithOptions(options)
  44. }
  45. func (r *Redis) Append(message storage.Messager) error {
  46. err := r.producer.Enqueue(&redisqueue.Message{
  47. ID: message.GetID(),
  48. Stream: message.GetStream(),
  49. Values: message.GetValues(),
  50. })
  51. return err
  52. }
  53. func (r *Redis) Register(name string, f storage.ConsumerFunc) {
  54. r.consumer.Register(name, func(message *redisqueue.Message) error {
  55. m := new(Message)
  56. m.SetValues(message.Values)
  57. m.SetStream(message.Stream)
  58. m.SetID(message.ID)
  59. return f(m)
  60. })
  61. }
  62. func (r *Redis) Run() {
  63. r.consumer.Run()
  64. }
  65. func (r *Redis) Shutdown() {
  66. r.consumer.Shutdown()
  67. }