redis_test.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134
  1. package queue
  2. import (
  3. "fmt"
  4. "testing"
  5. "time"
  6. "github.com/go-redis/redis/v7"
  7. "github.com/robinjoseph08/redisqueue/v2"
  8. "gogs.baozhida.cn/zoie/OAuth-core/storage"
  9. )
  10. func TestRedis_Append(t *testing.T) {
  11. type fields struct {
  12. ConnectOption *redis.Options
  13. ConsumerOptions *redisqueue.ConsumerOptions
  14. ProducerOptions *redisqueue.ProducerOptions
  15. client *redis.Client
  16. consumer *redisqueue.Consumer
  17. producer *redisqueue.Producer
  18. }
  19. type args struct {
  20. name string
  21. message storage.Messager
  22. }
  23. client := redis.NewClient(&redis.Options{})
  24. tests := []struct {
  25. name string
  26. fields fields
  27. args args
  28. wantErr bool
  29. }{
  30. {
  31. "test01",
  32. fields{
  33. ConnectOption: &redis.Options{},
  34. ConsumerOptions: &redisqueue.ConsumerOptions{
  35. VisibilityTimeout: 60 * time.Second,
  36. BlockingTimeout: 5 * time.Second,
  37. ReclaimInterval: 1 * time.Second,
  38. BufferSize: 100,
  39. Concurrency: 10,
  40. RedisClient: client,
  41. },
  42. ProducerOptions: &redisqueue.ProducerOptions{
  43. StreamMaxLength: 100,
  44. ApproximateMaxLength: false,
  45. RedisClient: client,
  46. },
  47. },
  48. args{
  49. name: "test",
  50. message: &Message{redisqueue.Message{
  51. Stream: "test",
  52. Values: map[string]interface{}{
  53. "key": "value",
  54. },
  55. }},
  56. },
  57. false,
  58. },
  59. }
  60. for _, tt := range tests {
  61. t.Run(tt.name, func(t *testing.T) {
  62. if r, err := NewRedis(tt.fields.ProducerOptions, tt.fields.ConsumerOptions); err != nil {
  63. t.Errorf("SetQueue() error = %v", err)
  64. } else {
  65. if err := r.Append(tt.args.message); (err != nil) != tt.wantErr {
  66. t.Errorf("SetQueue() error = %v, wantErr %v", err, tt.wantErr)
  67. }
  68. }
  69. })
  70. }
  71. }
  72. func TestRedis_Register(t *testing.T) {
  73. type fields struct {
  74. ConnectOption *redis.Options
  75. ConsumerOptions *redisqueue.ConsumerOptions
  76. ProducerOptions *redisqueue.ProducerOptions
  77. client *redis.Client
  78. consumer *redisqueue.Consumer
  79. producer *redisqueue.Producer
  80. }
  81. type args struct {
  82. name string
  83. f storage.ConsumerFunc
  84. }
  85. client := redis.NewClient(&redis.Options{})
  86. tests := []struct {
  87. name string
  88. fields fields
  89. args args
  90. }{
  91. {
  92. "test01",
  93. fields{
  94. ConnectOption: &redis.Options{},
  95. ConsumerOptions: &redisqueue.ConsumerOptions{
  96. VisibilityTimeout: 60 * time.Second,
  97. BlockingTimeout: 5 * time.Second,
  98. ReclaimInterval: 1 * time.Second,
  99. BufferSize: 100,
  100. Concurrency: 10,
  101. RedisClient: client,
  102. },
  103. ProducerOptions: &redisqueue.ProducerOptions{
  104. StreamMaxLength: 100,
  105. ApproximateMaxLength: true,
  106. RedisClient: client,
  107. },
  108. },
  109. args{
  110. name: "login_log_queue",
  111. f: func(message storage.Messager) error {
  112. fmt.Println("ok")
  113. fmt.Println(message.GetValues())
  114. return nil
  115. },
  116. },
  117. },
  118. }
  119. for _, tt := range tests {
  120. t.Run(tt.name, func(t *testing.T) {
  121. if r, err := NewRedis(tt.fields.ProducerOptions, tt.fields.ConsumerOptions); err != nil {
  122. t.Errorf("SetQueue() error = %v", err)
  123. } else {
  124. r.Register(tt.args.name, tt.args.f)
  125. r.Run()
  126. }
  127. })
  128. }
  129. }