123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309 |
- package config
import (
	"bytes"
	"errors"
	"sync"
	"time"

	"gogs.baozhida.cn/zoie/OAuth-core/config/loader"
	"gogs.baozhida.cn/zoie/OAuth-core/config/loader/memory"
	"gogs.baozhida.cn/zoie/OAuth-core/config/reader"
	"gogs.baozhida.cn/zoie/OAuth-core/config/reader/json"
	"gogs.baozhida.cn/zoie/OAuth-core/config/source"
)
- type config struct {
- exit chan bool
- opts Options
- sync.RWMutex
- // the current snapshot
- snap *loader.Snapshot
- // the current values
- vals reader.Values
- }
- type watcher struct {
- lw loader.Watcher
- rd reader.Reader
- path []string
- value reader.Value
- }
- func newConfig(opts ...Option) (Config, error) {
- var c config
- err := c.Init(opts...)
- if err != nil {
- return nil, err
- }
- go c.run()
- return &c, nil
- }
- func (c *config) Init(opts ...Option) error {
- c.opts = Options{
- Reader: json.NewReader(),
- }
- c.exit = make(chan bool)
- for _, o := range opts {
- o(&c.opts)
- }
- // default loader uses the configured reader
- if c.opts.Loader == nil {
- c.opts.Loader = memory.NewLoader(memory.WithReader(c.opts.Reader))
- }
- err := c.opts.Loader.Load(c.opts.Source...)
- if err != nil {
- return err
- }
- c.snap, err = c.opts.Loader.Snapshot()
- if err != nil {
- return err
- }
- c.vals, err = c.opts.Reader.Values(c.snap.ChangeSet)
- if err != nil {
- return err
- }
- if c.opts.Entity != nil {
- _ = c.vals.Scan(c.opts.Entity)
- }
- return nil
- }
- func (c *config) Options() Options {
- return c.opts
- }
- func (c *config) run() {
- watch := func(w loader.Watcher) error {
- for {
- // get changeset
- snap, err := w.Next()
- if err != nil {
- return err
- }
- c.Lock()
- if c.snap.Version >= snap.Version {
- c.Unlock()
- continue
- }
- // save
- c.snap = snap
- // set values
- c.vals, _ = c.opts.Reader.Values(snap.ChangeSet)
- if c.opts.Entity != nil {
- _ = c.vals.Scan(c.opts.Entity)
- c.opts.Entity.OnChange()
- }
- c.Unlock()
- }
- }
- for {
- w, err := c.opts.Loader.Watch()
- if err != nil {
- time.Sleep(time.Second)
- continue
- }
- done := make(chan bool)
- // the stop watch func
- go func() {
- select {
- case <-done:
- case <-c.exit:
- }
- _ = w.Stop()
- }()
- // block watch
- if err := watch(w); err != nil {
- // do something better
- time.Sleep(time.Second)
- }
- // close done chan
- close(done)
- // if the config is closed exit
- select {
- case <-c.exit:
- return
- default:
- }
- }
- }
- func (c *config) Map() map[string]interface{} {
- c.RLock()
- defer c.RUnlock()
- return c.vals.Map()
- }
- func (c *config) Scan(v interface{}) error {
- c.RLock()
- defer c.RUnlock()
- return c.vals.Scan(v)
- }
- // Sync sync loads all the sources, calls the parser and updates the config
- func (c *config) Sync() error {
- if err := c.opts.Loader.Sync(); err != nil {
- return err
- }
- snap, err := c.opts.Loader.Snapshot()
- if err != nil {
- return err
- }
- c.Lock()
- defer c.Unlock()
- c.snap = snap
- vals, err := c.opts.Reader.Values(snap.ChangeSet)
- if err != nil {
- return err
- }
- c.vals = vals
- return nil
- }
- func (c *config) Close() error {
- select {
- case <-c.exit:
- return nil
- default:
- close(c.exit)
- }
- return nil
- }
- func (c *config) Get(path ...string) reader.Value {
- c.RLock()
- defer c.RUnlock()
- // did sync actually work?
- if c.vals != nil {
- return c.vals.Get(path...)
- }
- // no value
- return newValue()
- }
- func (c *config) Set(val interface{}, path ...string) {
- c.Lock()
- defer c.Unlock()
- if c.vals != nil {
- c.vals.Set(val, path...)
- }
- return
- }
- func (c *config) Del(path ...string) {
- c.Lock()
- defer c.Unlock()
- if c.vals != nil {
- c.vals.Del(path...)
- }
- return
- }
- func (c *config) Bytes() []byte {
- c.RLock()
- defer c.RUnlock()
- if c.vals == nil {
- return []byte{}
- }
- return c.vals.Bytes()
- }
- func (c *config) Load(sources ...source.Source) error {
- if err := c.opts.Loader.Load(sources...); err != nil {
- return err
- }
- snap, err := c.opts.Loader.Snapshot()
- if err != nil {
- return err
- }
- c.Lock()
- defer c.Unlock()
- c.snap = snap
- vals, err := c.opts.Reader.Values(snap.ChangeSet)
- if err != nil {
- return err
- }
- c.vals = vals
- return nil
- }
- func (c *config) Watch(path ...string) (Watcher, error) {
- value := c.Get(path...)
- w, err := c.opts.Loader.Watch(path...)
- if err != nil {
- return nil, err
- }
- return &watcher{
- lw: w,
- rd: c.opts.Reader,
- path: path,
- value: value,
- }, nil
- }
- func (c *config) String() string {
- return "config"
- }
- func (w *watcher) Next() (reader.Value, error) {
- for {
- s, err := w.lw.Next()
- if err != nil {
- return nil, err
- }
- // only process changes
- if bytes.Equal(w.value.Bytes(), s.ChangeSet.Data) {
- continue
- }
- v, err := w.rd.Values(s.ChangeSet)
- if err != nil {
- return nil, err
- }
- w.value = v.Get()
- return w.value, nil
- }
- }
- func (w *watcher) Stop() error {
- return w.lw.Stop()
- }
|