You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
118 lines
2.6 KiB
118 lines
2.6 KiB
package change
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"sync"
|
|
|
|
"github.com/globalsign/mgo"
|
|
)
|
|
|
|
// Sentinel errors reported to timeline subscribers.
var (
	// ErrInvalidIndex is returned when a timeline subscriber asks for a
	// change using a negative id.
	ErrInvalidIndex = errors.New("change.timeline: Invalid ID")

	// ErrWaitRequired is returned when a timeline subscriber asks for an
	// entry that has not been pushed yet.
	ErrWaitRequired = errors.New("change.timeline: Waiting required")

	// ErrExpired is returned when a timeline subscriber asks for a change
	// that happened too long ago to still be in the database.
	ErrExpired = errors.New("change.timeline: Change has expired")
)
|
|
|
|
type timeline struct {
|
|
mutex sync.Mutex
|
|
firstIndex int64
|
|
currentID int64
|
|
cache []Change
|
|
notifyCh chan struct{}
|
|
}
|
|
|
|
func (timeline *timeline) Get(ctx context.Context, id int64) (Change, error) {
|
|
change, err := timeline.get(id, false)
|
|
if err == ErrWaitRequired {
|
|
if err := timeline.wait(ctx); err != nil {
|
|
return Change{}, err
|
|
}
|
|
|
|
return timeline.get(id, true)
|
|
} else if err == mgo.ErrNotFound {
|
|
return Change{}, ErrExpired
|
|
}
|
|
|
|
return change, nil
|
|
}
|
|
|
|
func (timeline *timeline) push(change Change) {
|
|
timeline.mutex.Lock()
|
|
timeline.cache = append(timeline.cache, change)
|
|
if len(timeline.cache) > 128 {
|
|
copy(timeline.cache, timeline.cache[32:])
|
|
timeline.cache = timeline.cache[:len(timeline.cache)-32]
|
|
}
|
|
|
|
// Only update the currentID if this is more recent than the existing ones
|
|
if int64(change.ID) > timeline.currentID {
|
|
timeline.currentID = int64(change.ID)
|
|
}
|
|
|
|
if timeline.notifyCh != nil {
|
|
close(timeline.notifyCh)
|
|
timeline.notifyCh = nil
|
|
}
|
|
timeline.mutex.Unlock()
|
|
}
|
|
|
|
func (timeline *timeline) get(id int64, hasWaited bool) (Change, error) {
|
|
if id < 0 {
|
|
return Change{}, ErrInvalidIndex
|
|
}
|
|
|
|
timeline.mutex.Lock()
|
|
if !hasWaited && id > timeline.currentID {
|
|
timeline.mutex.Unlock()
|
|
return Change{}, ErrWaitRequired
|
|
}
|
|
|
|
for _, change := range timeline.cache {
|
|
if change.ID == int(id) {
|
|
timeline.mutex.Unlock()
|
|
return change, nil
|
|
}
|
|
}
|
|
timeline.mutex.Unlock()
|
|
|
|
return FindID(int(id))
|
|
}
|
|
|
|
func (timeline *timeline) wait(ctx context.Context) error {
|
|
timeline.mutex.Lock()
|
|
if timeline.notifyCh == nil {
|
|
timeline.notifyCh = make(chan struct{})
|
|
}
|
|
|
|
ch := timeline.notifyCh
|
|
timeline.mutex.Unlock()
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
case <-ch:
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// Timeline has operations for subscribing to changes in real time
|
|
var Timeline = &timeline{}
|
|
|
|
// InitializeTimeline fills in the timeline with existing entries
|
|
func InitializeTimeline() {
|
|
changes, err := List()
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
for _, change := range changes {
|
|
Timeline.push(change)
|
|
}
|
|
}
|