package lucifer3 import ( "fmt" "log" "os" "strconv" "strings" "sync" ) type ServiceKey struct{} var showVerboseEvents bool type verboseEventOrCommand interface { VerboseKey() string } type Event interface { EventDescription() string } // A TriggerEvent is a catch-all for events that may trigger a command from the event handler. type TriggerEvent interface { Event TriggerKind() string TriggerValue(key string) (string, bool) } type Command interface { CommandDescription() string } type EventBus struct { mu sync.Mutex listeners []*serviceListener privilegedList []ActiveService signal chan struct{} setStates int32 verboseCount int verboseCounts map[string]int } // JoinCallback joins the event bus for a moment. func (b *EventBus) JoinCallback(cb func(event Event) bool) { b.Join(newCallbackService(cb)) } func (b *EventBus) Join(service Service) { // Take the signal here so that it will receive the events sent by the calling function // so that they're processed without having to wait for a new event. signal := b.signalCh() listener := &serviceListener{ bus: b, queue: make([]serviceMessage, 0, 16), service: service, } go listener.run(signal) b.mu.Lock() b.listeners = append(b.listeners, listener) b.mu.Unlock() } // JoinPrivileged will add the services to a list that gets first look at // all events, but they are not allowed to issue any commands. This is for // things that cannot have eventual consistency. func (b *EventBus) JoinPrivileged(service ActiveService) { b.mu.Lock() b.privilegedList = append(b.privilegedList, service) b.mu.Unlock() } func (b *EventBus) RunCommand(command Command) { if v, ok := command.(verboseEventOrCommand); ok && !showVerboseEvents { b.mu.Lock() b.countVerbose(v.VerboseKey()) b.mu.Unlock() } else { fmt.Println("[COMMAND]", command.CommandDescription()) } b.send(serviceMessage{command: command}) } func (b *EventBus) RunEvent(event Event) { if v, ok := event.(verboseEventOrCommand); ok && !showVerboseEvents { b.mu.Lock() b.countVerbose(v.VerboseKey()) b.mu.Unlock() } else { fmt.Println("[EVENT]", event.EventDescription()) } b.send(serviceMessage{event: event}) } func (b *EventBus) RunEvents(events []Event) { for _, event := range events { b.RunEvent(event) } } func (b *EventBus) send(message serviceMessage) { b.mu.Lock() defer b.mu.Unlock() for _, service := range b.privilegedList { if message.command != nil { service.HandleCommand(b, message.command) } if message.event != nil { service.HandleEvent(nil, message.event) } } deleteList := make([]int, 0, 0) for i, listener := range b.listeners { if !listener.service.Active() { deleteList = append(deleteList, i-len(deleteList)) continue } listener.mu.Lock() listener.queue = append(listener.queue, message) listener.mu.Unlock() } for _, i := range deleteList { b.listeners = append(b.listeners[:i], b.listeners[i+1:]...) } if b.signal != nil { close(b.signal) b.signal = nil } } func (b *EventBus) signalCh() <-chan struct{} { b.mu.Lock() defer b.mu.Unlock() if b.signal == nil { b.signal = make(chan struct{}) } return b.signal } type serviceListener struct { mu sync.Mutex bus *EventBus queue []serviceMessage service Service } func (l *serviceListener) run(signal <-chan struct{}) { // Detect command support var activeService ActiveService if as, ok := l.service.(ActiveService); ok { activeService = as } queue := make([]serviceMessage, 0, 16) for { // Listen for the signal, but stop here if the service has marked // itself as inactive. <-signal if !l.service.Active() { return } // Take a new signal before copying the queue to avoid delay // in case a message is about to be enqueued right now! signal = l.bus.signalCh() // Take a copy of the queue. l.mu.Lock() for _, message := range l.queue { queue = append(queue, message) } l.queue = l.queue[:0] l.mu.Unlock() // Handle the messages in the queue, but stop mid-queue if the previous message // or external circumstances marked the service inactive. for _, message := range queue { if !l.service.Active() { return } if message.event != nil { l.service.HandleEvent(l.bus, message.event) } if message.command != nil && activeService != nil { activeService.HandleCommand(l.bus, message.command) } } queue = queue[:0] } } func (b *EventBus) countVerbose(key string) { if b.verboseCounts == nil { b.verboseCounts = make(map[string]int, 8) } b.verboseCounts[key] = b.verboseCounts[key] + 1 b.verboseCount += 1 if b.verboseCount >= 1000 { first := true s := strings.Builder{} s.WriteString("[EVENT] 1000 verbose events hidden (") for key, value := range b.verboseCounts { if !first { s.WriteString(", ") } first = false s.WriteString(key) s.WriteRune(':') s.WriteString(strconv.Itoa(value)) } s.WriteRune(')') log.Println(s.String()) b.verboseCounts = make(map[string]int, 8) b.verboseCount = 0 } } type serviceMessage struct { event Event command Command } func init() { showVerboseEvents = os.Getenv("LUCIFER4_VERBOSE") == "true" }