GraphQL API and utilities for the rpdata project
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
|
|
package change
import ( "context" "errors" "sync"
"github.com/globalsign/mgo" )
// ErrInvalidIndex is returned when a timeline subscriber tries to get a value
// with a negative id
var ErrInvalidIndex = errors.New("change.timeline: Invalid ID")
// ErrWaitRequired is returned when a timeline subscriber requests an entry that
// has not yet been pushed
var ErrWaitRequired = errors.New("change.timeline: Waiting required")
// ErrExpired is returned when a timeline subscriber requests a change that happened
// too long ago for it to be in the database.
var ErrExpired = errors.New("change.timeline: Change has expired")
type timeline struct { mutex sync.Mutex firstIndex int64 currentID int64 cache []Change notifyCh chan struct{} }
func (timeline *timeline) Get(ctx context.Context, id int64) (Change, error) { change, err := timeline.get(id, false) if err == ErrWaitRequired { if err := timeline.wait(ctx); err != nil { return Change{}, err }
return timeline.get(id, true) } else if err == mgo.ErrNotFound { return Change{}, ErrExpired }
return change, nil }
func (timeline *timeline) push(change Change) { timeline.mutex.Lock() timeline.cache = append(timeline.cache, change) if len(timeline.cache) > 128 { copy(timeline.cache, timeline.cache[32:]) timeline.cache = timeline.cache[:len(timeline.cache)-32] }
// Only update the currentID if this is more recent than the existing ones
if int64(change.ID) > timeline.currentID { timeline.currentID = int64(change.ID) }
if timeline.notifyCh != nil { close(timeline.notifyCh) timeline.notifyCh = nil } timeline.mutex.Unlock() }
func (timeline *timeline) get(id int64, hasWaited bool) (Change, error) { if id < 0 { return Change{}, ErrInvalidIndex }
timeline.mutex.Lock() if !hasWaited && id > timeline.currentID { timeline.mutex.Unlock() return Change{}, ErrWaitRequired }
for _, change := range timeline.cache { if change.ID == int(id) { timeline.mutex.Unlock() return change, nil } } timeline.mutex.Unlock()
return FindID(int(id)) }
func (timeline *timeline) wait(ctx context.Context) error { timeline.mutex.Lock() if timeline.notifyCh == nil { timeline.notifyCh = make(chan struct{}) }
ch := timeline.notifyCh timeline.mutex.Unlock()
select { case <-ctx.Done(): return ctx.Err() case <-ch: return nil } }
// Timeline has operations for subscribing to changes in real time
var Timeline = &timeline{}
// InitializeTimeline fills in the timeline with existing entries
func InitializeTimeline() { changes, err := List() if err != nil { return }
for _, change := range changes { Timeline.push(change) } }
|