GraphQL API and utilities for the rpdata project
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

151 lines
3.2 KiB

  1. package services
  2. import (
  3. "context"
  4. "git.aiterp.net/rpdata/api/internal/auth"
  5. "git.aiterp.net/rpdata/api/internal/notifier"
  6. "git.aiterp.net/rpdata/api/models"
  7. "git.aiterp.net/rpdata/api/repositories"
  8. "log"
  9. "sync"
  10. "time"
  11. )
// ChangeService reads changes from a repository, queues submitted changes for
// insertion on a background goroutine, and keeps a short in-memory buffer of
// inserted changes that Subscribe streams from.
type ChangeService struct {
	// changes is the repository used by Find, List and the insert loop.
	changes repositories.ChangeRepository

	// mutex guards buffer, offset, and the lazy start-up of the loop
	// (loopStarted and submitQueue).
	mutex sync.RWMutex

	// buffer holds the most recently inserted changes. The loop drops the
	// oldest 8 entries whenever the length exceeds 16.
	buffer []models.Change

	// offset counts the entries dropped from the front of buffer so far, so
	// offset+len(buffer) is the position one past the newest buffered change.
	offset uint64

	// notifier is broadcast after each change is buffered; Subscribe waits on
	// its C() channel between polls of buffer.
	notifier notifier.Notifier

	// submitQueue carries changes from Submit to the loop. It is created by
	// the first Submit call.
	submitQueue chan *models.Change

	// loopStarted records whether Submit has already started the loop.
	loopStarted bool
}
  21. func (s *ChangeService) Find(ctx context.Context, id string) (*models.Change, error) {
  22. return s.changes.Find(ctx, id)
  23. }
  24. func (s *ChangeService) List(ctx context.Context, filter models.ChangeFilter) ([]*models.Change, error) {
  25. return s.changes.List(ctx, filter)
  26. }
  27. func (s *ChangeService) Submit(ctx context.Context, model models.ChangeModel, op string, listed bool, keys []models.ChangeKey, objects ...interface{}) {
  28. token := auth.TokenFromContext(ctx)
  29. if token == nil {
  30. panic("no token!")
  31. }
  32. change := &models.Change{
  33. Model: model,
  34. Op: op,
  35. Author: token.UserID,
  36. Listed: listed,
  37. Keys: keys,
  38. Date: time.Now(),
  39. }
  40. // Do not allow * keys in unlisted objects, but allow laziness on the
  41. // call site to not bloat the code too much.
  42. if !listed {
  43. deleteList := make([]int, 0, 2)
  44. for i, key := range keys {
  45. if key.ID == "*" {
  46. deleteList = append(deleteList, i-len(deleteList))
  47. }
  48. }
  49. for _, index := range deleteList {
  50. change.Keys = append(change.Keys[:index], change.Keys[index+1:]...)
  51. }
  52. }
  53. for _, obj := range objects {
  54. if !change.AddObject(obj) {
  55. log.Printf("Cannot add object of type %T to change", obj)
  56. }
  57. }
  58. s.mutex.Lock()
  59. if !s.loopStarted {
  60. s.loopStarted = true
  61. s.submitQueue = make(chan *models.Change, 64)
  62. go s.loop()
  63. }
  64. s.mutex.Unlock()
  65. s.submitQueue <- change
  66. }
  67. func (s *ChangeService) Subscribe(ctx context.Context, filter models.ChangeFilter) <-chan *models.Change {
  68. channel := make(chan *models.Change)
  69. go func() {
  70. defer close(channel)
  71. s.mutex.RLock()
  72. pos := s.offset + uint64(len(s.buffer))
  73. slice := make([]models.Change, 32)
  74. s.mutex.RUnlock()
  75. count := 0
  76. for {
  77. s.mutex.RLock()
  78. nextPos := s.offset + uint64(len(s.buffer))
  79. length := nextPos - pos
  80. if length > 0 {
  81. index := pos - s.offset
  82. pos = nextPos
  83. copy(slice, s.buffer[index:])
  84. }
  85. ch := s.notifier.C()
  86. s.mutex.RUnlock()
  87. for _, change := range slice[:length] {
  88. if change.PassesFilter(filter) {
  89. channel <- &change
  90. count++
  91. if filter.Limit != nil && count == *filter.Limit {
  92. return
  93. }
  94. }
  95. }
  96. select {
  97. case <-ch:
  98. case <-ctx.Done():
  99. return
  100. }
  101. }
  102. }()
  103. return channel
  104. }
  105. func (s *ChangeService) loop() {
  106. for change := range s.submitQueue {
  107. timeout, cancel := context.WithTimeout(context.Background(), time.Second*15)
  108. change, err := s.changes.Insert(timeout, *change)
  109. if err != nil {
  110. log.Println("Failed to insert change:")
  111. } else {
  112. log.Println("Change", change.ID, "inserted.")
  113. }
  114. s.mutex.Lock()
  115. s.buffer = append(s.buffer, *change)
  116. if len(s.buffer) > 16 {
  117. copy(s.buffer, s.buffer[8:])
  118. s.buffer = s.buffer[:len(s.buffer)-8]
  119. s.offset += 8
  120. }
  121. s.mutex.Unlock()
  122. s.notifier.Broadcast()
  123. cancel()
  124. }
  125. }