You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

178 lines
3.9 KiB

2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
  1. package lucifer3
  2. import (
  3. "fmt"
  4. "sync"
  5. )
  6. type ServiceKey struct{}
  7. type Event interface {
  8. EventDescription() string
  9. }
  10. // A TriggerEvent is a catch-all for events that may trigger a command from the event handler.
  11. type TriggerEvent interface {
  12. Event
  13. TriggerKind() string
  14. TriggerValue(key string) (string, bool)
  15. }
  16. type Command interface {
  17. CommandDescription() string
  18. }
  19. type EventBus struct {
  20. mu sync.Mutex
  21. listeners []*serviceListener
  22. privilegedList []ActiveService
  23. signal chan struct{}
  24. }
  25. // JoinCallback joins the event bus for a moment.
  26. func (b *EventBus) JoinCallback(cb func(event Event) bool) {
  27. b.Join(newCallbackService(cb))
  28. }
  29. func (b *EventBus) Join(service Service) {
  30. // Take the signal here so that it will receive the events sent by the calling function
  31. // so that they're processed without having to wait for a new event.
  32. signal := b.signalCh()
  33. listener := &serviceListener{
  34. bus: b,
  35. queue: make([]serviceMessage, 0, 16),
  36. service: service,
  37. }
  38. go listener.run(signal)
  39. b.mu.Lock()
  40. b.listeners = append(b.listeners, listener)
  41. b.mu.Unlock()
  42. }
  43. // JoinPrivileged will add the services to a list that gets first look at
  44. // all events, but they are not allowed to issue any commands. This is for
  45. // things that cannot have eventual consistency.
  46. func (b *EventBus) JoinPrivileged(service ActiveService) {
  47. b.mu.Lock()
  48. b.privilegedList = append(b.privilegedList, service)
  49. b.mu.Unlock()
  50. }
  51. func (b *EventBus) RunCommand(command Command) {
  52. fmt.Println("[COMMAND]", command.CommandDescription())
  53. b.send(serviceMessage{command: command})
  54. }
  55. func (b *EventBus) RunEvent(event Event) {
  56. fmt.Println("[EVENT]", event.EventDescription())
  57. b.send(serviceMessage{event: event})
  58. }
  59. func (b *EventBus) send(message serviceMessage) {
  60. b.mu.Lock()
  61. defer b.mu.Unlock()
  62. for _, service := range b.privilegedList {
  63. if message.command != nil {
  64. service.HandleCommand(nil, message.command)
  65. }
  66. if message.event != nil {
  67. service.HandleEvent(nil, message.event)
  68. }
  69. }
  70. deleteList := make([]int, 0, 0)
  71. for i, listener := range b.listeners {
  72. if !listener.service.Active() {
  73. deleteList = append(deleteList, i-len(deleteList))
  74. continue
  75. }
  76. listener.mu.Lock()
  77. listener.queue = append(listener.queue, message)
  78. listener.mu.Unlock()
  79. }
  80. for i := range deleteList {
  81. b.listeners = append(b.listeners[:i], b.listeners[i+1:]...)
  82. }
  83. if b.signal != nil {
  84. close(b.signal)
  85. b.signal = nil
  86. }
  87. }
  88. func (b *EventBus) signalCh() <-chan struct{} {
  89. b.mu.Lock()
  90. defer b.mu.Unlock()
  91. if b.signal == nil {
  92. b.signal = make(chan struct{})
  93. }
  94. return b.signal
  95. }
  96. type serviceListener struct {
  97. mu sync.Mutex
  98. bus *EventBus
  99. queue []serviceMessage
  100. service Service
  101. }
  102. func (l *serviceListener) run(signal <-chan struct{}) {
  103. // Detect command support
  104. var activeService ActiveService
  105. if as, ok := l.service.(ActiveService); ok {
  106. activeService = as
  107. }
  108. queue := make([]serviceMessage, 0, 16)
  109. for {
  110. // Listen for the signal, but stop here if the service has marked
  111. // itself as inactive.
  112. <-signal
  113. if !l.service.Active() {
  114. return
  115. }
  116. // Take a new signal before copying the queue to avoid delay
  117. // in case a message is about to be enqueued right now!
  118. signal = l.bus.signalCh()
  119. // Take a copy of the queue.
  120. l.mu.Lock()
  121. for _, message := range l.queue {
  122. queue = append(queue, message)
  123. }
  124. l.queue = l.queue[:0]
  125. l.mu.Unlock()
  126. // Handle the messages in the queue, but stop mid-queue if the previous message
  127. // or external circumstances marked the service inactive.
  128. for _, message := range queue {
  129. if !l.service.Active() {
  130. return
  131. }
  132. if message.event != nil {
  133. l.service.HandleEvent(l.bus, message.event)
  134. }
  135. if message.command != nil && activeService != nil {
  136. activeService.HandleCommand(l.bus, message.command)
  137. }
  138. }
  139. queue = queue[:0]
  140. }
  141. }
  142. type serviceMessage struct {
  143. event Event
  144. command Command
  145. }