You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

246 lines
5.1 KiB

2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
  1. package lucifer3
  2. import (
  3. "fmt"
  4. "log"
  5. "os"
  6. "strconv"
  7. "strings"
  8. "sync"
  9. )
  10. type ServiceKey struct{}
  11. var showVerboseEvents bool
  12. type verboseEventOrCommand interface {
  13. VerboseKey() string
  14. }
  15. type Event interface {
  16. EventDescription() string
  17. }
  18. // A TriggerEvent is a catch-all for events that may trigger a command from the event handler.
  19. type TriggerEvent interface {
  20. Event
  21. TriggerKind() string
  22. TriggerValue(key string) (string, bool)
  23. }
  24. type Command interface {
  25. CommandDescription() string
  26. }
  27. type EventBus struct {
  28. mu sync.Mutex
  29. listeners []*serviceListener
  30. privilegedList []ActiveService
  31. signal chan struct{}
  32. setStates int32
  33. verboseCount int
  34. verboseCounts map[string]int
  35. }
  36. // JoinCallback joins the event bus for a moment.
  37. func (b *EventBus) JoinCallback(cb func(event Event) bool) {
  38. b.Join(newCallbackService(cb))
  39. }
  40. func (b *EventBus) Join(service Service) {
  41. // Take the signal here so that it will receive the events sent by the calling function
  42. // so that they're processed without having to wait for a new event.
  43. signal := b.signalCh()
  44. listener := &serviceListener{
  45. bus: b,
  46. queue: make([]serviceMessage, 0, 16),
  47. service: service,
  48. }
  49. go listener.run(signal)
  50. b.mu.Lock()
  51. b.listeners = append(b.listeners, listener)
  52. b.mu.Unlock()
  53. }
  54. // JoinPrivileged will add the services to a list that gets first look at
  55. // all events, but they are not allowed to issue any commands. This is for
  56. // things that cannot have eventual consistency.
  57. func (b *EventBus) JoinPrivileged(service ActiveService) {
  58. b.mu.Lock()
  59. b.privilegedList = append(b.privilegedList, service)
  60. b.mu.Unlock()
  61. }
  62. func (b *EventBus) RunCommand(command Command) {
  63. if v, ok := command.(verboseEventOrCommand); ok && !showVerboseEvents {
  64. b.mu.Lock()
  65. b.countVerbose(v.VerboseKey())
  66. b.mu.Unlock()
  67. } else {
  68. fmt.Println("[COMMAND]", command.CommandDescription())
  69. }
  70. b.send(serviceMessage{command: command})
  71. }
  72. func (b *EventBus) RunEvent(event Event) {
  73. if v, ok := event.(verboseEventOrCommand); ok && !showVerboseEvents {
  74. b.mu.Lock()
  75. b.countVerbose(v.VerboseKey())
  76. b.mu.Unlock()
  77. } else {
  78. fmt.Println("[EVENT]", event.EventDescription())
  79. }
  80. b.send(serviceMessage{event: event})
  81. }
  82. func (b *EventBus) RunEvents(events []Event) {
  83. for _, event := range events {
  84. b.RunEvent(event)
  85. }
  86. }
  87. func (b *EventBus) send(message serviceMessage) {
  88. b.mu.Lock()
  89. defer b.mu.Unlock()
  90. for _, service := range b.privilegedList {
  91. if message.command != nil {
  92. service.HandleCommand(b, message.command)
  93. }
  94. if message.event != nil {
  95. service.HandleEvent(nil, message.event)
  96. }
  97. }
  98. deleteList := make([]int, 0, 0)
  99. for i, listener := range b.listeners {
  100. if !listener.service.Active() {
  101. deleteList = append(deleteList, i-len(deleteList))
  102. continue
  103. }
  104. listener.mu.Lock()
  105. listener.queue = append(listener.queue, message)
  106. listener.mu.Unlock()
  107. }
  108. for _, i := range deleteList {
  109. b.listeners = append(b.listeners[:i], b.listeners[i+1:]...)
  110. }
  111. if b.signal != nil {
  112. close(b.signal)
  113. b.signal = nil
  114. }
  115. }
  116. func (b *EventBus) signalCh() <-chan struct{} {
  117. b.mu.Lock()
  118. defer b.mu.Unlock()
  119. if b.signal == nil {
  120. b.signal = make(chan struct{})
  121. }
  122. return b.signal
  123. }
  124. type serviceListener struct {
  125. mu sync.Mutex
  126. bus *EventBus
  127. queue []serviceMessage
  128. service Service
  129. }
  130. func (l *serviceListener) run(signal <-chan struct{}) {
  131. // Detect command support
  132. var activeService ActiveService
  133. if as, ok := l.service.(ActiveService); ok {
  134. activeService = as
  135. }
  136. queue := make([]serviceMessage, 0, 16)
  137. for {
  138. // Listen for the signal, but stop here if the service has marked
  139. // itself as inactive.
  140. <-signal
  141. if !l.service.Active() {
  142. return
  143. }
  144. // Take a new signal before copying the queue to avoid delay
  145. // in case a message is about to be enqueued right now!
  146. signal = l.bus.signalCh()
  147. // Take a copy of the queue.
  148. l.mu.Lock()
  149. for _, message := range l.queue {
  150. queue = append(queue, message)
  151. }
  152. l.queue = l.queue[:0]
  153. l.mu.Unlock()
  154. // Handle the messages in the queue, but stop mid-queue if the previous message
  155. // or external circumstances marked the service inactive.
  156. for _, message := range queue {
  157. if !l.service.Active() {
  158. return
  159. }
  160. if message.event != nil {
  161. l.service.HandleEvent(l.bus, message.event)
  162. }
  163. if message.command != nil && activeService != nil {
  164. activeService.HandleCommand(l.bus, message.command)
  165. }
  166. }
  167. queue = queue[:0]
  168. }
  169. }
  170. func (b *EventBus) countVerbose(key string) {
  171. if b.verboseCounts == nil {
  172. b.verboseCounts = make(map[string]int, 8)
  173. }
  174. b.verboseCounts[key] = b.verboseCounts[key] + 1
  175. b.verboseCount += 1
  176. if b.verboseCount >= 1000 {
  177. first := true
  178. s := strings.Builder{}
  179. s.WriteString("[EVENT] 1000 verbose events hidden (")
  180. for key, value := range b.verboseCounts {
  181. if !first {
  182. s.WriteString(", ")
  183. }
  184. first = false
  185. s.WriteString(key)
  186. s.WriteRune(':')
  187. s.WriteString(strconv.Itoa(value))
  188. }
  189. s.WriteRune(')')
  190. log.Println(s.String())
  191. b.verboseCounts = make(map[string]int, 8)
  192. b.verboseCount = 0
  193. }
  194. }
  195. type serviceMessage struct {
  196. event Event
  197. command Command
  198. }
  199. func init() {
  200. showVerboseEvents = os.Getenv("LUCIFER4_VERBOSE") == "true"
  201. }