|
|
package lucifer3
import ( "fmt" "log" "os" "strconv" "strings" "sync" )
type ServiceKey struct{}
var showVerboseEvents bool
type verboseEventOrCommand interface { VerboseKey() string }
type Event interface { EventDescription() string }
// A TriggerEvent is a catch-all for events that may trigger a command from the event handler.
type TriggerEvent interface { Event
TriggerKind() string TriggerValue(key string) (string, bool) }
type Command interface { CommandDescription() string }
type EventBus struct { mu sync.Mutex listeners []*serviceListener privilegedList []ActiveService signal chan struct{} setStates int32 verboseCount int verboseCounts map[string]int }
// JoinCallback joins the event bus for a moment.
func (b *EventBus) JoinCallback(cb func(event Event) bool) { b.Join(newCallbackService(cb)) }
func (b *EventBus) Join(service Service) { // Take the signal here so that it will receive the events sent by the calling function
// so that they're processed without having to wait for a new event.
signal := b.signalCh()
listener := &serviceListener{ bus: b, queue: make([]serviceMessage, 0, 16), service: service, } go listener.run(signal)
b.mu.Lock() b.listeners = append(b.listeners, listener) b.mu.Unlock() }
// JoinPrivileged will add the services to a list that gets first look at
// all events, but they are not allowed to issue any commands. This is for
// things that cannot have eventual consistency.
func (b *EventBus) JoinPrivileged(service ActiveService) { b.mu.Lock() b.privilegedList = append(b.privilegedList, service) b.mu.Unlock() }
func (b *EventBus) RunCommand(command Command) { if v, ok := command.(verboseEventOrCommand); ok && !showVerboseEvents { b.mu.Lock() b.countVerbose(v.VerboseKey()) b.mu.Unlock()
} else { fmt.Println("[COMMAND]", command.CommandDescription()) }
b.send(serviceMessage{command: command}) }
func (b *EventBus) RunEvent(event Event) { if v, ok := event.(verboseEventOrCommand); ok && !showVerboseEvents { b.mu.Lock() b.countVerbose(v.VerboseKey()) b.mu.Unlock() } else { fmt.Println("[EVENT]", event.EventDescription()) }
b.send(serviceMessage{event: event}) }
func (b *EventBus) RunEvents(events []Event) { for _, event := range events { b.RunEvent(event) } }
func (b *EventBus) send(message serviceMessage) { b.mu.Lock() defer b.mu.Unlock()
for _, service := range b.privilegedList { if message.command != nil { service.HandleCommand(b, message.command) } if message.event != nil { service.HandleEvent(nil, message.event) } }
deleteList := make([]int, 0, 0) for i, listener := range b.listeners { if !listener.service.Active() { deleteList = append(deleteList, i-len(deleteList)) continue }
listener.mu.Lock() listener.queue = append(listener.queue, message) listener.mu.Unlock() }
for _, i := range deleteList { b.listeners = append(b.listeners[:i], b.listeners[i+1:]...) }
if b.signal != nil { close(b.signal) b.signal = nil } }
func (b *EventBus) signalCh() <-chan struct{} { b.mu.Lock() defer b.mu.Unlock()
if b.signal == nil { b.signal = make(chan struct{}) }
return b.signal }
type serviceListener struct { mu sync.Mutex bus *EventBus queue []serviceMessage service Service }
func (l *serviceListener) run(signal <-chan struct{}) { // Detect command support
var activeService ActiveService if as, ok := l.service.(ActiveService); ok { activeService = as }
queue := make([]serviceMessage, 0, 16)
for { // Listen for the signal, but stop here if the service has marked
// itself as inactive.
<-signal if !l.service.Active() { return }
// Take a new signal before copying the queue to avoid delay
// in case a message is about to be enqueued right now!
signal = l.bus.signalCh()
// Take a copy of the queue.
l.mu.Lock() for _, message := range l.queue { queue = append(queue, message) } l.queue = l.queue[:0] l.mu.Unlock()
// Handle the messages in the queue, but stop mid-queue if the previous message
// or external circumstances marked the service inactive.
for _, message := range queue { if !l.service.Active() { return }
if message.event != nil { l.service.HandleEvent(l.bus, message.event) }
if message.command != nil && activeService != nil { activeService.HandleCommand(l.bus, message.command) } } queue = queue[:0] } }
func (b *EventBus) countVerbose(key string) { if b.verboseCounts == nil { b.verboseCounts = make(map[string]int, 8) } b.verboseCounts[key] = b.verboseCounts[key] + 1 b.verboseCount += 1
if b.verboseCount >= 1000 { first := true
s := strings.Builder{} s.WriteString("[EVENT] 1000 verbose events hidden (") for key, value := range b.verboseCounts { if !first { s.WriteString(", ") } first = false
s.WriteString(key) s.WriteRune(':') s.WriteString(strconv.Itoa(value)) } s.WriteRune(')') log.Println(s.String())
b.verboseCounts = make(map[string]int, 8) b.verboseCount = 0 } }
type serviceMessage struct { event Event command Command }
func init() { showVerboseEvents = os.Getenv("LUCIFER4_VERBOSE") == "true" }
|