package services import ( "context" "git.aiterp.net/rpdata/api/internal/notifier" "git.aiterp.net/rpdata/api/models" "git.aiterp.net/rpdata/api/repositories" "log" "sync" "time" ) type ChangeService struct { changes repositories.ChangeRepository authService *AuthService mutex sync.RWMutex buffer []models.Change offset uint64 notifier notifier.Notifier submitQueue chan *models.Change loopStarted bool } func (s *ChangeService) Find(ctx context.Context, id string) (*models.Change, error) { return s.changes.Find(ctx, id) } func (s *ChangeService) List(ctx context.Context, filter models.ChangeFilter) ([]*models.Change, error) { return s.changes.List(ctx, filter) } func (s *ChangeService) Submit(ctx context.Context, model models.ChangeModel, op string, listed bool, keys []models.ChangeKey, objects ...interface{}) { token := s.authService.TokenFromContext(ctx) if token == nil { panic("no token!") } change := &models.Change{ Model: model, Op: op, Author: token.UserID, Listed: listed, Keys: keys, Date: time.Now(), } // Do not allow * keys in unlisted objects, but allow laziness on the // call site to not bloat the code too much. if !listed { deleteList := make([]int, 0, 2) for i, key := range keys { if key.ID == "*" { deleteList = append(deleteList, i-len(deleteList)) } } for _, index := range deleteList { change.Keys = append(change.Keys[:index], change.Keys[index+1:]...) } } for _, obj := range objects { if !change.AddObject(obj) { log.Printf("Cannot add object of type %T to change", obj) } } s.mutex.Lock() if !s.loopStarted { s.loopStarted = true s.submitQueue = make(chan *models.Change, 64) go s.loop() } s.mutex.Unlock() s.submitQueue <- change } func (s *ChangeService) Subscribe(ctx context.Context, filter models.ChangeFilter) <-chan *models.Change { channel := make(chan *models.Change) go func() { defer close(channel) s.mutex.RLock() pos := s.offset + uint64(len(s.buffer)) slice := make([]models.Change, 32) s.mutex.RUnlock() count := 0 for { s.mutex.RLock() nextPos := s.offset + uint64(len(s.buffer)) length := nextPos - pos if length > 0 { index := pos - s.offset pos = nextPos copy(slice, s.buffer[index:]) } ch := s.notifier.C() s.mutex.RUnlock() for _, change := range slice[:length] { if change.PassesFilter(filter) { channel <- &change count++ if filter.Limit != nil && count == *filter.Limit { return } } } select { case <-ch: case <-ctx.Done(): return } } }() return channel } func (s *ChangeService) loop() { for change := range s.submitQueue { timeout, cancel := context.WithTimeout(context.Background(), time.Second*5) change, err := s.changes.Insert(timeout, *change) if err != nil { log.Println("Failed to insert change:") continue } log.Println("Change", change.ID, "inserted.") s.mutex.Lock() s.buffer = append(s.buffer, *change) if len(s.buffer) > 16 { copy(s.buffer, s.buffer[8:]) s.buffer = s.buffer[:len(s.buffer)-8] s.offset += 8 } s.mutex.Unlock() s.notifier.Broadcast() cancel() } }