|
|
package changes
import ( "context" "sync"
"git.aiterp.net/rpdata/api/models" )
var subMutex sync.Mutex var subList []*subscription
type subscription struct { Keys map[string]bool Channel chan<- *models.Change WildCard bool }
// Subscribe subscribes to all changes.
func Subscribe(ctx context.Context, keys []models.ChangeKey, wildcard bool) <-chan *models.Change { channel := make(chan *models.Change, 64) sub := &subscription{ Keys: make(map[string]bool, len(keys)), Channel: channel, WildCard: wildcard, }
for _, key := range keys { sub.Keys[mapKey(key)] = true }
subMutex.Lock() subList = append(subList, sub) subMutex.Unlock()
go func() { <-ctx.Done()
subMutex.Lock() for i := range subList { if subList[i] == sub { subList = append(subList[:i], subList[i+1:]...) break } } subMutex.Unlock() }()
return channel }
func pushToSubscribers(change models.Change) { keys := make([]string, len(change.Keys)) for i := range change.Keys { keys[i] = mapKey(change.Keys[i]) }
subMutex.Lock() SubLoop: for _, sub := range subList { changeCopy := change
if sub.WildCard && change.Listed { select { case sub.Channel <- &changeCopy: default: } } else { for _, key := range keys { if sub.Keys[key] { select { case sub.Channel <- &changeCopy: default: }
continue SubLoop } } } } subMutex.Unlock() }
// mapKey returns the string used to index subscription key sets: the model's
// String() form and the ID joined by a dot.
func mapKey(ck models.ChangeKey) string {
	model := ck.Model.String()

	return model + "." + ck.ID
}
// init gives subList an initial backing array so the first registrations do
// not have to grow it.
func init() {
	const initialCapacity = 16

	subList = make([]*subscription, 0, initialCapacity)
}
|