You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

160 lines
3.3 KiB

2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
  1. package lucifer3
  2. import (
  3. "fmt"
  4. "sync"
  5. )
  6. type ServiceKey struct{}
  7. type Event interface {
  8. EventDescription() string
  9. }
  10. // A TriggerEvent is a catch-all for events that may trigger a command from the event handler.
  11. type TriggerEvent interface {
  12. Event
  13. TriggerKind() string
  14. TriggerValue(key string) (string, bool)
  15. }
  16. type Command interface {
  17. CommandDescription() string
  18. }
  19. type EventBus struct {
  20. mu sync.Mutex
  21. listeners []*serviceListener
  22. signal chan struct{}
  23. }
  24. // JoinCallback joins the event bus for a moment.
  25. func (b *EventBus) JoinCallback(cb func(event Event) bool) {
  26. b.Join(newCallbackService(cb))
  27. }
  28. func (b *EventBus) Join(service Service) {
  29. // Take the signal here so that it will receive the events sent by the calling function
  30. // so that they're processed without having to wait for a new event.
  31. signal := b.signalCh()
  32. b.mu.Lock()
  33. listener := &serviceListener{
  34. bus: b,
  35. queue: make([]serviceMessage, 0, 16),
  36. service: service,
  37. }
  38. go listener.run(signal)
  39. b.listeners = append(b.listeners, listener)
  40. b.mu.Unlock()
  41. }
  42. func (b *EventBus) RunCommand(command Command) {
  43. fmt.Println("[COMMAND]", command.CommandDescription())
  44. b.send(serviceMessage{command: command})
  45. }
  46. func (b *EventBus) RunEvent(event Event) {
  47. fmt.Println("[EVENT]", event.EventDescription())
  48. b.send(serviceMessage{event: event})
  49. }
  50. func (b *EventBus) send(message serviceMessage) {
  51. b.mu.Lock()
  52. defer b.mu.Unlock()
  53. deleteList := make([]int, 0, 0)
  54. for i, listener := range b.listeners {
  55. if !listener.service.Active() {
  56. deleteList = append(deleteList, i-len(deleteList))
  57. continue
  58. }
  59. listener.mu.Lock()
  60. listener.queue = append(listener.queue, message)
  61. listener.mu.Unlock()
  62. }
  63. for i := range deleteList {
  64. b.listeners = append(b.listeners[:i], b.listeners[i+1:]...)
  65. }
  66. if b.signal != nil {
  67. close(b.signal)
  68. b.signal = nil
  69. }
  70. }
  71. func (b *EventBus) signalCh() <-chan struct{} {
  72. b.mu.Lock()
  73. defer b.mu.Unlock()
  74. if b.signal == nil {
  75. b.signal = make(chan struct{})
  76. }
  77. return b.signal
  78. }
  79. type serviceListener struct {
  80. mu sync.Mutex
  81. bus *EventBus
  82. queue []serviceMessage
  83. service Service
  84. }
  85. func (l *serviceListener) run(signal <-chan struct{}) {
  86. // Detect command support
  87. var activeService ActiveService
  88. if as, ok := l.service.(ActiveService); ok {
  89. activeService = as
  90. }
  91. queue := make([]serviceMessage, 0, 16)
  92. for {
  93. // Listen for the signal, but stop here if the service has marked
  94. // itself as inactive.
  95. <-signal
  96. if !l.service.Active() {
  97. return
  98. }
  99. // Take a new signal before copying the queue to avoid delay
  100. // in case a message is about to be enqueued right now!
  101. signal = l.bus.signalCh()
  102. // Take a copy of the queue.
  103. l.mu.Lock()
  104. for _, message := range l.queue {
  105. queue = append(queue, message)
  106. }
  107. l.queue = l.queue[:0]
  108. l.mu.Unlock()
  109. // Handle the messages in the queue, but stop mid-queue if the previous message
  110. // or external circumstances marked the service inactive.
  111. for _, message := range queue {
  112. if !l.service.Active() {
  113. return
  114. }
  115. if message.event != nil {
  116. l.service.HandleEvent(l.bus, message.event)
  117. }
  118. if message.command != nil && activeService != nil {
  119. activeService.HandleCommand(l.bus, message.command)
  120. }
  121. }
  122. queue = queue[:0]
  123. }
  124. }
  125. type serviceMessage struct {
  126. event Event
  127. command Command
  128. }