You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

138 lines
2.8 KiB

  1. package lucifer3
  2. import (
  3. "log"
  4. "sync"
  5. )
  6. type ServiceKey struct{}
  7. type Event interface {
  8. EventName() string
  9. }
  10. type EventBus struct {
  11. mu sync.Mutex
  12. listeners []*serviceListener
  13. signal chan struct{}
  14. }
  15. // JoinCallback joins the event bus for a moment.
  16. func (b *EventBus) JoinCallback(cb func(event Event, sender ServiceID) bool) {
  17. b.Join(newCallbackService(cb))
  18. }
  19. func (b *EventBus) Join(service Service) {
  20. // Take the signal here so that it will receive the events sent by the calling function
  21. // so that they're processed without having to wait for a new event.
  22. signal := b.signalCh()
  23. b.mu.Lock()
  24. listener := &serviceListener{
  25. bus: b,
  26. queue: make([]queuedEvent, 0, 16),
  27. service: service,
  28. }
  29. go listener.run(signal)
  30. b.listeners = append(b.listeners, listener)
  31. b.mu.Unlock()
  32. }
  33. func (b *EventBus) Send(event Event, sender ServiceID, recipient *ServiceID) {
  34. b.mu.Lock()
  35. defer b.mu.Unlock()
  36. deleteList := make([]int, 0, 0)
  37. for i, listener := range b.listeners {
  38. if !listener.service.Active() {
  39. deleteList = append(deleteList, i-len(deleteList))
  40. continue
  41. }
  42. if recipient != nil && *recipient != listener.service.ServiceID() {
  43. continue
  44. }
  45. listener.mu.Lock()
  46. listener.queue = append(listener.queue, queuedEvent{
  47. event: event,
  48. sender: sender,
  49. })
  50. listener.mu.Unlock()
  51. }
  52. for i := range deleteList {
  53. b.listeners = append(b.listeners[:i], b.listeners[i+1:]...)
  54. }
  55. if recipient != nil {
  56. log.Printf("%s (-> %s): %s", &sender, recipient, event.EventName())
  57. } else {
  58. log.Printf("%s: %s", &sender, event.EventName())
  59. }
  60. if b.signal != nil {
  61. close(b.signal)
  62. b.signal = nil
  63. }
  64. }
  65. func (b *EventBus) signalCh() <-chan struct{} {
  66. b.mu.Lock()
  67. defer b.mu.Unlock()
  68. if b.signal == nil {
  69. b.signal = make(chan struct{})
  70. }
  71. return b.signal
  72. }
  73. type serviceListener struct {
  74. mu sync.Mutex
  75. bus *EventBus
  76. queue []queuedEvent
  77. service Service
  78. }
  79. func (l *serviceListener) run(signal <-chan struct{}) {
  80. queue := make([]queuedEvent, 0, 16)
  81. for {
  82. // Listen for the signal, but stop here if the service has marked
  83. // itself as inactive.
  84. <-signal
  85. if !l.service.Active() {
  86. return
  87. }
  88. // Take a new signal before copying the queue to avoid delay
  89. // in case a message is about to be enqueued right now!
  90. signal = l.bus.signalCh()
  91. // Take a copy of the queue.
  92. l.mu.Lock()
  93. for _, message := range l.queue {
  94. queue = append(queue, message)
  95. }
  96. l.queue = l.queue[:0]
  97. l.mu.Unlock()
  98. // Handle the messages in the queue, but stop mid-queue if the previous message
  99. // or external circumstances marked the service inactive.
  100. for _, message := range queue {
  101. if !l.service.Active() {
  102. return
  103. }
  104. l.service.Handle(l.bus, message.event, message.sender)
  105. }
  106. queue = queue[:0]
  107. }
  108. }
  109. type queuedEvent struct {
  110. event Event
  111. sender ServiceID
  112. }