GraphQL API and utilities for the rpdata project
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

118 lines
2.6 KiB

  1. package change
  2. import (
  3. "context"
  4. "errors"
  5. "sync"
  6. "github.com/globalsign/mgo"
  7. )
  // Sentinel errors reported by the timeline to its subscribers.
  var (
  	// ErrInvalidIndex is returned when a timeline subscriber asks for a value
  	// using a negative id.
  	ErrInvalidIndex = errors.New("change.timeline: Invalid ID")

  	// ErrWaitRequired is returned when a timeline subscriber asks for an entry
  	// that has not been pushed yet.
  	ErrWaitRequired = errors.New("change.timeline: Waiting required")

  	// ErrExpired is returned when a timeline subscriber asks for a change that
  	// happened too long ago for it to still be in the database.
  	ErrExpired = errors.New("change.timeline: Change has expired")
  )
  17. type timeline struct {
  18. mutex sync.Mutex
  19. firstIndex int64
  20. currentID int64
  21. cache []Change
  22. notifyCh chan struct{}
  23. }
  24. func (timeline *timeline) Get(ctx context.Context, id int64) (Change, error) {
  25. change, err := timeline.get(id, false)
  26. if err == ErrWaitRequired {
  27. if err := timeline.wait(ctx); err != nil {
  28. return Change{}, err
  29. }
  30. return timeline.get(id, true)
  31. } else if err == mgo.ErrNotFound {
  32. return Change{}, ErrExpired
  33. }
  34. return change, nil
  35. }
  36. func (timeline *timeline) push(change Change) {
  37. timeline.mutex.Lock()
  38. timeline.cache = append(timeline.cache, change)
  39. if len(timeline.cache) > 128 {
  40. copy(timeline.cache, timeline.cache[32:])
  41. timeline.cache = timeline.cache[:len(timeline.cache)-32]
  42. }
  43. // Only update the currentID if this is more recent than the existing ones
  44. if int64(change.ID) > timeline.currentID {
  45. timeline.currentID = int64(change.ID)
  46. }
  47. if timeline.notifyCh != nil {
  48. close(timeline.notifyCh)
  49. timeline.notifyCh = nil
  50. }
  51. timeline.mutex.Unlock()
  52. }
  53. func (timeline *timeline) get(id int64, hasWaited bool) (Change, error) {
  54. if id < 0 {
  55. return Change{}, ErrInvalidIndex
  56. }
  57. timeline.mutex.Lock()
  58. if !hasWaited && id > timeline.currentID {
  59. timeline.mutex.Unlock()
  60. return Change{}, ErrWaitRequired
  61. }
  62. for _, change := range timeline.cache {
  63. if change.ID == int(id) {
  64. timeline.mutex.Unlock()
  65. return change, nil
  66. }
  67. }
  68. timeline.mutex.Unlock()
  69. return FindID(int(id))
  70. }
  71. func (timeline *timeline) wait(ctx context.Context) error {
  72. timeline.mutex.Lock()
  73. if timeline.notifyCh == nil {
  74. timeline.notifyCh = make(chan struct{})
  75. }
  76. ch := timeline.notifyCh
  77. timeline.mutex.Unlock()
  78. select {
  79. case <-ctx.Done():
  80. return ctx.Err()
  81. case <-ch:
  82. return nil
  83. }
  84. }
  85. // Timeline has operations for subscribing to changes in real time
  86. var Timeline = &timeline{}
  87. // InitializeTimeline fills in the timeline with existing entries
  88. func InitializeTimeline() {
  89. changes, err := List()
  90. if err != nil {
  91. return
  92. }
  93. for _, change := range changes {
  94. Timeline.push(change)
  95. }
  96. }