You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

178 lines
3.9 KiB

package lucifer3
import (
"fmt"
"sync"
)
// ServiceKey is an empty struct type.
//
// NOTE(review): nothing in this part of the file references it; presumably it
// is used elsewhere in the package as a key or marker for services — confirm.
type ServiceKey struct{}
// Event is implemented by every value that can be published on the bus with
// EventBus.RunEvent.
type Event interface {
	// EventDescription returns the text RunEvent prints when the event is
	// published.
	EventDescription() string
}
// A TriggerEvent is a catch-all for events that may trigger a command from the event handler.
// It extends Event with two accessors.
type TriggerEvent interface {
	Event
	// TriggerKind returns a string naming the kind of trigger.
	// NOTE(review): the set of valid kinds is not visible here — confirm
	// against the implementations.
	TriggerKind() string
	// TriggerValue looks up a value by key. Presumably the bool reports
	// whether the key was present — confirm against the implementations.
	TriggerValue(key string) (string, bool)
}
// Command is implemented by every value that can be issued on the bus with
// EventBus.RunCommand.
type Command interface {
	// CommandDescription returns the text RunCommand prints when the
	// command is issued.
	CommandDescription() string
}
// EventBus distributes events and commands to the services that joined it.
// Services added with Join get their own goroutine and message queue;
// services added with JoinPrivileged are called directly from send.
type EventBus struct {
	// mu guards listeners, privilegedList and signal.
	mu sync.Mutex
	// listeners holds one entry per service added through Join.
	listeners []*serviceListener
	// privilegedList holds the services added through JoinPrivileged.
	privilegedList []ActiveService
	// signal is the current wake-up channel. signalCh creates it on demand
	// and send closes it and resets it to nil after queueing a message.
	signal chan struct{}
}
// JoinCallback wraps cb in a callback service and joins it to the bus like
// any other service, so cb joins the event bus for a moment.
//
// NOTE(review): presumably cb's bool result decides whether the callback
// stays subscribed — confirm against newCallbackService.
func (b *EventBus) JoinCallback(cb func(event Event) bool) {
	service := newCallbackService(cb)
	b.Join(service)
}
// Join adds service to the bus: it starts a goroutine that drains the
// service's message queue and then registers the listener so that send
// starts queueing messages for it.
func (b *EventBus) Join(service Service) {
	// Grab the wake-up channel up front, before the goroutine exists, so
	// that events sent by the calling function right after Join are
	// processed without having to wait for a later event.
	wake := b.signalCh()

	l := new(serviceListener)
	l.bus = b
	l.service = service
	l.queue = make([]serviceMessage, 0, 16)
	go l.run(wake)

	b.mu.Lock()
	b.listeners = append(b.listeners, l)
	b.mu.Unlock()
}
// JoinPrivileged registers service on the privileged list. Privileged
// services get first look at every event and command, ahead of the regular
// listeners, but they are handed a nil bus and so are not allowed to issue
// any commands. Use this for things that cannot have eventual consistency.
func (b *EventBus) JoinPrivileged(service ActiveService) {
	b.mu.Lock()
	defer b.mu.Unlock()

	b.privilegedList = append(b.privilegedList, service)
}
// RunCommand prints the command's description to standard output and then
// sends the command to every service on the bus.
func (b *EventBus) RunCommand(command Command) {
	description := command.CommandDescription()
	fmt.Println("[COMMAND]", description)

	var msg serviceMessage
	msg.command = command
	b.send(msg)
}
// RunEvent prints the event's description to standard output and then
// sends the event to every service on the bus.
func (b *EventBus) RunEvent(event Event) {
	description := event.EventDescription()
	fmt.Println("[EVENT]", description)

	var msg serviceMessage
	msg.event = event
	b.send(msg)
}
// send delivers message to every service on the bus and then wakes the
// listener goroutines. The bus lock is held for the whole call.
//
// Privileged services are invoked synchronously, in registration order, and
// are handed a nil bus. Regular listeners only have the message appended to
// their queue; their own goroutine handles it after the wake-up. Listeners
// whose service reports itself inactive are dropped from the bus and do not
// receive the message.
func (b *EventBus) send(message serviceMessage) {
	b.mu.Lock()
	defer b.mu.Unlock()

	for _, service := range b.privilegedList {
		if message.command != nil {
			service.HandleCommand(nil, message.command)
		}
		if message.event != nil {
			service.HandleEvent(nil, message.event)
		}
	}

	// Filter the listeners in place: active ones are queued the message and
	// kept, inactive ones are skipped. Writes to active always trail the
	// read position of the range loop, so reusing the backing array is safe.
	active := b.listeners[:0]
	for _, listener := range b.listeners {
		if !listener.service.Active() {
			continue
		}

		listener.mu.Lock()
		listener.queue = append(listener.queue, message)
		listener.mu.Unlock()

		active = append(active, listener)
	}
	// Clear the now-unused tail so the dropped listeners are not kept alive
	// by the backing array.
	for i := len(active); i < len(b.listeners); i++ {
		b.listeners[i] = nil
	}
	b.listeners = active

	// Closing the channel wakes every goroutine currently waiting on it;
	// resetting it to nil makes signalCh hand out a fresh one next time.
	if b.signal != nil {
		close(b.signal)
		b.signal = nil
	}
}
// signalCh returns the bus's current wake-up channel, creating it first if
// there is none. The returned channel is closed by the next call to send.
func (b *EventBus) signalCh() <-chan struct{} {
	b.mu.Lock()
	ch := b.signal
	if ch == nil {
		ch = make(chan struct{})
		b.signal = ch
	}
	b.mu.Unlock()

	return ch
}
// serviceListener pairs a joined service with the queue of messages waiting
// to be handled by its goroutine (see run).
type serviceListener struct {
	// mu guards queue.
	mu sync.Mutex
	// bus is the bus this listener belongs to; it is passed to the
	// service's handlers and used to obtain new wake-up channels.
	bus *EventBus
	// queue holds the messages appended by EventBus.send that run has not
	// picked up yet.
	queue []serviceMessage
	// service is the service the messages are delivered to.
	service Service
}
// run is the listener's goroutine body. It blocks on signal, drains the
// queue filled by EventBus.send and hands every message to the service. It
// returns as soon as the service reports itself inactive.
func (l *serviceListener) run(signal <-chan struct{}) {
	// Commands are only delivered to services that also implement
	// ActiveService; for all other services commandHandler stays nil.
	commandHandler, _ := l.service.(ActiveService)

	pending := make([]serviceMessage, 0, 16)
	for {
		// Wait to be woken up, and bail out if the service went inactive
		// in the meantime.
		<-signal
		if !l.service.Active() {
			return
		}

		// Fetch the next wake-up channel before touching the queue, so a
		// message that is being enqueued right now is not delayed.
		signal = l.bus.signalCh()

		// Move the queued messages into the local buffer.
		l.mu.Lock()
		pending = append(pending, l.queue...)
		l.queue = l.queue[:0]
		l.mu.Unlock()

		// Deliver the messages in order, re-checking before each one in
		// case the previous message or external circumstances marked the
		// service inactive.
		for i := range pending {
			if !l.service.Active() {
				return
			}

			if ev := pending[i].event; ev != nil {
				l.service.HandleEvent(l.bus, ev)
			}
			if cmd := pending[i].command; cmd != nil && commandHandler != nil {
				commandHandler.HandleCommand(l.bus, cmd)
			}
		}
		pending = pending[:0]
	}
}
// serviceMessage is the unit queued for each listener. RunEvent sets only
// event and RunCommand sets only command; consumers check each field for nil
// independently.
type serviceMessage struct {
	// event is the published event, or nil for a command message.
	event Event
	// command is the issued command, or nil for an event message.
	command Command
}