package changes

import (
	"context"
	"sync"

	"git.aiterp.net/rpdata/api/models"
)

var (
	// subMutex guards every read and write of subList.
	subMutex sync.Mutex

	// subList holds the currently active subscriptions.
	subList = make([]*subscription, 0, 16)
)

// subscription is one registered listener for changes.
type subscription struct {
	// Keys is the set of change keys (as produced by mapKey) this
	// subscription is interested in.
	Keys map[string]bool
	// Channel is the send side of the channel handed out by Subscribe.
	Channel chan<- *models.Change
	// WildCard makes the subscription receive every listed change,
	// regardless of its keys.
	WildCard bool
}

// matches reports whether a change with the given listed flag and
// pre-computed map keys should be delivered to this subscription.
func (sub *subscription) matches(listed bool, keys []string) bool {
	if sub.WildCard && listed {
		return true
	}
	for _, key := range keys {
		if sub.Keys[key] {
			return true
		}
	}
	return false
}

// Subscribe registers a subscription and returns the channel its changes are
// delivered on. A change is delivered when it carries at least one of the
// given keys, or, if wildcard is true, whenever the change is listed.
//
// The returned channel has a buffer of 64; a change arriving while the buffer
// is full is dropped for that subscriber instead of blocking the publisher.
// The subscription is removed once ctx is done. The channel is never closed,
// so receivers should stop reading when ctx is done.
func Subscribe(ctx context.Context, keys []models.ChangeKey, wildcard bool) <-chan *models.Change {
	channel := make(chan *models.Change, 64)
	sub := &subscription{
		Keys:     make(map[string]bool, len(keys)),
		Channel:  channel,
		WildCard: wildcard,
	}
	for _, key := range keys {
		sub.Keys[mapKey(key)] = true
	}

	subMutex.Lock()
	subList = append(subList, sub)
	subMutex.Unlock()

	// This goroutine lives exactly as long as ctx does.
	go func() {
		<-ctx.Done()
		unsubscribe(sub)
	}()

	return channel
}

// unsubscribe removes sub from subList, if present.
func unsubscribe(sub *subscription) {
	subMutex.Lock()
	defer subMutex.Unlock()

	for i := range subList {
		if subList[i] != sub {
			continue
		}

		last := len(subList) - 1
		copy(subList[i:], subList[i+1:])
		// Clear the vacated tail slot so the backing array does not keep the
		// removed subscription (and its channel) reachable.
		subList[last] = nil
		subList = subList[:last]
		return
	}
}

// pushToSubscribers delivers change to every matching subscription. Each
// receiver gets a pointer to its own shallow copy of the change. Sends are
// non-blocking: a subscriber whose buffer is full misses the change.
func pushToSubscribers(change models.Change) {
	// Compute the map keys once, outside the lock.
	keys := make([]string, len(change.Keys))
	for i := range change.Keys {
		keys[i] = mapKey(change.Keys[i])
	}

	subMutex.Lock()
	defer subMutex.Unlock()

	for _, sub := range subList {
		if !sub.matches(change.Listed, keys) {
			continue
		}

		// Copy only when a send is attempted; the copy escapes to the heap.
		changeCopy := change
		select {
		case sub.Channel <- &changeCopy:
		default:
			// Buffer full: drop rather than stall while holding the lock.
		}
	}
}

// mapKey builds the string used to index subscription.Keys: the model's
// string form and the ID, joined by a dot.
func mapKey(ck models.ChangeKey) string {
	return ck.Model.String() + "." + ck.ID
}