You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
194 lines
4.3 KiB
194 lines
4.3 KiB
package lucifer3
|
|
|
|
import (
|
|
"fmt"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
)
|
|
|
|
// ServiceKey is an empty struct type; it carries no data.
// NOTE(review): presumably used as a key or marker for identifying services —
// nothing in this file uses it, so confirm against callers.
type ServiceKey struct{}
|
|
|
|
// Event is anything that can be broadcast on an EventBus with RunEvent.
type Event interface {
	// EventDescription returns the text that RunEvent prints after the
	// "[EVENT]" prefix when the event is dispatched.
	EventDescription() string
}
|
|
|
|
// A TriggerEvent is a catch-all for events that may trigger a command from the event handler.
|
|
type TriggerEvent interface {
|
|
Event
|
|
|
|
TriggerKind() string
|
|
TriggerValue(key string) (string, bool)
|
|
}
|
|
|
|
// Command is anything that can be broadcast on an EventBus with RunCommand.
type Command interface {
	// CommandDescription returns the text that RunCommand prints after the
	// "[COMMAND]" prefix. Descriptions starting with "SetState" are not
	// printed individually by RunCommand; they are only counted.
	CommandDescription() string
}
|
|
|
|
type EventBus struct {
|
|
mu sync.Mutex
|
|
listeners []*serviceListener
|
|
privilegedList []ActiveService
|
|
signal chan struct{}
|
|
setStates int32
|
|
}
|
|
|
|
// JoinCallback joins the event bus for a moment.
|
|
func (b *EventBus) JoinCallback(cb func(event Event) bool) {
|
|
b.Join(newCallbackService(cb))
|
|
}
|
|
|
|
func (b *EventBus) Join(service Service) {
|
|
// Take the signal here so that it will receive the events sent by the calling function
|
|
// so that they're processed without having to wait for a new event.
|
|
signal := b.signalCh()
|
|
|
|
listener := &serviceListener{
|
|
bus: b,
|
|
queue: make([]serviceMessage, 0, 16),
|
|
service: service,
|
|
}
|
|
go listener.run(signal)
|
|
|
|
b.mu.Lock()
|
|
b.listeners = append(b.listeners, listener)
|
|
b.mu.Unlock()
|
|
}
|
|
|
|
// JoinPrivileged will add the services to a list that gets first look at
|
|
// all events, but they are not allowed to issue any commands. This is for
|
|
// things that cannot have eventual consistency.
|
|
func (b *EventBus) JoinPrivileged(service ActiveService) {
|
|
b.mu.Lock()
|
|
b.privilegedList = append(b.privilegedList, service)
|
|
b.mu.Unlock()
|
|
}
|
|
|
|
func (b *EventBus) RunCommand(command Command) {
|
|
if cd := command.CommandDescription(); !strings.HasPrefix(cd, "SetState") {
|
|
if setStates := atomic.LoadInt32(&b.setStates); setStates > 0 {
|
|
fmt.Println("[INFO]", setStates, "SetState commands hidden.")
|
|
atomic.AddInt32(&b.setStates, -setStates)
|
|
}
|
|
|
|
fmt.Println("[COMMAND]", cd)
|
|
} else {
|
|
if atomic.AddInt32(&b.setStates, 1) >= 1000 {
|
|
fmt.Println("[INFO] 100 SetState commands hidden.")
|
|
atomic.AddInt32(&b.setStates, -1000)
|
|
}
|
|
}
|
|
|
|
b.send(serviceMessage{command: command})
|
|
}
|
|
|
|
func (b *EventBus) RunEvent(event Event) {
|
|
fmt.Println("[EVENT]", event.EventDescription())
|
|
b.send(serviceMessage{event: event})
|
|
}
|
|
|
|
func (b *EventBus) send(message serviceMessage) {
|
|
b.mu.Lock()
|
|
defer b.mu.Unlock()
|
|
|
|
for _, service := range b.privilegedList {
|
|
if message.command != nil {
|
|
service.HandleCommand(nil, message.command)
|
|
}
|
|
if message.event != nil {
|
|
service.HandleEvent(nil, message.event)
|
|
}
|
|
}
|
|
|
|
deleteList := make([]int, 0, 0)
|
|
for i, listener := range b.listeners {
|
|
if !listener.service.Active() {
|
|
deleteList = append(deleteList, i-len(deleteList))
|
|
continue
|
|
}
|
|
|
|
listener.mu.Lock()
|
|
listener.queue = append(listener.queue, message)
|
|
listener.mu.Unlock()
|
|
}
|
|
|
|
for _, i := range deleteList {
|
|
b.listeners = append(b.listeners[:i], b.listeners[i+1:]...)
|
|
}
|
|
|
|
if b.signal != nil {
|
|
close(b.signal)
|
|
b.signal = nil
|
|
}
|
|
}
|
|
|
|
func (b *EventBus) signalCh() <-chan struct{} {
|
|
b.mu.Lock()
|
|
defer b.mu.Unlock()
|
|
|
|
if b.signal == nil {
|
|
b.signal = make(chan struct{})
|
|
}
|
|
|
|
return b.signal
|
|
}
|
|
|
|
type serviceListener struct {
|
|
mu sync.Mutex
|
|
bus *EventBus
|
|
queue []serviceMessage
|
|
service Service
|
|
}
|
|
|
|
func (l *serviceListener) run(signal <-chan struct{}) {
|
|
// Detect command support
|
|
var activeService ActiveService
|
|
if as, ok := l.service.(ActiveService); ok {
|
|
activeService = as
|
|
}
|
|
|
|
queue := make([]serviceMessage, 0, 16)
|
|
|
|
for {
|
|
// Listen for the signal, but stop here if the service has marked
|
|
// itself as inactive.
|
|
<-signal
|
|
if !l.service.Active() {
|
|
return
|
|
}
|
|
|
|
// Take a new signal before copying the queue to avoid delay
|
|
// in case a message is about to be enqueued right now!
|
|
signal = l.bus.signalCh()
|
|
|
|
// Take a copy of the queue.
|
|
l.mu.Lock()
|
|
for _, message := range l.queue {
|
|
queue = append(queue, message)
|
|
}
|
|
l.queue = l.queue[:0]
|
|
l.mu.Unlock()
|
|
|
|
// Handle the messages in the queue, but stop mid-queue if the previous message
|
|
// or external circumstances marked the service inactive.
|
|
for _, message := range queue {
|
|
if !l.service.Active() {
|
|
return
|
|
}
|
|
|
|
if message.event != nil {
|
|
l.service.HandleEvent(l.bus, message.event)
|
|
}
|
|
|
|
if message.command != nil && activeService != nil {
|
|
activeService.HandleCommand(l.bus, message.command)
|
|
}
|
|
}
|
|
queue = queue[:0]
|
|
}
|
|
}
|
|
|
|
type serviceMessage struct {
|
|
event Event
|
|
command Command
|
|
}
|