option_nsq.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. /*
  2. * @Author: lwnmengjing
  3. * @Date: 2021/5/31 9:10 上午
  4. * @Last Modified by: lwnmengjing
  5. * @Last Modified time: 2021/5/31 9:10 上午
  6. */
  7. package config
  8. import (
  9. "time"
  10. "github.com/nsqio/go-nsq"
  11. )
  12. type NSQOptions struct {
  13. DialTimeout time.Duration `opt:"dial_timeout" default:"1s"`
  14. // Deadlines for network reads and writes
  15. ReadTimeout time.Duration `opt:"read_timeout" min:"100ms" max:"5m" default:"60s"`
  16. WriteTimeout time.Duration `opt:"write_timeout" min:"100ms" max:"5m" default:"1s"`
  17. // Addresses is the local address to use when dialing an nsqd.
  18. Addresses []string `opt:"addresses"`
  19. // Duration between polling lookupd for new producers, and fractional jitter to add to
  20. // the lookupd pool loop. this helps evenly distribute requests even if multiple consumers
  21. // restart at the same time
  22. //
  23. // NOTE: when not using nsqlookupd, LookupdPollInterval represents the duration of time between
  24. // reconnection attempts
  25. LookupdPollInterval time.Duration `opt:"lookupd_poll_interval" min:"10ms" max:"5m" default:"60s"`
  26. LookupdPollJitter float64 `opt:"lookupd_poll_jitter" min:"0" max:"1" default:"0.3"`
  27. // Maximum duration when REQueueing (for doubling of deferred requeue)
  28. MaxRequeueDelay time.Duration `opt:"max_requeue_delay" min:"0" max:"60m" default:"15m"`
  29. DefaultRequeueDelay time.Duration `opt:"default_requeue_delay" min:"0" max:"60m" default:"90s"`
  30. // Maximum amount of time to backoff when processing fails 0 == no backoff
  31. MaxBackoffDuration time.Duration `opt:"max_backoff_duration" min:"0" max:"60m" default:"2m"`
  32. // Unit of time for calculating consumer backoff
  33. BackoffMultiplier time.Duration `opt:"backoff_multiplier" min:"0" max:"60m" default:"1s"`
  34. // Maximum number of times this consumer will attempt to process a message before giving up
  35. MaxAttempts uint16 `opt:"max_attempts" min:"0" max:"65535" default:"5"`
  36. // Duration to wait for a message from an nsqd when in a state where RDY
  37. // counts are re-distributed (e.g. max_in_flight < num_producers)
  38. LowRdyIdleTimeout time.Duration `opt:"low_rdy_idle_timeout" min:"1s" max:"5m" default:"10s"`
  39. // Duration to wait until redistributing RDY for an nsqd regardless of LowRdyIdleTimeout
  40. LowRdyTimeout time.Duration `opt:"low_rdy_timeout" min:"1s" max:"5m" default:"30s"`
  41. // Duration between redistributing max-in-flight to connections
  42. RDYRedistributeInterval time.Duration `opt:"rdy_redistribute_interval" min:"1ms" max:"5s" default:"5s"`
  43. // Identifiers sent to nsqd representing this client
  44. // UserAgent is in the spirit of HTTP (default: "<client_library_name>/<version>")
  45. ClientID string `opt:"client_id"` // (defaults: short hostname)
  46. Hostname string `opt:"hostname"`
  47. UserAgent string `opt:"user_agent"`
  48. // Duration of time between heartbeats. This must be less than ReadTimeout
  49. HeartbeatInterval time.Duration `opt:"heartbeat_interval" default:"30s"`
  50. // Integer percentage to sample the channel (requires nsqd 0.2.25+)
  51. SampleRate int32 `opt:"sample_rate" min:"0" max:"99"`
  52. Tls *Tls `yaml:"tls" json:"tls"`
  53. // Compression Settings
  54. Deflate bool `opt:"deflate"`
  55. DeflateLevel int `opt:"deflate_level" min:"1" max:"9" default:"6"`
  56. Snappy bool `opt:"snappy"`
  57. // Size of the buffer (in bytes) used by nsqd for buffering writes to this connection
  58. OutputBufferSize int64 `opt:"output_buffer_size" default:"16384"`
  59. // Timeout used by nsqd before flushing buffered writes (set to 0 to disable).
  60. //
  61. // WARNING: configuring clients with an extremely low
  62. // (< 25ms) output_buffer_timeout has a significant effect
  63. // on nsqd CPU usage (particularly with > 50 clients connected).
  64. OutputBufferTimeout time.Duration `opt:"output_buffer_timeout" default:"250ms"`
  65. // Maximum number of messages to allow in flight (concurrency knob)
  66. MaxInFlight int `opt:"max_in_flight" min:"0" default:"1"`
  67. // The server-side message timeout for messages delivered to this client
  68. MsgTimeout time.Duration `opt:"msg_timeout" min:"0"`
  69. // secret for nsqd authentication (requires nsqd 0.2.29+)
  70. AuthSecret string `opt:"auth_secret"`
  71. }
  72. func (e NSQOptions) GetNSQOptions() (*nsq.Config, error) {
  73. cfg := nsq.NewConfig()
  74. var err error
  75. cfg.TlsConfig, err = getTLS(e.Tls)
  76. if err != nil {
  77. return nil, err
  78. }
  79. if e.DialTimeout > 0 {
  80. cfg.DialTimeout = e.DialTimeout * time.Second
  81. }
  82. if e.ReadTimeout > 0 {
  83. cfg.ReadTimeout = e.ReadTimeout * time.Second
  84. }
  85. if e.WriteTimeout > 0 {
  86. cfg.WriteTimeout = e.WriteTimeout * time.Second
  87. }
  88. if e.LookupdPollInterval > 0 {
  89. cfg.LookupdPollInterval = e.LookupdPollInterval * time.Second
  90. }
  91. if e.MaxRequeueDelay > 0 {
  92. cfg.MaxRequeueDelay = e.MaxRequeueDelay * time.Second
  93. }
  94. if e.DefaultRequeueDelay > 0 {
  95. cfg.DefaultRequeueDelay = e.DefaultRequeueDelay * time.Second
  96. }
  97. if e.MaxBackoffDuration > 0 {
  98. cfg.MaxBackoffDuration = e.MaxBackoffDuration * time.Millisecond
  99. }
  100. if e.BackoffMultiplier > 0 {
  101. cfg.BackoffMultiplier = e.BackoffMultiplier * time.Second
  102. }
  103. if e.LowRdyIdleTimeout > 0 {
  104. cfg.LowRdyIdleTimeout = e.LowRdyIdleTimeout * time.Second
  105. }
  106. if e.LowRdyTimeout > 0 {
  107. cfg.LowRdyTimeout = e.LowRdyTimeout * time.Second
  108. }
  109. if e.RDYRedistributeInterval > 0 {
  110. cfg.RDYRedistributeInterval = e.RDYRedistributeInterval * time.Second
  111. }
  112. if e.HeartbeatInterval > 0 {
  113. cfg.HeartbeatInterval = e.HeartbeatInterval * time.Second
  114. }
  115. if e.OutputBufferTimeout > 0 {
  116. cfg.OutputBufferTimeout = e.OutputBufferTimeout * time.Second
  117. }
  118. if e.MsgTimeout > 0 {
  119. cfg.MsgTimeout = e.MsgTimeout * time.Second
  120. }
  121. if e.LookupdPollJitter > 0 {
  122. cfg.LookupdPollJitter = e.LookupdPollJitter
  123. }
  124. cfg.MaxAttempts = e.MaxAttempts
  125. if e.ClientID != "" {
  126. cfg.ClientID = e.ClientID
  127. }
  128. if e.Hostname != "" {
  129. cfg.Hostname = e.Hostname
  130. }
  131. if e.UserAgent != "" {
  132. cfg.UserAgent = e.UserAgent
  133. }
  134. if e.SampleRate > 0 {
  135. cfg.SampleRate = e.SampleRate
  136. }
  137. cfg.Deflate = e.Deflate
  138. if e.DeflateLevel >= 6 && e.DeflateLevel <= 9 {
  139. cfg.DeflateLevel = e.DeflateLevel
  140. }
  141. cfg.Snappy = e.Snappy
  142. if e.OutputBufferSize > 0 {
  143. cfg.OutputBufferSize = e.OutputBufferSize
  144. }
  145. if e.MaxInFlight > 0 {
  146. cfg.MaxInFlight = e.MaxInFlight
  147. }
  148. if e.AuthSecret != "" {
  149. cfg.AuthSecret = e.AuthSecret
  150. }
  151. return cfg, nil
  152. }