You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

152 lines
3.1 KiB

2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
  1. package lucifer3
  2. import (
  3. "fmt"
  4. "sync"
  5. )
  6. type ServiceKey struct{}
  7. type Event interface {
  8. EventDescription() string
  9. }
  10. type Command interface {
  11. CommandDescription() string
  12. }
  13. type EventBus struct {
  14. mu sync.Mutex
  15. listeners []*serviceListener
  16. signal chan struct{}
  17. }
  18. // JoinCallback joins the event bus for a moment.
  19. func (b *EventBus) JoinCallback(cb func(event Event) bool) {
  20. b.Join(newCallbackService(cb))
  21. }
  22. func (b *EventBus) Join(service Service) {
  23. // Take the signal here so that it will receive the events sent by the calling function
  24. // so that they're processed without having to wait for a new event.
  25. signal := b.signalCh()
  26. b.mu.Lock()
  27. listener := &serviceListener{
  28. bus: b,
  29. queue: make([]serviceMessage, 0, 16),
  30. service: service,
  31. }
  32. go listener.run(signal)
  33. b.listeners = append(b.listeners, listener)
  34. b.mu.Unlock()
  35. }
  36. func (b *EventBus) RunCommand(command Command) {
  37. fmt.Println("[COMMAND]", command.CommandDescription())
  38. b.send(serviceMessage{command: command})
  39. }
  40. func (b *EventBus) RunEvent(event Event) {
  41. fmt.Println("[EVENT]", event.EventDescription())
  42. b.send(serviceMessage{event: event})
  43. }
  44. func (b *EventBus) send(message serviceMessage) {
  45. b.mu.Lock()
  46. defer b.mu.Unlock()
  47. deleteList := make([]int, 0, 0)
  48. for i, listener := range b.listeners {
  49. if !listener.service.Active() {
  50. deleteList = append(deleteList, i-len(deleteList))
  51. continue
  52. }
  53. listener.mu.Lock()
  54. listener.queue = append(listener.queue, message)
  55. listener.mu.Unlock()
  56. }
  57. for i := range deleteList {
  58. b.listeners = append(b.listeners[:i], b.listeners[i+1:]...)
  59. }
  60. if b.signal != nil {
  61. close(b.signal)
  62. b.signal = nil
  63. }
  64. }
  65. func (b *EventBus) signalCh() <-chan struct{} {
  66. b.mu.Lock()
  67. defer b.mu.Unlock()
  68. if b.signal == nil {
  69. b.signal = make(chan struct{})
  70. }
  71. return b.signal
  72. }
  73. type serviceListener struct {
  74. mu sync.Mutex
  75. bus *EventBus
  76. queue []serviceMessage
  77. service Service
  78. }
  79. func (l *serviceListener) run(signal <-chan struct{}) {
  80. // Detect command support
  81. var activeService ActiveService
  82. if as, ok := l.service.(ActiveService); ok {
  83. activeService = as
  84. }
  85. queue := make([]serviceMessage, 0, 16)
  86. for {
  87. // Listen for the signal, but stop here if the service has marked
  88. // itself as inactive.
  89. <-signal
  90. if !l.service.Active() {
  91. return
  92. }
  93. // Take a new signal before copying the queue to avoid delay
  94. // in case a message is about to be enqueued right now!
  95. signal = l.bus.signalCh()
  96. // Take a copy of the queue.
  97. l.mu.Lock()
  98. for _, message := range l.queue {
  99. queue = append(queue, message)
  100. }
  101. l.queue = l.queue[:0]
  102. l.mu.Unlock()
  103. // Handle the messages in the queue, but stop mid-queue if the previous message
  104. // or external circumstances marked the service inactive.
  105. for _, message := range queue {
  106. if !l.service.Active() {
  107. return
  108. }
  109. if message.event != nil {
  110. l.service.HandleEvent(l.bus, message.event)
  111. }
  112. if message.command != nil && activeService != nil {
  113. activeService.HandleCommand(l.bus, message.command)
  114. }
  115. }
  116. queue = queue[:0]
  117. }
  118. }
  119. type serviceMessage struct {
  120. event Event
  121. command Command
  122. }