You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							118 lines
						
					
					
						
							2.6 KiB
						
					
					
				
			
		
		
		
			
			
			
				
					
				
				
					
				
			
		
		
	
	
							118 lines
						
					
					
						
							2.6 KiB
						
					
					
				| package change | |
| 
 | |
| import ( | |
| 	"context" | |
| 	"errors" | |
| 	"sync" | |
| 
 | |
| 	"github.com/globalsign/mgo" | |
| ) | |
| 
 | |
| // ErrInvalidIndex is returned when a timeline subscriber tries to get a value | |
| // with a negative id | |
| var ErrInvalidIndex = errors.New("change.timeline: Invalid ID") | |
| 
 | |
| // ErrWaitRequired is returned when a timeline subscriber requests an entry that | |
| // has not yet been pushed | |
| var ErrWaitRequired = errors.New("change.timeline: Waiting required") | |
| 
 | |
| // ErrExpired is returned when a timeline subscriber requests a change that happened | |
| // too long ago for it to be in the database. | |
| var ErrExpired = errors.New("change.timeline: Change has expired") | |
| 
 | |
| type timeline struct { | |
| 	mutex      sync.Mutex | |
| 	firstIndex int64 | |
| 	currentID  int64 | |
| 	cache      []Change | |
| 	notifyCh   chan struct{} | |
| } | |
| 
 | |
| func (timeline *timeline) Get(ctx context.Context, id int64) (Change, error) { | |
| 	change, err := timeline.get(id, false) | |
| 	if err == ErrWaitRequired { | |
| 		if err := timeline.wait(ctx); err != nil { | |
| 			return Change{}, err | |
| 		} | |
| 
 | |
| 		return timeline.get(id, true) | |
| 	} else if err == mgo.ErrNotFound { | |
| 		return Change{}, ErrExpired | |
| 	} | |
| 
 | |
| 	return change, nil | |
| } | |
| 
 | |
| func (timeline *timeline) push(change Change) { | |
| 	timeline.mutex.Lock() | |
| 	timeline.cache = append(timeline.cache, change) | |
| 	if len(timeline.cache) > 128 { | |
| 		copy(timeline.cache, timeline.cache[32:]) | |
| 		timeline.cache = timeline.cache[:len(timeline.cache)-32] | |
| 	} | |
| 
 | |
| 	// Only update the currentID if this is more recent than the existing ones | |
| 	if int64(change.ID) > timeline.currentID { | |
| 		timeline.currentID = int64(change.ID) | |
| 	} | |
| 
 | |
| 	if timeline.notifyCh != nil { | |
| 		close(timeline.notifyCh) | |
| 		timeline.notifyCh = nil | |
| 	} | |
| 	timeline.mutex.Unlock() | |
| } | |
| 
 | |
| func (timeline *timeline) get(id int64, hasWaited bool) (Change, error) { | |
| 	if id < 0 { | |
| 		return Change{}, ErrInvalidIndex | |
| 	} | |
| 
 | |
| 	timeline.mutex.Lock() | |
| 	if !hasWaited && id > timeline.currentID { | |
| 		timeline.mutex.Unlock() | |
| 		return Change{}, ErrWaitRequired | |
| 	} | |
| 
 | |
| 	for _, change := range timeline.cache { | |
| 		if change.ID == int(id) { | |
| 			timeline.mutex.Unlock() | |
| 			return change, nil | |
| 		} | |
| 	} | |
| 	timeline.mutex.Unlock() | |
| 
 | |
| 	return FindID(int(id)) | |
| } | |
| 
 | |
| func (timeline *timeline) wait(ctx context.Context) error { | |
| 	timeline.mutex.Lock() | |
| 	if timeline.notifyCh == nil { | |
| 		timeline.notifyCh = make(chan struct{}) | |
| 	} | |
| 
 | |
| 	ch := timeline.notifyCh | |
| 	timeline.mutex.Unlock() | |
| 
 | |
| 	select { | |
| 	case <-ctx.Done(): | |
| 		return ctx.Err() | |
| 	case <-ch: | |
| 		return nil | |
| 	} | |
| } | |
| 
 | |
| // Timeline has operations for subscribing to changes in real time | |
| var Timeline = &timeline{} | |
| 
 | |
| // InitializeTimeline fills in the timeline with existing entries | |
| func InitializeTimeline() { | |
| 	changes, err := List() | |
| 	if err != nil { | |
| 		return | |
| 	} | |
| 
 | |
| 	for _, change := range changes { | |
| 		Timeline.push(change) | |
| 	} | |
| }
 |