You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

185 lines
4.0 KiB

2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
  1. package lucifer3
  2. import (
  3. "fmt"
  4. "sync"
  5. )
  6. type ServiceKey struct{}
  7. type Event interface {
  8. EventDescription() string
  9. }
  10. // A TriggerEvent is a catch-all for events that may trigger a command from the event handler.
  11. type TriggerEvent interface {
  12. Event
  13. TriggerKind() string
  14. TriggerValue(key string) (string, bool)
  15. }
  16. type Command interface {
  17. CommandDescription() string
  18. }
  19. type EventBus struct {
  20. mu sync.Mutex
  21. listeners []*serviceListener
  22. privilegedList []ActiveService
  23. signal chan struct{}
  24. setStates int32
  25. }
  26. // JoinCallback joins the event bus for a moment.
  27. func (b *EventBus) JoinCallback(cb func(event Event) bool) {
  28. b.Join(newCallbackService(cb))
  29. }
  30. func (b *EventBus) Join(service Service) {
  31. // Take the signal here so that it will receive the events sent by the calling function
  32. // so that they're processed without having to wait for a new event.
  33. signal := b.signalCh()
  34. listener := &serviceListener{
  35. bus: b,
  36. queue: make([]serviceMessage, 0, 16),
  37. service: service,
  38. }
  39. go listener.run(signal)
  40. b.mu.Lock()
  41. b.listeners = append(b.listeners, listener)
  42. b.mu.Unlock()
  43. }
  44. // JoinPrivileged will add the services to a list that gets first look at
  45. // all events, but they are not allowed to issue any commands. This is for
  46. // things that cannot have eventual consistency.
  47. func (b *EventBus) JoinPrivileged(service ActiveService) {
  48. b.mu.Lock()
  49. b.privilegedList = append(b.privilegedList, service)
  50. b.mu.Unlock()
  51. }
  52. func (b *EventBus) RunCommand(command Command) {
  53. fmt.Println("[COMMAND]", command.CommandDescription())
  54. b.send(serviceMessage{command: command})
  55. }
  56. func (b *EventBus) RunEvent(event Event) {
  57. fmt.Println("[EVENT]", event.EventDescription())
  58. b.send(serviceMessage{event: event})
  59. }
  60. func (b *EventBus) RunEvents(events []Event) {
  61. for _, event := range events {
  62. b.RunEvent(event)
  63. }
  64. }
  65. func (b *EventBus) send(message serviceMessage) {
  66. b.mu.Lock()
  67. defer b.mu.Unlock()
  68. for _, service := range b.privilegedList {
  69. if message.command != nil {
  70. service.HandleCommand(b, message.command)
  71. }
  72. if message.event != nil {
  73. service.HandleEvent(nil, message.event)
  74. }
  75. }
  76. deleteList := make([]int, 0, 0)
  77. for i, listener := range b.listeners {
  78. if !listener.service.Active() {
  79. deleteList = append(deleteList, i-len(deleteList))
  80. continue
  81. }
  82. listener.mu.Lock()
  83. listener.queue = append(listener.queue, message)
  84. listener.mu.Unlock()
  85. }
  86. for _, i := range deleteList {
  87. b.listeners = append(b.listeners[:i], b.listeners[i+1:]...)
  88. }
  89. if b.signal != nil {
  90. close(b.signal)
  91. b.signal = nil
  92. }
  93. }
  94. func (b *EventBus) signalCh() <-chan struct{} {
  95. b.mu.Lock()
  96. defer b.mu.Unlock()
  97. if b.signal == nil {
  98. b.signal = make(chan struct{})
  99. }
  100. return b.signal
  101. }
  102. type serviceListener struct {
  103. mu sync.Mutex
  104. bus *EventBus
  105. queue []serviceMessage
  106. service Service
  107. }
  108. func (l *serviceListener) run(signal <-chan struct{}) {
  109. // Detect command support
  110. var activeService ActiveService
  111. if as, ok := l.service.(ActiveService); ok {
  112. activeService = as
  113. }
  114. queue := make([]serviceMessage, 0, 16)
  115. for {
  116. // Listen for the signal, but stop here if the service has marked
  117. // itself as inactive.
  118. <-signal
  119. if !l.service.Active() {
  120. return
  121. }
  122. // Take a new signal before copying the queue to avoid delay
  123. // in case a message is about to be enqueued right now!
  124. signal = l.bus.signalCh()
  125. // Take a copy of the queue.
  126. l.mu.Lock()
  127. for _, message := range l.queue {
  128. queue = append(queue, message)
  129. }
  130. l.queue = l.queue[:0]
  131. l.mu.Unlock()
  132. // Handle the messages in the queue, but stop mid-queue if the previous message
  133. // or external circumstances marked the service inactive.
  134. for _, message := range queue {
  135. if !l.service.Active() {
  136. return
  137. }
  138. if message.event != nil {
  139. l.service.HandleEvent(l.bus, message.event)
  140. }
  141. if message.command != nil && activeService != nil {
  142. activeService.HandleCommand(l.bus, message.command)
  143. }
  144. }
  145. queue = queue[:0]
  146. }
  147. }
  148. type serviceMessage struct {
  149. event Event
  150. command Command
  151. }