memory_test.go 1.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107
  1. package queue
  2. import (
  3. "fmt"
  4. "log"
  5. "sync"
  6. "testing"
  7. "time"
  8. "github.com/robinjoseph08/redisqueue/v2"
  9. "gogs.baozhida.cn/zoie/OAuth-core/storage"
  10. )
  11. func TestMemory_Append(t *testing.T) {
  12. type fields struct {
  13. items *sync.Map
  14. queue *sync.Map
  15. wait sync.WaitGroup
  16. mutex sync.RWMutex
  17. }
  18. type args struct {
  19. name string
  20. message storage.Messager
  21. }
  22. tests := []struct {
  23. name string
  24. fields fields
  25. args args
  26. wantErr bool
  27. }{
  28. {
  29. "test01",
  30. fields{},
  31. args{
  32. name: "test",
  33. message: &Message{redisqueue.Message{
  34. ID: "",
  35. Stream: "test",
  36. Values: map[string]interface{}{
  37. "key": "value",
  38. },
  39. }},
  40. },
  41. false,
  42. },
  43. }
  44. for _, tt := range tests {
  45. t.Run(tt.name, func(t *testing.T) {
  46. m := NewMemory(100)
  47. if err := m.Append(tt.args.message); (err != nil) != tt.wantErr {
  48. t.Errorf("Append() error = %v, wantErr %v", err, tt.wantErr)
  49. }
  50. })
  51. }
  52. }
  53. func TestMemory_Register(t *testing.T) {
  54. log.SetFlags(19)
  55. type fields struct {
  56. items *sync.Map
  57. queue *sync.Map
  58. wait sync.WaitGroup
  59. mutex sync.RWMutex
  60. }
  61. type args struct {
  62. name string
  63. f storage.ConsumerFunc
  64. }
  65. tests := []struct {
  66. name string
  67. fields fields
  68. args args
  69. }{
  70. {
  71. "test01",
  72. fields{},
  73. args{
  74. name: "test",
  75. f: func(message storage.Messager) error {
  76. fmt.Println(message.GetValues())
  77. return nil
  78. },
  79. },
  80. },
  81. }
  82. for _, tt := range tests {
  83. t.Run(tt.name, func(t *testing.T) {
  84. m := NewMemory(100)
  85. m.Register(tt.name, tt.args.f)
  86. if err := m.Append(&Message{redisqueue.Message{
  87. ID: "",
  88. Stream: "test",
  89. Values: map[string]interface{}{
  90. "key": "value",
  91. },
  92. }}); err != nil {
  93. t.Error(err)
  94. return
  95. }
  96. go func() {
  97. m.Run()
  98. }()
  99. time.Sleep(3 * time.Second)
  100. m.Shutdown()
  101. })
  102. }
  103. }