GraphQL API and utilities for the rpdata project
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

133 lines
2.7 KiB

  1. package services
  2. import (
  3. "context"
  4. "git.aiterp.net/rpdata/api/internal/auth"
  5. "git.aiterp.net/rpdata/api/internal/notifier"
  6. "git.aiterp.net/rpdata/api/models"
  7. "git.aiterp.net/rpdata/api/repositories"
  8. "log"
  9. "sync"
  10. "time"
  11. )
  12. type ChangeService struct {
  13. changes repositories.ChangeRepository
  14. mutex sync.RWMutex
  15. buffer []models.Change
  16. offset uint64
  17. notifier notifier.Notifier
  18. submitQueue chan *models.Change
  19. loopStarted bool
  20. }
  21. func (s *ChangeService) Find(ctx context.Context, id string) (*models.Change, error) {
  22. return s.changes.Find(ctx, id)
  23. }
  24. func (s *ChangeService) List(ctx context.Context, filter models.ChangeFilter) ([]*models.Change, error) {
  25. return s.changes.List(ctx, filter)
  26. }
  27. func (s *ChangeService) Submit(ctx context.Context, model models.ChangeModel, op string, listed bool, keys []models.ChangeKey, objects ...interface{}) {
  28. token := auth.TokenFromContext(ctx)
  29. if token == nil {
  30. panic("no token!")
  31. }
  32. change := &models.Change{
  33. Model: model,
  34. Op: op,
  35. Author: token.UserID,
  36. Listed: listed,
  37. Keys: keys,
  38. }
  39. for _, obj := range objects {
  40. if !change.AddObject(obj) {
  41. log.Printf("Cannot add object of type %T to change", obj)
  42. }
  43. }
  44. s.mutex.Lock()
  45. if !s.loopStarted {
  46. s.loopStarted = true
  47. s.submitQueue = make(chan *models.Change, 64)
  48. go s.loop()
  49. }
  50. s.mutex.Unlock()
  51. s.submitQueue <- change
  52. }
  53. func (s *ChangeService) Subscribe(ctx context.Context, filter models.ChangeFilter) <-chan *models.Change {
  54. channel := make(chan *models.Change)
  55. go func() {
  56. defer close(channel)
  57. s.mutex.RLock()
  58. pos := s.offset + uint64(len(s.buffer))
  59. slice := make([]models.Change, 32)
  60. s.mutex.RUnlock()
  61. count := 0
  62. for {
  63. s.mutex.RLock()
  64. nextPos := s.offset + uint64(len(s.buffer))
  65. length := nextPos - pos
  66. if length > 0 {
  67. index := pos - s.offset
  68. pos = nextPos
  69. copy(slice, s.buffer[index:])
  70. }
  71. ch := s.notifier.C()
  72. s.mutex.RUnlock()
  73. for _, change := range slice[:length] {
  74. if change.PassesFilter(filter) {
  75. channel <- &change
  76. count++
  77. if filter.Limit != nil && count == *filter.Limit {
  78. return
  79. }
  80. }
  81. }
  82. select {
  83. case <-ch:
  84. case <-ctx.Done():
  85. return
  86. }
  87. }
  88. }()
  89. return channel
  90. }
  91. func (s *ChangeService) loop() {
  92. for change := range s.submitQueue {
  93. timeout, cancel := context.WithTimeout(context.Background(), time.Second*15)
  94. change, err := s.changes.Insert(timeout, *change)
  95. if err != nil {
  96. log.Println("Failed to submit change:")
  97. }
  98. s.mutex.Lock()
  99. s.buffer = append(s.buffer, *change)
  100. if len(s.buffer) > 16 {
  101. copy(s.buffer, s.buffer[8:])
  102. s.buffer = s.buffer[:len(s.buffer)-8]
  103. s.offset += 8
  104. }
  105. s.mutex.Unlock()
  106. s.notifier.Broadcast()
  107. cancel()
  108. }
  109. }