package change

import (
	"context"
	"errors"
	"sync"

	"github.com/globalsign/mgo"
)

var (
	// ErrInvalidIndex is returned when a timeline subscriber tries to get a value
	// with a negative id.
	ErrInvalidIndex = errors.New("change.timeline: Invalid ID")

	// ErrWaitRequired is returned when a timeline subscriber requests an entry that
	// has not yet been pushed.
	ErrWaitRequired = errors.New("change.timeline: Waiting required")

	// ErrExpired is returned when a timeline subscriber requests a change that happened
	// too long ago for it to be in the database.
	ErrExpired = errors.New("change.timeline: Change has expired")
)

// timeline keeps a bounded in-memory cache of recently pushed changes and lets
// callers block until the next change is pushed.
type timeline struct {
	// mutex guards every other field of the struct.
	mutex sync.Mutex

	// firstIndex is neither read nor written by the code in this file.
	firstIndex int64

	// currentID is the highest change ID that has been pushed so far.
	currentID int64

	// cache holds the most recently pushed changes, in push order.
	cache []Change

	// notifyCh is lazily created by wait and closed (then reset to nil) by the
	// next push, which wakes every goroutine currently blocked in wait.
	notifyCh chan struct{}
}

// Get returns the change with the given id.
//
// If id is greater than the newest pushed ID, Get blocks until the next push
// (or until ctx is done, in which case ctx.Err() is returned) and then looks
// the id up once more. A mgo.ErrNotFound from the initial lookup is reported
// as ErrExpired. Every other error, including ErrInvalidIndex for a negative
// id, is returned to the caller unchanged.
func (t *timeline) Get(ctx context.Context, id int64) (Change, error) {
	change, err := t.get(id, false)
	switch err {
	case nil:
		return change, nil
	case ErrWaitRequired:
		if err := t.wait(ctx); err != nil {
			return Change{}, err
		}
		return t.get(id, true)
	case mgo.ErrNotFound:
		return Change{}, ErrExpired
	default:
		// Previously these errors were dropped and a zero Change was returned
		// with a nil error; surface them so callers can tell the lookup failed.
		return Change{}, err
	}
}

// push appends change to the cache, advances currentID if the change is newer
// than anything seen so far, and wakes every goroutine blocked in wait.
func (t *timeline) push(change Change) {
	const (
		maxCached = 128 // cache length that triggers a trim
		trimCount = 32  // number of oldest entries dropped per trim
	)

	t.mutex.Lock()
	defer t.mutex.Unlock()

	t.cache = append(t.cache, change)
	if len(t.cache) > maxCached {
		// Shift the surviving entries to the front and shrink the slice,
		// discarding the oldest trimCount entries while reusing the backing array.
		copy(t.cache, t.cache[trimCount:])
		t.cache = t.cache[:len(t.cache)-trimCount]
	}

	// Only update the currentID if this is more recent than the existing ones.
	if id := int64(change.ID); id > t.currentID {
		t.currentID = id
	}

	// Closing the channel releases all current waiters at once; resetting it to
	// nil makes the next wait call allocate a fresh channel.
	if t.notifyCh != nil {
		close(t.notifyCh)
		t.notifyCh = nil
	}
}

// get looks up the change with the given id, first in the in-memory cache and
// then via FindID.
//
// It returns ErrInvalidIndex for a negative id. Unless hasWaited is true, it
// returns ErrWaitRequired when id is greater than currentID. Errors from
// FindID are returned as-is.
func (t *timeline) get(id int64, hasWaited bool) (Change, error) {
	if id < 0 {
		return Change{}, ErrInvalidIndex
	}

	t.mutex.Lock()
	if !hasWaited && id > t.currentID {
		t.mutex.Unlock()
		return Change{}, ErrWaitRequired
	}
	for _, change := range t.cache {
		if change.ID == int(id) {
			t.mutex.Unlock()
			return change, nil
		}
	}
	t.mutex.Unlock()

	// Cache miss: fall back to FindID with the lock already released so the
	// lookup does not block concurrent push/get/wait calls.
	return FindID(int(id))
}

// wait blocks until the next push or until ctx is done, whichever happens
// first. It returns ctx.Err() in the latter case and nil otherwise.
func (t *timeline) wait(ctx context.Context) error {
	t.mutex.Lock()
	if t.notifyCh == nil {
		t.notifyCh = make(chan struct{})
	}
	// Capture the channel while holding the lock; push may replace the field
	// with nil as soon as the lock is released.
	ch := t.notifyCh
	t.mutex.Unlock()

	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-ch:
		return nil
	}
}

// Timeline has operations for subscribing to changes in real time.
var Timeline = &timeline{}

// InitializeTimeline fills in the timeline with existing entries.
//
// It is best-effort: if List returns an error the timeline is left untouched
// and the error is discarded.
func InitializeTimeline() {
	changes, err := List()
	if err != nil {
		return
	}
	for _, change := range changes {
		Timeline.push(change)
	}
}