GraphQL API and utilities for the rpdata project
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 

90 lines
1.5 KiB

package changes
import (
"context"
"sync"
"git.aiterp.net/rpdata/api/models"
)
// Subscription registry shared by Subscribe and pushToSubscribers.
var (
	// subMutex guards subList while subscriptions are added, removed and
	// iterated.
	subMutex sync.Mutex

	// subList holds the currently registered subscriptions.
	subList []*subscription
)
// subscription is one registered listener for change notifications. It is
// created by Subscribe and consulted by pushToSubscribers for every change.
type subscription struct {
// Keys is the set of change keys, in the string form produced by mapKey,
// that this subscriber wants to be notified about.
Keys map[string]bool
// Channel is the send-only end of the channel on which matching changes are
// delivered to the subscriber.
Channel chan<- *models.Change
// WildCard, when true, makes the subscriber receive every change whose
// Listed flag is set, regardless of Keys.
WildCard bool
}
// Subscribe registers a subscriber for changes and returns the channel on
// which they are delivered.
//
// A change is delivered when at least one of its keys matches one of keys.
// If wildcard is true, every change that is marked as listed is delivered as
// well. The returned channel is buffered with room for 64 changes; changes
// pushed while the buffer is full are dropped for this subscriber instead of
// blocking the publisher.
//
// The subscription stays registered until ctx is done, at which point it is
// removed from the registry. The channel is not closed by this function, so
// consumers should stop receiving once ctx is done. The cleanup goroutine
// started here lives until ctx is done.
func Subscribe(ctx context.Context, keys []models.ChangeKey, wildcard bool) <-chan *models.Change {
	channel := make(chan *models.Change, 64)
	sub := &subscription{
		Keys:     make(map[string]bool, len(keys)),
		Channel:  channel,
		WildCard: wildcard,
	}
	for _, key := range keys {
		sub.Keys[mapKey(key)] = true
	}

	subMutex.Lock()
	subList = append(subList, sub)
	subMutex.Unlock()

	go func() {
		<-ctx.Done()

		subMutex.Lock()
		defer subMutex.Unlock()

		for i := range subList {
			if subList[i] != sub {
				continue
			}

			// Shift the tail down and clear the vacated last slot. A plain
			// append(subList[:i], subList[i+1:]...) would leave a stale
			// pointer in the backing array, keeping a removed subscription
			// (and its buffered channel) reachable until the slot is reused.
			last := len(subList) - 1
			copy(subList[i:], subList[i+1:])
			subList[last] = nil
			subList = subList[:last]
			return
		}
	}()

	return channel
}
// pushToSubscribers offers change to every registered subscription that is
// interested in it.
//
// A subscription is interested when it is a wildcard subscription and the
// change is listed, or when any of the change's keys is in its key set. Each
// interested subscription is sent a pointer to its own copy of the change.
// The send never blocks: if the subscriber's channel cannot accept the value
// right now, the change is dropped for that subscriber.
func pushToSubscribers(change models.Change) {
	changeKeys := make([]string, 0, len(change.Keys))
	for _, ck := range change.Keys {
		changeKeys = append(changeKeys, mapKey(ck))
	}

	subMutex.Lock()
	defer subMutex.Unlock()

	for _, sub := range subList {
		// Listed changes always go to wildcard subscribers; everything else
		// needs at least one matching key.
		wanted := sub.WildCard && change.Listed
		for i := 0; !wanted && i < len(changeKeys); i++ {
			wanted = sub.Keys[changeKeys[i]]
		}
		if !wanted {
			continue
		}

		// Give each subscriber its own copy so they do not share a pointer.
		delivered := change
		select {
		case sub.Channel <- &delivered:
		default:
			// Subscriber is not keeping up; drop rather than stall the publisher.
		}
	}
}
// mapKey builds the string used to index subscription key sets: the string
// form of the key's model, a dot, and the key's ID.
func mapKey(ck models.ChangeKey) string {
	model := ck.Model.String()

	return model + "." + ck.ID
}
// init pre-allocates the subscription registry so the first few
// subscriptions do not each grow the backing array.
func init() {
	const initialCapacity = 16

	subList = make([]*subscription, 0, initialCapacity)
}