You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

200 lines
4.4 KiB

2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
  1. package lucifer3
  2. import (
  3. "fmt"
  4. "strings"
  5. "sync"
  6. "sync/atomic"
  7. )
  8. type ServiceKey struct{}
  9. type Event interface {
  10. EventDescription() string
  11. }
  12. // A TriggerEvent is a catch-all for events that may trigger a command from the event handler.
  13. type TriggerEvent interface {
  14. Event
  15. TriggerKind() string
  16. TriggerValue(key string) (string, bool)
  17. }
  18. type Command interface {
  19. CommandDescription() string
  20. }
  21. type EventBus struct {
  22. mu sync.Mutex
  23. listeners []*serviceListener
  24. privilegedList []ActiveService
  25. signal chan struct{}
  26. setStates int32
  27. }
  28. // JoinCallback joins the event bus for a moment.
  29. func (b *EventBus) JoinCallback(cb func(event Event) bool) {
  30. b.Join(newCallbackService(cb))
  31. }
  32. func (b *EventBus) Join(service Service) {
  33. // Take the signal here so that it will receive the events sent by the calling function
  34. // so that they're processed without having to wait for a new event.
  35. signal := b.signalCh()
  36. listener := &serviceListener{
  37. bus: b,
  38. queue: make([]serviceMessage, 0, 16),
  39. service: service,
  40. }
  41. go listener.run(signal)
  42. b.mu.Lock()
  43. b.listeners = append(b.listeners, listener)
  44. b.mu.Unlock()
  45. }
  46. // JoinPrivileged will add the services to a list that gets first look at
  47. // all events, but they are not allowed to issue any commands. This is for
  48. // things that cannot have eventual consistency.
  49. func (b *EventBus) JoinPrivileged(service ActiveService) {
  50. b.mu.Lock()
  51. b.privilegedList = append(b.privilegedList, service)
  52. b.mu.Unlock()
  53. }
  54. func (b *EventBus) RunCommand(command Command) {
  55. if cd := command.CommandDescription(); !strings.HasPrefix(cd, "SetStateBatch(") {
  56. if setStates := atomic.LoadInt32(&b.setStates); setStates > 0 {
  57. fmt.Println("[INFO]", setStates, "SetStates commands hidden.")
  58. atomic.AddInt32(&b.setStates, -setStates)
  59. }
  60. fmt.Println("[COMMAND]", cd)
  61. } else {
  62. if atomic.AddInt32(&b.setStates, 1) >= 1000 {
  63. fmt.Println("[INFO] 100 SetStates commands hidden.")
  64. atomic.AddInt32(&b.setStates, -1000)
  65. }
  66. }
  67. b.send(serviceMessage{command: command})
  68. }
  69. func (b *EventBus) RunEvent(event Event) {
  70. fmt.Println("[EVENT]", event.EventDescription())
  71. b.send(serviceMessage{event: event})
  72. }
  73. func (b *EventBus) RunEvents(events []Event) {
  74. for _, event := range events {
  75. b.RunEvent(event)
  76. }
  77. }
  78. func (b *EventBus) send(message serviceMessage) {
  79. b.mu.Lock()
  80. defer b.mu.Unlock()
  81. for _, service := range b.privilegedList {
  82. if message.command != nil {
  83. service.HandleCommand(nil, message.command)
  84. }
  85. if message.event != nil {
  86. service.HandleEvent(nil, message.event)
  87. }
  88. }
  89. deleteList := make([]int, 0, 0)
  90. for i, listener := range b.listeners {
  91. if !listener.service.Active() {
  92. deleteList = append(deleteList, i-len(deleteList))
  93. continue
  94. }
  95. listener.mu.Lock()
  96. listener.queue = append(listener.queue, message)
  97. listener.mu.Unlock()
  98. }
  99. for _, i := range deleteList {
  100. b.listeners = append(b.listeners[:i], b.listeners[i+1:]...)
  101. }
  102. if b.signal != nil {
  103. close(b.signal)
  104. b.signal = nil
  105. }
  106. }
  107. func (b *EventBus) signalCh() <-chan struct{} {
  108. b.mu.Lock()
  109. defer b.mu.Unlock()
  110. if b.signal == nil {
  111. b.signal = make(chan struct{})
  112. }
  113. return b.signal
  114. }
  115. type serviceListener struct {
  116. mu sync.Mutex
  117. bus *EventBus
  118. queue []serviceMessage
  119. service Service
  120. }
  121. func (l *serviceListener) run(signal <-chan struct{}) {
  122. // Detect command support
  123. var activeService ActiveService
  124. if as, ok := l.service.(ActiveService); ok {
  125. activeService = as
  126. }
  127. queue := make([]serviceMessage, 0, 16)
  128. for {
  129. // Listen for the signal, but stop here if the service has marked
  130. // itself as inactive.
  131. <-signal
  132. if !l.service.Active() {
  133. return
  134. }
  135. // Take a new signal before copying the queue to avoid delay
  136. // in case a message is about to be enqueued right now!
  137. signal = l.bus.signalCh()
  138. // Take a copy of the queue.
  139. l.mu.Lock()
  140. for _, message := range l.queue {
  141. queue = append(queue, message)
  142. }
  143. l.queue = l.queue[:0]
  144. l.mu.Unlock()
  145. // Handle the messages in the queue, but stop mid-queue if the previous message
  146. // or external circumstances marked the service inactive.
  147. for _, message := range queue {
  148. if !l.service.Active() {
  149. return
  150. }
  151. if message.event != nil {
  152. l.service.HandleEvent(l.bus, message.event)
  153. }
  154. if message.command != nil && activeService != nil {
  155. activeService.HandleCommand(l.bus, message.command)
  156. }
  157. }
  158. queue = queue[:0]
  159. }
  160. }
  161. type serviceMessage struct {
  162. event Event
  163. command Command
  164. }