package lucifer3 import ( "fmt" "strings" "sync" "sync/atomic" ) type ServiceKey struct{} type Event interface { EventDescription() string } // A TriggerEvent is a catch-all for events that may trigger a command from the event handler. type TriggerEvent interface { Event TriggerKind() string TriggerValue(key string) (string, bool) } type Command interface { CommandDescription() string } type EventBus struct { mu sync.Mutex listeners []*serviceListener privilegedList []ActiveService signal chan struct{} setStates int32 } // JoinCallback joins the event bus for a moment. func (b *EventBus) JoinCallback(cb func(event Event) bool) { b.Join(newCallbackService(cb)) } func (b *EventBus) Join(service Service) { // Take the signal here so that it will receive the events sent by the calling function // so that they're processed without having to wait for a new event. signal := b.signalCh() listener := &serviceListener{ bus: b, queue: make([]serviceMessage, 0, 16), service: service, } go listener.run(signal) b.mu.Lock() b.listeners = append(b.listeners, listener) b.mu.Unlock() } // JoinPrivileged will add the services to a list that gets first look at // all events, but they are not allowed to issue any commands. This is for // things that cannot have eventual consistency. func (b *EventBus) JoinPrivileged(service ActiveService) { b.mu.Lock() b.privilegedList = append(b.privilegedList, service) b.mu.Unlock() } func (b *EventBus) RunCommand(command Command) { if cd := command.CommandDescription(); !strings.HasPrefix(cd, "SetState") { if setStates := atomic.LoadInt32(&b.setStates); setStates > 0 { fmt.Println("[INFO]", setStates, "SetStates commands hidden.") atomic.AddInt32(&b.setStates, -setStates) } fmt.Println("[COMMAND]", cd) } else { if atomic.AddInt32(&b.setStates, 1) >= 1000 { fmt.Println("[INFO] 1000 SetStates commands hidden.") atomic.AddInt32(&b.setStates, -1000) } } b.send(serviceMessage{command: command}) } func (b *EventBus) RunEvent(event Event) { fmt.Println("[EVENT]", event.EventDescription()) b.send(serviceMessage{event: event}) } func (b *EventBus) RunEvents(events []Event) { for _, event := range events { b.RunEvent(event) } } func (b *EventBus) send(message serviceMessage) { b.mu.Lock() defer b.mu.Unlock() for _, service := range b.privilegedList { if message.command != nil { service.HandleCommand(b, message.command) } if message.event != nil { service.HandleEvent(nil, message.event) } } deleteList := make([]int, 0, 0) for i, listener := range b.listeners { if !listener.service.Active() { deleteList = append(deleteList, i-len(deleteList)) continue } listener.mu.Lock() listener.queue = append(listener.queue, message) listener.mu.Unlock() } for _, i := range deleteList { b.listeners = append(b.listeners[:i], b.listeners[i+1:]...) } if b.signal != nil { close(b.signal) b.signal = nil } } func (b *EventBus) signalCh() <-chan struct{} { b.mu.Lock() defer b.mu.Unlock() if b.signal == nil { b.signal = make(chan struct{}) } return b.signal } type serviceListener struct { mu sync.Mutex bus *EventBus queue []serviceMessage service Service } func (l *serviceListener) run(signal <-chan struct{}) { // Detect command support var activeService ActiveService if as, ok := l.service.(ActiveService); ok { activeService = as } queue := make([]serviceMessage, 0, 16) for { // Listen for the signal, but stop here if the service has marked // itself as inactive. <-signal if !l.service.Active() { return } // Take a new signal before copying the queue to avoid delay // in case a message is about to be enqueued right now! signal = l.bus.signalCh() // Take a copy of the queue. l.mu.Lock() for _, message := range l.queue { queue = append(queue, message) } l.queue = l.queue[:0] l.mu.Unlock() // Handle the messages in the queue, but stop mid-queue if the previous message // or external circumstances marked the service inactive. for _, message := range queue { if !l.service.Active() { return } if message.event != nil { l.service.HandleEvent(l.bus, message.event) } if message.command != nil && activeService != nil { activeService.HandleCommand(l.bus, message.command) } } queue = queue[:0] } } type serviceMessage struct { event Event command Command }