1
0

default.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309
  1. package config
  2. import (
  3. "bytes"
  4. "sync"
  5. "time"
  6. "gogs.baozhida.cn/zoie/OAuth-core/config/loader"
  7. "gogs.baozhida.cn/zoie/OAuth-core/config/loader/memory"
  8. "gogs.baozhida.cn/zoie/OAuth-core/config/reader"
  9. "gogs.baozhida.cn/zoie/OAuth-core/config/reader/json"
  10. "gogs.baozhida.cn/zoie/OAuth-core/config/source"
  11. )
  12. type config struct {
  13. exit chan bool
  14. opts Options
  15. sync.RWMutex
  16. // the current snapshot
  17. snap *loader.Snapshot
  18. // the current values
  19. vals reader.Values
  20. }
  21. type watcher struct {
  22. lw loader.Watcher
  23. rd reader.Reader
  24. path []string
  25. value reader.Value
  26. }
  27. func newConfig(opts ...Option) (Config, error) {
  28. var c config
  29. err := c.Init(opts...)
  30. if err != nil {
  31. return nil, err
  32. }
  33. go c.run()
  34. return &c, nil
  35. }
  36. func (c *config) Init(opts ...Option) error {
  37. c.opts = Options{
  38. Reader: json.NewReader(),
  39. }
  40. c.exit = make(chan bool)
  41. for _, o := range opts {
  42. o(&c.opts)
  43. }
  44. // default loader uses the configured reader
  45. if c.opts.Loader == nil {
  46. c.opts.Loader = memory.NewLoader(memory.WithReader(c.opts.Reader))
  47. }
  48. err := c.opts.Loader.Load(c.opts.Source...)
  49. if err != nil {
  50. return err
  51. }
  52. c.snap, err = c.opts.Loader.Snapshot()
  53. if err != nil {
  54. return err
  55. }
  56. c.vals, err = c.opts.Reader.Values(c.snap.ChangeSet)
  57. if err != nil {
  58. return err
  59. }
  60. if c.opts.Entity != nil {
  61. _ = c.vals.Scan(c.opts.Entity)
  62. }
  63. return nil
  64. }
  65. func (c *config) Options() Options {
  66. return c.opts
  67. }
  68. func (c *config) run() {
  69. watch := func(w loader.Watcher) error {
  70. for {
  71. // get changeset
  72. snap, err := w.Next()
  73. if err != nil {
  74. return err
  75. }
  76. c.Lock()
  77. if c.snap.Version >= snap.Version {
  78. c.Unlock()
  79. continue
  80. }
  81. // save
  82. c.snap = snap
  83. // set values
  84. c.vals, _ = c.opts.Reader.Values(snap.ChangeSet)
  85. if c.opts.Entity != nil {
  86. _ = c.vals.Scan(c.opts.Entity)
  87. c.opts.Entity.OnChange()
  88. }
  89. c.Unlock()
  90. }
  91. }
  92. for {
  93. w, err := c.opts.Loader.Watch()
  94. if err != nil {
  95. time.Sleep(time.Second)
  96. continue
  97. }
  98. done := make(chan bool)
  99. // the stop watch func
  100. go func() {
  101. select {
  102. case <-done:
  103. case <-c.exit:
  104. }
  105. _ = w.Stop()
  106. }()
  107. // block watch
  108. if err := watch(w); err != nil {
  109. // do something better
  110. time.Sleep(time.Second)
  111. }
  112. // close done chan
  113. close(done)
  114. // if the config is closed exit
  115. select {
  116. case <-c.exit:
  117. return
  118. default:
  119. }
  120. }
  121. }
  122. func (c *config) Map() map[string]interface{} {
  123. c.RLock()
  124. defer c.RUnlock()
  125. return c.vals.Map()
  126. }
  127. func (c *config) Scan(v interface{}) error {
  128. c.RLock()
  129. defer c.RUnlock()
  130. return c.vals.Scan(v)
  131. }
  132. // Sync sync loads all the sources, calls the parser and updates the config
  133. func (c *config) Sync() error {
  134. if err := c.opts.Loader.Sync(); err != nil {
  135. return err
  136. }
  137. snap, err := c.opts.Loader.Snapshot()
  138. if err != nil {
  139. return err
  140. }
  141. c.Lock()
  142. defer c.Unlock()
  143. c.snap = snap
  144. vals, err := c.opts.Reader.Values(snap.ChangeSet)
  145. if err != nil {
  146. return err
  147. }
  148. c.vals = vals
  149. return nil
  150. }
  151. func (c *config) Close() error {
  152. select {
  153. case <-c.exit:
  154. return nil
  155. default:
  156. close(c.exit)
  157. }
  158. return nil
  159. }
  160. func (c *config) Get(path ...string) reader.Value {
  161. c.RLock()
  162. defer c.RUnlock()
  163. // did sync actually work?
  164. if c.vals != nil {
  165. return c.vals.Get(path...)
  166. }
  167. // no value
  168. return newValue()
  169. }
  170. func (c *config) Set(val interface{}, path ...string) {
  171. c.Lock()
  172. defer c.Unlock()
  173. if c.vals != nil {
  174. c.vals.Set(val, path...)
  175. }
  176. return
  177. }
  178. func (c *config) Del(path ...string) {
  179. c.Lock()
  180. defer c.Unlock()
  181. if c.vals != nil {
  182. c.vals.Del(path...)
  183. }
  184. return
  185. }
  186. func (c *config) Bytes() []byte {
  187. c.RLock()
  188. defer c.RUnlock()
  189. if c.vals == nil {
  190. return []byte{}
  191. }
  192. return c.vals.Bytes()
  193. }
  194. func (c *config) Load(sources ...source.Source) error {
  195. if err := c.opts.Loader.Load(sources...); err != nil {
  196. return err
  197. }
  198. snap, err := c.opts.Loader.Snapshot()
  199. if err != nil {
  200. return err
  201. }
  202. c.Lock()
  203. defer c.Unlock()
  204. c.snap = snap
  205. vals, err := c.opts.Reader.Values(snap.ChangeSet)
  206. if err != nil {
  207. return err
  208. }
  209. c.vals = vals
  210. return nil
  211. }
  212. func (c *config) Watch(path ...string) (Watcher, error) {
  213. value := c.Get(path...)
  214. w, err := c.opts.Loader.Watch(path...)
  215. if err != nil {
  216. return nil, err
  217. }
  218. return &watcher{
  219. lw: w,
  220. rd: c.opts.Reader,
  221. path: path,
  222. value: value,
  223. }, nil
  224. }
  225. func (c *config) String() string {
  226. return "config"
  227. }
  228. func (w *watcher) Next() (reader.Value, error) {
  229. for {
  230. s, err := w.lw.Next()
  231. if err != nil {
  232. return nil, err
  233. }
  234. // only process changes
  235. if bytes.Equal(w.value.Bytes(), s.ChangeSet.Data) {
  236. continue
  237. }
  238. v, err := w.rd.Values(s.ChangeSet)
  239. if err != nil {
  240. return nil, err
  241. }
  242. w.value = v.Get()
  243. return w.value, nil
  244. }
  245. }
  246. func (w *watcher) Stop() error {
  247. return w.lw.Stop()
  248. }