GraphQL API and utilities for the rpdata project
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

136 lines
2.8 KiB

  1. package services
  2. import (
  3. "context"
  4. "git.aiterp.net/rpdata/api/internal/auth"
  5. "git.aiterp.net/rpdata/api/internal/notifier"
  6. "git.aiterp.net/rpdata/api/models"
  7. "git.aiterp.net/rpdata/api/repositories"
  8. "log"
  9. "sync"
  10. "time"
  11. )
// ChangeService stores submitted changes through a ChangeRepository and keeps
// a short in-memory window of the most recently inserted changes so that
// Subscribe can stream them to listeners.
type ChangeService struct {
	// changes is the repository that Find, List and the insert loop delegate to.
	changes repositories.ChangeRepository

	// mutex guards buffer, offset, loopStarted and the lazy creation of submitQueue.
	mutex sync.RWMutex

	// buffer holds the most recently inserted changes; the insert loop appends
	// to it and drops the 8 oldest entries once it grows past 16.
	buffer []models.Change

	// offset counts the changes already dropped from the front of buffer, so
	// offset+len(buffer) is the absolute position of the next change.
	offset uint64

	// notifier is broadcast on after each change is appended to buffer;
	// subscribers wait on its channel between reads.
	notifier notifier.Notifier

	// submitQueue carries changes from Submit to the insert loop. It is created
	// (with capacity 64) the first time Submit is called.
	submitQueue chan *models.Change

	// loopStarted records whether the insert loop goroutine has been started.
	loopStarted bool
}
  21. func (s *ChangeService) Find(ctx context.Context, id string) (*models.Change, error) {
  22. return s.changes.Find(ctx, id)
  23. }
  24. func (s *ChangeService) List(ctx context.Context, filter models.ChangeFilter) ([]*models.Change, error) {
  25. return s.changes.List(ctx, filter)
  26. }
  27. func (s *ChangeService) Submit(ctx context.Context, model models.ChangeModel, op string, listed bool, keys []models.ChangeKey, objects ...interface{}) {
  28. token := auth.TokenFromContext(ctx)
  29. if token == nil {
  30. panic("no token!")
  31. }
  32. change := &models.Change{
  33. Model: model,
  34. Op: op,
  35. Author: token.UserID,
  36. Listed: listed,
  37. Keys: keys,
  38. Date: time.Now(),
  39. }
  40. for _, obj := range objects {
  41. if !change.AddObject(obj) {
  42. log.Printf("Cannot add object of type %T to change", obj)
  43. }
  44. }
  45. s.mutex.Lock()
  46. if !s.loopStarted {
  47. s.loopStarted = true
  48. s.submitQueue = make(chan *models.Change, 64)
  49. go s.loop()
  50. }
  51. s.mutex.Unlock()
  52. s.submitQueue <- change
  53. }
  54. func (s *ChangeService) Subscribe(ctx context.Context, filter models.ChangeFilter) <-chan *models.Change {
  55. channel := make(chan *models.Change)
  56. go func() {
  57. defer close(channel)
  58. s.mutex.RLock()
  59. pos := s.offset + uint64(len(s.buffer))
  60. slice := make([]models.Change, 32)
  61. s.mutex.RUnlock()
  62. count := 0
  63. for {
  64. s.mutex.RLock()
  65. nextPos := s.offset + uint64(len(s.buffer))
  66. length := nextPos - pos
  67. if length > 0 {
  68. index := pos - s.offset
  69. pos = nextPos
  70. copy(slice, s.buffer[index:])
  71. }
  72. ch := s.notifier.C()
  73. s.mutex.RUnlock()
  74. for _, change := range slice[:length] {
  75. if change.PassesFilter(filter) {
  76. channel <- &change
  77. count++
  78. if filter.Limit != nil && count == *filter.Limit {
  79. return
  80. }
  81. }
  82. }
  83. select {
  84. case <-ch:
  85. case <-ctx.Done():
  86. return
  87. }
  88. }
  89. }()
  90. return channel
  91. }
  92. func (s *ChangeService) loop() {
  93. for change := range s.submitQueue {
  94. timeout, cancel := context.WithTimeout(context.Background(), time.Second*15)
  95. change, err := s.changes.Insert(timeout, *change)
  96. if err != nil {
  97. log.Println("Failed to insert change:")
  98. } else {
  99. log.Println("Change", change.ID, "inserted.")
  100. }
  101. s.mutex.Lock()
  102. s.buffer = append(s.buffer, *change)
  103. if len(s.buffer) > 16 {
  104. copy(s.buffer, s.buffer[8:])
  105. s.buffer = s.buffer[:len(s.buffer)-8]
  106. s.offset += 8
  107. }
  108. s.mutex.Unlock()
  109. s.notifier.Broadcast()
  110. cancel()
  111. }
  112. }