GraphQL API and utilities for the rpdata project
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 

118 lines
2.6 KiB

package change
import (
"context"
"errors"
"sync"
"github.com/globalsign/mgo"
)
// Sentinel errors reported to timeline subscribers.
var (
	// ErrInvalidIndex is the error for a lookup with a negative id.
	ErrInvalidIndex = errors.New("change.timeline: Invalid ID")

	// ErrWaitRequired is the error for a lookup of an entry that has not
	// been pushed yet.
	ErrWaitRequired = errors.New("change.timeline: Waiting required")

	// ErrExpired is the error for a lookup of a change that happened too
	// long ago to still be in the database.
	ErrExpired = errors.New("change.timeline: Change has expired")
)
// timeline keeps recently pushed changes in memory and lets callers block
// until another change is pushed.
type timeline struct {
// mutex is held by push, get and wait while they touch the fields below.
mutex sync.Mutex
// firstIndex is not referenced by any of the methods visible in this file.
// NOTE(review): possibly a leftover; confirm before relying on it.
firstIndex int64
// currentID is the highest change ID passed to push so far.
currentID int64
// cache holds the most recently pushed changes; push drops the 32 oldest
// entries once the length exceeds 128.
cache []Change
// notifyCh is created lazily by wait, and closed then reset to nil by push
// to wake every waiter at once.
notifyCh chan struct{}
}
// Get returns the change with the given id. When the id is newer than
// anything pushed so far, it blocks until the next push (or until ctx is
// done) and then looks the id up once more.
//
// Errors:
//   - ErrInvalidIndex if id is negative.
//   - ErrExpired if the initial lookup reports mgo.ErrNotFound.
//   - ctx.Err() if ctx is done before another change is pushed.
//   - any other lookup error, passed through unchanged.
//
// NOTE(review): the lookup and the wait take the mutex separately, so a push
// landing between the two is not observed by this waiter; it is only woken
// by the push after that one.
func (timeline *timeline) Get(ctx context.Context, id int64) (Change, error) {
	change, err := timeline.get(id, false)
	switch err {
	case nil:
		return change, nil
	case ErrWaitRequired:
		// The id is ahead of currentID: wait for a push, then retry with the
		// "too new" check disabled. The retry's result is returned as-is.
		if err := timeline.wait(ctx); err != nil {
			return Change{}, err
		}
		return timeline.get(id, true)
	case mgo.ErrNotFound:
		return Change{}, ErrExpired
	default:
		// Propagate everything else (ErrInvalidIndex included) instead of
		// returning a zero Change with a nil error.
		return Change{}, err
	}
}
// push records change in the in-memory cache, advances currentID when the
// change is newer than anything seen before, and wakes all pending waiters.
func (timeline *timeline) push(change Change) {
	timeline.mutex.Lock()
	defer timeline.mutex.Unlock()

	entries := append(timeline.cache, change)
	if len(entries) > 128 {
		// Over capacity: shift everything after the 32 oldest entries to the
		// front of the same backing array, shrinking the length by 32.
		entries = append(entries[:0], entries[32:]...)
	}
	timeline.cache = entries

	// currentID never moves backwards, even if an older change arrives late.
	if pushedID := int64(change.ID); pushedID > timeline.currentID {
		timeline.currentID = pushedID
	}

	// Closing the channel releases every goroutine blocked in wait; clearing
	// the field makes the next wait allocate a fresh channel.
	if pending := timeline.notifyCh; pending != nil {
		timeline.notifyCh = nil
		close(pending)
	}
}
// get looks up the change with the given id, first in the in-memory cache
// and then through FindID.
//
// It returns ErrInvalidIndex for a negative id. Unless hasWaited is set, it
// returns ErrWaitRequired when id is greater than currentID. On a cache miss
// the result of FindID is returned unchanged.
func (timeline *timeline) get(id int64, hasWaited bool) (Change, error) {
	if id < 0 {
		return Change{}, ErrInvalidIndex
	}
	wanted := int(id)

	// The cache is inspected under the lock; the FindID fallback below runs
	// after the lock has been released.
	cached, found, mustWait := func() (Change, bool, bool) {
		timeline.mutex.Lock()
		defer timeline.mutex.Unlock()

		if id > timeline.currentID && !hasWaited {
			return Change{}, false, true
		}
		for i := range timeline.cache {
			if timeline.cache[i].ID == wanted {
				return timeline.cache[i], true, false
			}
		}
		return Change{}, false, false
	}()

	switch {
	case mustWait:
		return Change{}, ErrWaitRequired
	case found:
		return cached, nil
	}
	return FindID(wanted)
}
// wait blocks until the next push closes the notification channel or ctx is
// done, whichever comes first. It returns nil in the former case and
// ctx.Err() in the latter.
func (timeline *timeline) wait(ctx context.Context) error {
	// Join the current notification channel, creating it if no other waiter
	// has done so since the last push.
	timeline.mutex.Lock()
	signal := timeline.notifyCh
	if signal == nil {
		signal = make(chan struct{})
		timeline.notifyCh = signal
	}
	timeline.mutex.Unlock()

	select {
	case <-signal:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}
// Timeline is the package-level timeline instance, used for subscribing to
// changes in real time. It starts out empty.
var Timeline = new(timeline)
// InitializeTimeline seeds Timeline with the entries returned by List,
// pushing them in the order List yields them. If List fails, the timeline is
// left untouched and the error is not reported.
func InitializeTimeline() {
	existing, err := List()
	if err == nil {
		for _, entry := range existing {
			Timeline.push(entry)
		}
	}
}