You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

246 lines
5.1 KiB

package lucifer3
import (
	"fmt"
	"log"
	"os"
	"sort"
	"strconv"
	"strings"
	"sync"
)
// ServiceKey is an empty struct type; it has no fields and no methods in this section.
// NOTE(review): presumably used as a typed lookup key — confirm against its callers.
type ServiceKey struct{}
// showVerboseEvents selects how events and commands implementing
// verboseEventOrCommand are reported: when true they are printed like any
// other message, when false they are only tallied and summarized in batches.
// It is assigned in init from the LUCIFER4_VERBOSE environment variable.
var showVerboseEvents bool
// verboseEventOrCommand is implemented by events and commands that are hidden
// from per-message output unless showVerboseEvents is set.
type verboseEventOrCommand interface {
// VerboseKey returns the key under which hidden occurrences are tallied.
VerboseKey() string
}
// Event is a message that can be published on the EventBus with RunEvent.
type Event interface {
// EventDescription returns the text printed after the "[EVENT]" prefix
// when the event is logged.
EventDescription() string
}
// A TriggerEvent is a catch-all for events that may trigger a command from the event handler.
//
// NOTE(review): TriggerKind and TriggerValue are not called in this section;
// presumably TriggerKind names the kind of trigger and TriggerValue looks up a
// named value on it, reporting whether the key exists — confirm against the
// event handler.
type TriggerEvent interface {
Event
TriggerKind() string
TriggerValue(key string) (string, bool)
}
// Command is a message that can be published on the EventBus with RunCommand.
type Command interface {
// CommandDescription returns the text printed after the "[COMMAND]" prefix
// when the command is logged.
CommandDescription() string
}
// EventBus distributes events and commands to the services that joined it.
// Privileged services are called synchronously while the message is being
// sent; regular services get the message appended to a per-listener queue that
// is drained by that listener's own goroutine.
type EventBus struct {
// mu is held while the listener lists, the signal channel and the verbose
// counters are read or modified.
mu sync.Mutex
// listeners holds one entry per service added through Join; entries whose
// service reports inactive are removed during send.
listeners []*serviceListener
// privilegedList holds the services added through JoinPrivileged.
privilegedList []ActiveService
// signal is created lazily by signalCh and closed (then reset to nil) by
// send to wake every listener waiting on it.
signal chan struct{}
// NOTE(review): setStates is not referenced in this section — confirm its use.
setStates int32
// verboseCount is the number of hidden verbose messages since the last summary.
verboseCount int
// verboseCounts is the same tally broken down by VerboseKey.
verboseCounts map[string]int
}
// JoinCallback wraps cb in a callback service and joins it to the bus, so the
// function takes part on the bus for a moment without a dedicated service type.
//
// NOTE(review): how cb's bool result is interpreted is decided by
// newCallbackService, which is not visible in this section.
func (b *EventBus) JoinCallback(cb func(event Event) bool) {
	service := newCallbackService(cb)
	b.Join(service)
}
// Join registers service on the bus and starts the goroutine that feeds it
// queued events and commands.
func (b *EventBus) Join(service Service) {
	// Grab the wake-up channel before anything else: whatever the caller sends
	// right after Join returns will then wake the new listener immediately
	// instead of it having to wait for some later message.
	wake := b.signalCh()

	l := &serviceListener{
		service: service,
		bus:     b,
		queue:   make([]serviceMessage, 0, 16),
	}
	go l.run(wake)

	b.mu.Lock()
	defer b.mu.Unlock()
	b.listeners = append(b.listeners, l)
}
// JoinPrivileged registers service as a privileged service. Privileged
// services get the first look at every event and command, ahead of regular
// listeners, but they are not allowed to issue any commands. This exists for
// things that cannot tolerate eventual consistency.
func (b *EventBus) JoinPrivileged(service ActiveService) {
	b.mu.Lock()
	defer b.mu.Unlock()

	b.privilegedList = append(b.privilegedList, service)
}
// RunCommand logs command and then sends it to the services on the bus.
//
// A command implementing verboseEventOrCommand is only tallied, not printed,
// unless showVerboseEvents is set. Every other command is printed with a
// "[COMMAND]" prefix.
func (b *EventBus) RunCommand(command Command) {
	verbose, isVerbose := command.(verboseEventOrCommand)
	if showVerboseEvents || !isVerbose {
		fmt.Println("[COMMAND]", command.CommandDescription())
	} else {
		// countVerbose touches the shared counters, so it runs under the bus lock.
		b.mu.Lock()
		b.countVerbose(verbose.VerboseKey())
		b.mu.Unlock()
	}

	b.send(serviceMessage{command: command})
}
// RunEvent logs event and then sends it to the services on the bus.
//
// An event implementing verboseEventOrCommand is only tallied, not printed,
// unless showVerboseEvents is set. Every other event is printed with an
// "[EVENT]" prefix.
func (b *EventBus) RunEvent(event Event) {
	verbose, isVerbose := event.(verboseEventOrCommand)
	if showVerboseEvents || !isVerbose {
		fmt.Println("[EVENT]", event.EventDescription())
	} else {
		// countVerbose touches the shared counters, so it runs under the bus lock.
		b.mu.Lock()
		b.countVerbose(verbose.VerboseKey())
		b.mu.Unlock()
	}

	b.send(serviceMessage{event: event})
}
// RunEvents publishes each of events in order, exactly as if RunEvent had been
// called once per element.
func (b *EventBus) RunEvents(events []Event) {
	for i := range events {
		b.RunEvent(events[i])
	}
}
// send delivers message to every privileged service synchronously, enqueues it
// for every listener whose service is still active, drops the listeners whose
// service has gone inactive, and finally closes the current signal channel (if
// one exists) to wake the listener goroutines waiting on it.
//
// The whole operation runs with b.mu held, so privileged handlers are invoked
// while the bus is locked.
func (b *EventBus) send(message serviceMessage) {
	b.mu.Lock()
	defer b.mu.Unlock()

	for _, service := range b.privilegedList {
		if message.command != nil {
			// NOTE(review): the bus is handed to HandleCommand but nil is handed
			// to HandleEvent below. b.mu is held here, so a bus method called
			// from the handler would block on it — confirm whether nil was
			// intended here as well.
			service.HandleCommand(b, message.command)
		}
		if message.event != nil {
			service.HandleEvent(nil, message.event)
		}
	}

	// Filter the listeners in place in a single pass: active ones receive the
	// message and are kept, inactive ones are skipped. The write position never
	// overtakes the read position, so reusing the backing array is safe.
	active := b.listeners[:0]
	for _, listener := range b.listeners {
		if !listener.service.Active() {
			continue
		}

		listener.mu.Lock()
		listener.queue = append(listener.queue, message)
		listener.mu.Unlock()

		active = append(active, listener)
	}
	// Clear the vacated tail so the backing array does not keep dropped
	// listeners (and their services) reachable.
	for i := len(active); i < len(b.listeners); i++ {
		b.listeners[i] = nil
	}
	b.listeners = active

	// Closing the channel wakes every waiting listener at once; the next
	// signalCh call creates a fresh one.
	if b.signal != nil {
		close(b.signal)
		b.signal = nil
	}
}
// signalCh returns the channel that send will close on the next message,
// creating it first if there is none at the moment. It takes b.mu itself, so
// it must not be called with the lock already held.
func (b *EventBus) signalCh() <-chan struct{} {
	b.mu.Lock()
	ch := b.signal
	if ch == nil {
		ch = make(chan struct{})
		b.signal = ch
	}
	b.mu.Unlock()

	return ch
}
// serviceListener couples one joined service with the queue of messages that
// are waiting to be handled by it.
type serviceListener struct {
// mu guards queue, which is appended to by EventBus.send and drained by run.
mu sync.Mutex
// bus is the bus the listener belongs to; it is passed to the service's
// handlers and used to obtain the next signal channel.
bus *EventBus
// queue holds the messages that have been sent but not yet picked up by run.
queue []serviceMessage
// service is the receiver of the queued messages.
service Service
}
// run is the listener's delivery loop. Each time signal is closed it drains the
// messages queued for the service and hands them to it one by one, then waits
// on the bus's next signal channel. It returns once the service reports that
// it is no longer active.
//
// Events go to HandleEvent. Commands are only delivered when the service also
// implements ActiveService; otherwise they are dropped.
func (l *serviceListener) run(signal <-chan struct{}) {
	// Find out once whether the service can take commands at all.
	commandHandler, handlesCommands := l.service.(ActiveService)

	pending := make([]serviceMessage, 0, 16)
	for {
		// Block until the bus signals; give up if the service went inactive
		// in the meantime.
		<-signal
		if !l.service.Active() {
			return
		}

		// Fetch the next signal channel *before* draining the queue, so a
		// message enqueued right at this moment still triggers a wake-up.
		signal = l.bus.signalCh()

		// Move the queued messages into the private buffer.
		l.mu.Lock()
		pending = append(pending, l.queue...)
		l.queue = l.queue[:0]
		l.mu.Unlock()

		// Deliver them in order, bailing out mid-batch if an earlier message
		// or something external deactivated the service.
		for i := range pending {
			if !l.service.Active() {
				return
			}

			msg := pending[i]
			if msg.event != nil {
				l.service.HandleEvent(l.bus, msg.event)
			}
			if handlesCommands && msg.command != nil {
				commandHandler.HandleCommand(l.bus, msg.command)
			}
		}

		pending = pending[:0]
	}
}
// countVerbose records one hidden verbose message under key. Once the number
// of hidden messages reaches the reporting threshold it logs a single summary
// line with the per-key counts and resets the tally.
//
// The keys in the summary are sorted so that successive reports list them in
// the same order; ranging over the map directly would shuffle them each time.
//
// It does no locking of its own: the caller must hold b.mu.
func (b *EventBus) countVerbose(key string) {
	// reportEvery is the number of hidden messages per summary line; it is used
	// both as the threshold and in the message text.
	const reportEvery = 1000

	if b.verboseCounts == nil {
		b.verboseCounts = make(map[string]int, 8)
	}
	b.verboseCounts[key]++
	b.verboseCount++
	if b.verboseCount < reportEvery {
		return
	}

	keys := make([]string, 0, len(b.verboseCounts))
	for k := range b.verboseCounts {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	var s strings.Builder
	s.WriteString("[EVENT] ")
	s.WriteString(strconv.Itoa(reportEvery))
	s.WriteString(" verbose events hidden (")
	for i, k := range keys {
		if i > 0 {
			s.WriteString(", ")
		}
		s.WriteString(k)
		s.WriteByte(':')
		s.WriteString(strconv.Itoa(b.verboseCounts[k]))
	}
	s.WriteByte(')')
	log.Println(s.String())

	b.verboseCounts = make(map[string]int, 8)
	b.verboseCount = 0
}
// serviceMessage is the unit queued for listeners. RunEvent fills in only
// event and RunCommand fills in only command; consumers check each field for
// nil before dispatching.
type serviceMessage struct {
event Event
command Command
}
// init enables per-message output of verbose events and commands when the
// LUCIFER4_VERBOSE environment variable is exactly "true"; any other value,
// or an unset variable, leaves it disabled.
func init() {
	verbose := os.Getenv("LUCIFER4_VERBOSE")
	showVerboseEvents = verbose == "true"
}