memory.go 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454
  1. package memory
  2. import (
  3. "bytes"
  4. "container/list"
  5. "errors"
  6. "fmt"
  7. "strings"
  8. "sync"
  9. "time"
  10. "gogs.baozhida.cn/zoie/OAuth-core/config/loader"
  11. "gogs.baozhida.cn/zoie/OAuth-core/config/reader"
  12. "gogs.baozhida.cn/zoie/OAuth-core/config/reader/json"
  13. "gogs.baozhida.cn/zoie/OAuth-core/config/source"
  14. )
  15. type memory struct {
  16. exit chan bool
  17. opts loader.Options
  18. sync.RWMutex
  19. // the current snapshot
  20. snap *loader.Snapshot
  21. // the current values
  22. vals reader.Values
  23. // all the sets
  24. sets []*source.ChangeSet
  25. // all the sources
  26. sources []source.Source
  27. watchers *list.List
  28. }
  29. type updateValue struct {
  30. version string
  31. value reader.Value
  32. }
  33. type watcher struct {
  34. exit chan bool
  35. path []string
  36. value reader.Value
  37. reader reader.Reader
  38. version string
  39. updates chan updateValue
  40. }
  41. func (m *memory) watch(idx int, s source.Source) {
  42. // watches a source for changes
  43. watch := func(idx int, s source.Watcher) error {
  44. for {
  45. // get changeset
  46. cs, err := s.Next()
  47. if err != nil {
  48. return err
  49. }
  50. m.Lock()
  51. // save
  52. m.sets[idx] = cs
  53. // merge sets
  54. set, err := m.opts.Reader.Merge(m.sets...)
  55. if err != nil {
  56. m.Unlock()
  57. return err
  58. }
  59. // set values
  60. m.vals, _ = m.opts.Reader.Values(set)
  61. m.snap = &loader.Snapshot{
  62. ChangeSet: set,
  63. Version: genVer(),
  64. }
  65. m.Unlock()
  66. // send watch updates
  67. m.update()
  68. }
  69. }
  70. for {
  71. // watch the source
  72. w, err := s.Watch()
  73. if err != nil {
  74. time.Sleep(time.Second)
  75. continue
  76. }
  77. done := make(chan bool)
  78. // the stop watch func
  79. go func() {
  80. select {
  81. case <-done:
  82. case <-m.exit:
  83. }
  84. _ = w.Stop()
  85. }()
  86. // block watch
  87. if err := watch(idx, w); err != nil {
  88. // do something better
  89. time.Sleep(time.Second)
  90. }
  91. // close done chan
  92. close(done)
  93. // if the config is closed exit
  94. select {
  95. case <-m.exit:
  96. return
  97. default:
  98. }
  99. }
  100. }
  101. func (m *memory) loaded() bool {
  102. var loaded bool
  103. m.RLock()
  104. if m.vals != nil {
  105. loaded = true
  106. }
  107. m.RUnlock()
  108. return loaded
  109. }
  110. // reload reads the sets and creates new values
  111. func (m *memory) reload() error {
  112. m.Lock()
  113. // merge sets
  114. set, err := m.opts.Reader.Merge(m.sets...)
  115. if err != nil {
  116. m.Unlock()
  117. return err
  118. }
  119. // set values
  120. m.vals, _ = m.opts.Reader.Values(set)
  121. m.snap = &loader.Snapshot{
  122. ChangeSet: set,
  123. Version: genVer(),
  124. }
  125. m.Unlock()
  126. // update watchers
  127. m.update()
  128. return nil
  129. }
  130. func (m *memory) update() {
  131. watchers := make([]*watcher, 0, m.watchers.Len())
  132. m.RLock()
  133. for e := m.watchers.Front(); e != nil; e = e.Next() {
  134. watchers = append(watchers, e.Value.(*watcher))
  135. }
  136. vals := m.vals
  137. snap := m.snap
  138. m.RUnlock()
  139. for _, w := range watchers {
  140. if w.version >= snap.Version {
  141. continue
  142. }
  143. uv := updateValue{
  144. version: m.snap.Version,
  145. value: vals.Get(w.path...),
  146. }
  147. select {
  148. case w.updates <- uv:
  149. default:
  150. }
  151. }
  152. }
  153. // Snapshot returns a snapshot of the current loaded config
  154. func (m *memory) Snapshot() (*loader.Snapshot, error) {
  155. if m.loaded() {
  156. m.RLock()
  157. snap := loader.Copy(m.snap)
  158. m.RUnlock()
  159. return snap, nil
  160. }
  161. // not loaded, sync
  162. if err := m.Sync(); err != nil {
  163. return nil, err
  164. }
  165. // make copy
  166. m.RLock()
  167. snap := loader.Copy(m.snap)
  168. m.RUnlock()
  169. return snap, nil
  170. }
  171. // Sync loads all the sources, calls the parser and updates the config
  172. func (m *memory) Sync() error {
  173. //nolint:prealloc
  174. var sets []*source.ChangeSet
  175. m.Lock()
  176. // read the source
  177. var gerr []string
  178. for _, source := range m.sources {
  179. ch, err := source.Read()
  180. if err != nil {
  181. gerr = append(gerr, err.Error())
  182. continue
  183. }
  184. sets = append(sets, ch)
  185. }
  186. // merge sets
  187. set, err := m.opts.Reader.Merge(sets...)
  188. if err != nil {
  189. m.Unlock()
  190. return err
  191. }
  192. // set values
  193. vals, err := m.opts.Reader.Values(set)
  194. if err != nil {
  195. m.Unlock()
  196. return err
  197. }
  198. m.vals = vals
  199. m.snap = &loader.Snapshot{
  200. ChangeSet: set,
  201. Version: genVer(),
  202. }
  203. m.Unlock()
  204. // update watchers
  205. m.update()
  206. if len(gerr) > 0 {
  207. return fmt.Errorf("source loading errors: %s", strings.Join(gerr, "\n"))
  208. }
  209. return nil
  210. }
  211. func (m *memory) Close() error {
  212. select {
  213. case <-m.exit:
  214. return nil
  215. default:
  216. close(m.exit)
  217. }
  218. return nil
  219. }
  220. func (m *memory) Get(path ...string) (reader.Value, error) {
  221. if !m.loaded() {
  222. if err := m.Sync(); err != nil {
  223. return nil, err
  224. }
  225. }
  226. m.Lock()
  227. defer m.Unlock()
  228. // did sync actually work?
  229. if m.vals != nil {
  230. return m.vals.Get(path...), nil
  231. }
  232. // assuming vals is nil
  233. // create new vals
  234. ch := m.snap.ChangeSet
  235. // we are truly screwed, trying to load in a hacked way
  236. v, err := m.opts.Reader.Values(ch)
  237. if err != nil {
  238. return nil, err
  239. }
  240. // lets set it just because
  241. m.vals = v
  242. if m.vals != nil {
  243. return m.vals.Get(path...), nil
  244. }
  245. // ok we're going hardcore now
  246. return nil, errors.New("no values")
  247. }
  248. func (m *memory) Load(sources ...source.Source) error {
  249. var gerrors []string
  250. for _, source := range sources {
  251. set, err := source.Read()
  252. if err != nil {
  253. gerrors = append(gerrors,
  254. fmt.Sprintf("error loading source %s: %v",
  255. source,
  256. err))
  257. // continue processing
  258. continue
  259. }
  260. m.Lock()
  261. m.sources = append(m.sources, source)
  262. m.sets = append(m.sets, set)
  263. idx := len(m.sets) - 1
  264. m.Unlock()
  265. go m.watch(idx, source)
  266. }
  267. if err := m.reload(); err != nil {
  268. gerrors = append(gerrors, err.Error())
  269. }
  270. // Return errors
  271. if len(gerrors) != 0 {
  272. return errors.New(strings.Join(gerrors, "\n"))
  273. }
  274. return nil
  275. }
  276. func (m *memory) Watch(path ...string) (loader.Watcher, error) {
  277. value, err := m.Get(path...)
  278. if err != nil {
  279. return nil, err
  280. }
  281. m.Lock()
  282. w := &watcher{
  283. exit: make(chan bool),
  284. path: path,
  285. value: value,
  286. reader: m.opts.Reader,
  287. updates: make(chan updateValue, 1),
  288. version: m.snap.Version,
  289. }
  290. e := m.watchers.PushBack(w)
  291. m.Unlock()
  292. go func() {
  293. <-w.exit
  294. m.Lock()
  295. m.watchers.Remove(e)
  296. m.Unlock()
  297. }()
  298. return w, nil
  299. }
  300. func (m *memory) String() string {
  301. return "memory"
  302. }
  303. func (w *watcher) Next() (*loader.Snapshot, error) {
  304. update := func(v reader.Value) *loader.Snapshot {
  305. w.value = v
  306. cs := &source.ChangeSet{
  307. Data: v.Bytes(),
  308. Format: w.reader.String(),
  309. Source: "memory",
  310. Timestamp: time.Now(),
  311. }
  312. cs.Checksum = cs.Sum()
  313. return &loader.Snapshot{
  314. ChangeSet: cs,
  315. Version: w.version,
  316. }
  317. }
  318. for {
  319. select {
  320. case <-w.exit:
  321. return nil, errors.New("watcher stopped")
  322. case uv := <-w.updates:
  323. if uv.version <= w.version {
  324. continue
  325. }
  326. v := uv.value
  327. w.version = uv.version
  328. if bytes.Equal(w.value.Bytes(), v.Bytes()) {
  329. continue
  330. }
  331. return update(v), nil
  332. }
  333. }
  334. }
  335. func (w *watcher) Stop() error {
  336. select {
  337. case <-w.exit:
  338. default:
  339. close(w.exit)
  340. close(w.updates)
  341. }
  342. return nil
  343. }
  // genVer returns a version string for a new snapshot: the current time in
  // nanoseconds since the Unix epoch, in decimal.
  func genVer() string {
  	nanos := time.Now().UnixNano()
  	return fmt.Sprint(nanos)
  }
  347. func NewLoader(opts ...loader.Option) loader.Loader {
  348. options := loader.Options{
  349. Reader: json.NewReader(),
  350. }
  351. for _, o := range opts {
  352. o(&options)
  353. }
  354. m := &memory{
  355. exit: make(chan bool),
  356. opts: options,
  357. watchers: list.New(),
  358. sources: options.Source,
  359. }
  360. m.sets = make([]*source.ChangeSet, len(options.Source))
  361. for i, s := range options.Source {
  362. m.sets[i] = &source.ChangeSet{Source: s.String()}
  363. go m.watch(i, s)
  364. }
  365. return m
  366. }