|
|
package services
import ( "context" "git.aiterp.net/rpdata/api/internal/notifier" "git.aiterp.net/rpdata/api/models" "git.aiterp.net/rpdata/api/repositories" "log" "sync" "time" )
type ChangeService struct { changes repositories.ChangeRepository authService *AuthService
mutex sync.RWMutex buffer []models.Change offset uint64 notifier notifier.Notifier submitQueue chan *models.Change loopStarted bool }
// Find looks up a single change by its ID in the change repository and
// returns the repository's result unchanged.
func (s *ChangeService) Find(ctx context.Context, id string) (*models.Change, error) {
	found, err := s.changes.Find(ctx, id)
	return found, err
}
// List queries the change repository with the given filter and returns the
// repository's result unchanged.
func (s *ChangeService) List(ctx context.Context, filter models.ChangeFilter) ([]*models.Change, error) {
	matches, err := s.changes.List(ctx, filter)
	return matches, err
}
func (s *ChangeService) Submit(ctx context.Context, model models.ChangeModel, op string, listed bool, keys []models.ChangeKey, objects ...interface{}) { token := s.authService.TokenFromContext(ctx) if token == nil { panic("no token!") }
change := &models.Change{ Model: model, Op: op, Author: token.UserID, Listed: listed, Keys: keys, Date: time.Now(), }
// Do not allow * keys in unlisted objects, but allow laziness on the
// call site to not bloat the code too much.
if !listed { deleteList := make([]int, 0, 2) for i, key := range keys { if key.ID == "*" { deleteList = append(deleteList, i-len(deleteList)) } }
for _, index := range deleteList { change.Keys = append(change.Keys[:index], change.Keys[index+1:]...) } }
for _, obj := range objects { if !change.AddObject(obj) { log.Printf("Cannot add object of type %T to change", obj) } }
s.mutex.Lock() if !s.loopStarted { s.loopStarted = true s.submitQueue = make(chan *models.Change, 64) go s.loop() } s.mutex.Unlock()
s.submitQueue <- change }
func (s *ChangeService) Subscribe(ctx context.Context, filter models.ChangeFilter) <-chan *models.Change { channel := make(chan *models.Change)
go func() { defer close(channel)
s.mutex.RLock() pos := s.offset + uint64(len(s.buffer)) slice := make([]models.Change, 32) s.mutex.RUnlock()
count := 0
for { s.mutex.RLock() nextPos := s.offset + uint64(len(s.buffer)) length := nextPos - pos if length > 0 { index := pos - s.offset
pos = nextPos copy(slice, s.buffer[index:]) } ch := s.notifier.C() s.mutex.RUnlock()
for _, change := range slice[:length] { if change.PassesFilter(filter) { channel <- &change
count++ if filter.Limit != nil && count == *filter.Limit { return } } }
select { case <-ch: case <-ctx.Done(): return } } }()
return channel }
func (s *ChangeService) loop() { for change := range s.submitQueue { timeout, cancel := context.WithTimeout(context.Background(), time.Second*5)
change, err := s.changes.Insert(timeout, *change) if err != nil { log.Println("Failed to insert change:") continue }
log.Println("Change", change.ID, "inserted.")
s.mutex.Lock() s.buffer = append(s.buffer, *change) if len(s.buffer) > 16 { copy(s.buffer, s.buffer[8:]) s.buffer = s.buffer[:len(s.buffer)-8] s.offset += 8 } s.mutex.Unlock() s.notifier.Broadcast()
cancel() } }
|