GraphQL API and utilities for the rpdata project
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 

152 lines
3.2 KiB

package services
import (
"context"
"git.aiterp.net/rpdata/api/internal/notifier"
"git.aiterp.net/rpdata/api/models"
"git.aiterp.net/rpdata/api/repositories"
"log"
"sync"
"time"
)
// ChangeService looks up stored changes, accepts newly submitted changes for
// asynchronous insertion, and streams inserted changes to subscribers.
type ChangeService struct {
// changes is the repository used by Find, List and the insertion loop.
changes repositories.ChangeRepository
// authService resolves the token from the context passed to Submit.
authService *AuthService
// mutex guards buffer, offset, loopStarted and the lazy creation of
// submitQueue.
mutex sync.RWMutex
// buffer holds the most recently inserted changes; the loop appends to it
// and drops the 8 oldest entries whenever it grows beyond 16.
buffer []models.Change
// offset counts the entries dropped from the front of buffer so far, so
// offset+len(buffer) is the stream position after the newest change.
offset uint64
// notifier is broadcast on after every successful insert; subscribers wait
// on the channel returned by its C method.
notifier notifier.Notifier
// submitQueue carries submitted changes to the loop goroutine. It is
// created by the first Submit call with a capacity of 64.
submitQueue chan *models.Change
// loopStarted records whether Submit has already started the loop.
loopStarted bool
}
// Find looks up a single change by ID in the change repository and returns
// the repository's result and error unchanged.
func (s *ChangeService) Find(ctx context.Context, id string) (*models.Change, error) {
	found, err := s.changes.Find(ctx, id)
	return found, err
}
// List asks the change repository for the changes matching filter and
// returns the repository's result and error unchanged.
func (s *ChangeService) List(ctx context.Context, filter models.ChangeFilter) ([]*models.Change, error) {
	matches, err := s.changes.List(ctx, filter)
	return matches, err
}
// Submit builds a change authored by the user whose token is found in ctx and
// queues it for insertion by the background loop, which is started on the
// first call.
//
// model, op, listed and keys are stored on the change as given, except that
// keys with the ID "*" are dropped when listed is false. The caller's keys
// slice is never modified. Each value in objects is attached with AddObject;
// values that AddObject rejects are logged and skipped.
//
// Submit panics if ctx carries no token. The send to the queue blocks while
// the queue (capacity 64) is full.
func (s *ChangeService) Submit(ctx context.Context, model models.ChangeModel, op string, listed bool, keys []models.ChangeKey, objects ...interface{}) {
	token := s.authService.TokenFromContext(ctx)
	if token == nil {
		panic("no token!")
	}

	// Do not allow * keys in unlisted objects, but allow laziness on the
	// call site to not bloat the code too much.
	changeKeys := keys
	if !listed {
		for i, key := range keys {
			if key.ID != "*" {
				continue
			}

			// First wildcard found: filter into a fresh slice so the
			// backing array shared with the caller is left untouched.
			// Nothing is allocated when there is no wildcard, which also
			// keeps a nil keys slice nil.
			filtered := make([]models.ChangeKey, 0, len(keys)-1)
			filtered = append(filtered, keys[:i]...)
			for _, rest := range keys[i+1:] {
				if rest.ID != "*" {
					filtered = append(filtered, rest)
				}
			}
			changeKeys = filtered
			break
		}
	}

	change := &models.Change{
		Model:  model,
		Op:     op,
		Author: token.UserID,
		Listed: listed,
		Keys:   changeKeys,
		Date:   time.Now(),
	}

	for _, obj := range objects {
		if !change.AddObject(obj) {
			log.Printf("Cannot add object of type %T to change", obj)
		}
	}

	// Start the insertion loop lazily; the mutex makes sure only one Submit
	// call creates the queue and spawns the goroutine.
	s.mutex.Lock()
	if !s.loopStarted {
		s.loopStarted = true
		s.submitQueue = make(chan *models.Change, 64)
		go s.loop()
	}
	queue := s.submitQueue
	s.mutex.Unlock()

	queue <- change
}
// Subscribe returns a channel that receives every change inserted after the
// call that passes filter. The channel is closed when ctx is done or, if
// filter.Limit is set, once that many changes have been delivered.
//
// Each delivered pointer refers to its own copy of the change, so receivers
// may keep it. A subscriber that falls further behind than the service's
// buffer retains resumes from the oldest change still buffered; the changes
// dropped in between are not delivered.
func (s *ChangeService) Subscribe(ctx context.Context, filter models.ChangeFilter) <-chan *models.Change {
	channel := make(chan *models.Change)

	go func() {
		defer close(channel)

		// Start at the current end of the stream: only changes inserted
		// from now on are delivered.
		s.mutex.RLock()
		pos := s.offset + uint64(len(s.buffer))
		s.mutex.RUnlock()

		count := 0
		for {
			s.mutex.RLock()
			if pos < s.offset {
				// The buffer was trimmed past our position while we were
				// busy delivering; without this clamp the index below
				// would underflow and the slice expression would panic.
				pos = s.offset
			}
			// Copy the unseen tail into a fresh slice so the pointers sent
			// below stay valid and are never overwritten by a later pass.
			var pending []models.Change
			if index := pos - s.offset; index < uint64(len(s.buffer)) {
				pending = append(pending, s.buffer[index:]...)
			}
			pos = s.offset + uint64(len(s.buffer))
			// Fetch the wake-up channel while still holding the lock so an
			// insert happening right after the unlock is not missed.
			ch := s.notifier.C()
			s.mutex.RUnlock()

			for i := range pending {
				change := &pending[i]
				if !change.PassesFilter(filter) {
					continue
				}

				// Give up on the send when the subscriber is gone, so this
				// goroutine does not block forever.
				select {
				case channel <- change:
				case <-ctx.Done():
					return
				}

				count++
				if filter.Limit != nil && count == *filter.Limit {
					return
				}
			}

			select {
			case <-ch:
			case <-ctx.Done():
				return
			}
		}
	}()

	return channel
}
// loop drains the submit queue: it inserts each queued change through the
// repository with a five second timeout, appends the inserted change to the
// buffer read by subscribers, and broadcasts on the notifier. Changes that
// fail to insert are logged and dropped. It runs until the queue is closed.
func (s *ChangeService) loop() {
	for submitted := range s.submitQueue {
		timeout, cancel := context.WithTimeout(context.Background(), time.Second*5)
		change, err := s.changes.Insert(timeout, *submitted)
		// Release the timeout's resources on every path, including failure.
		cancel()
		if err != nil {
			log.Println("Failed to insert change:", err)
			continue
		}

		log.Println("Change", change.ID, "inserted.")

		s.mutex.Lock()
		s.buffer = append(s.buffer, *change)
		if len(s.buffer) > 16 {
			// Drop the 8 oldest entries and advance offset by the same
			// amount so subscriber positions stay meaningful.
			copy(s.buffer, s.buffer[8:])
			s.buffer = s.buffer[:len(s.buffer)-8]
			s.offset += 8
		}
		s.mutex.Unlock()

		s.notifier.Broadcast()
	}
}