You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

275 lines
6.5 KiB

  1. package publisher
  2. import (
  3. "context"
  4. "git.aiterp.net/lucifer/new-server/app/config"
  5. "git.aiterp.net/lucifer/new-server/models"
  6. "log"
  7. "sync"
  8. "time"
  9. )
// Publisher keeps track of running scenes and their device assignments, and queues device
// updates per bridge for the runBridge goroutines to deliver.
type Publisher struct {
	// mu is held by the methods in this file whenever they touch the fields below.
	mu sync.Mutex
	// sceneData holds the scene definitions loaded by ReloadScenes, keyed by scene ID.
	sceneData map[int]*models.Scene
	// scenes lists the scene instances that are currently running.
	scenes []*Scene
	// sceneAssignment maps a device ID to the running scene it is assigned to (nil or absent if none).
	sceneAssignment map[int]*Scene
	// started records, per bridge ID, whether a runBridge goroutine has been launched and not yet exited.
	started map[int]bool
	// pending queues device updates per bridge ID until runBridge picks them up.
	pending map[int][]models.Device
	// waiting holds, per bridge ID, the channel that is closed to wake a runBridge goroutine
	// that found its pending queue empty.
	waiting map[int]chan struct{}
}
  19. func (p *Publisher) ReloadScenes(ctx context.Context) error {
  20. scenes, err := config.SceneRepository().FetchAll(ctx)
  21. if err != nil {
  22. return err
  23. }
  24. p.mu.Lock()
  25. for _, scene := range scenes {
  26. p.sceneData[scene.ID] = &scene
  27. }
  28. p.mu.Unlock()
  29. return nil
  30. }
  31. func (p *Publisher) ReloadDevices(ctx context.Context) error {
  32. devices, err := config.DeviceRepository().FetchByReference(ctx, models.RKAll, "")
  33. if err != nil {
  34. return err
  35. }
  36. p.Publish(devices...)
  37. return nil
  38. }
  39. func (p *Publisher) Publish(devices ...models.Device) {
  40. if len(devices) == 0 {
  41. return
  42. }
  43. p.mu.Lock()
  44. defer p.mu.Unlock()
  45. for _, device := range devices {
  46. if !p.started[device.BridgeID] {
  47. p.started[device.BridgeID] = true
  48. go p.runBridge(device.BridgeID)
  49. }
  50. p.reassignDevice(device)
  51. if p.sceneAssignment[device.ID] != nil {
  52. p.sceneAssignment[device.ID].UpsertDevice(device)
  53. } else {
  54. p.pending[device.BridgeID] = append(p.pending[device.BridgeID], device)
  55. if p.waiting[device.BridgeID] != nil {
  56. close(p.waiting[device.BridgeID])
  57. p.waiting[device.BridgeID] = nil
  58. }
  59. }
  60. }
  61. }
  62. func (p *Publisher) PublishChannel(ch <-chan []models.Device) {
  63. for list := range ch {
  64. p.Publish(list...)
  65. }
  66. }
  67. func (p *Publisher) Run() {
  68. ticker := time.NewTicker(time.Millisecond * 100)
  69. deleteList := make([]int, 0, 8)
  70. updatedList := make([]models.Device, 0, 16)
  71. for range ticker.C {
  72. deleteList = deleteList[:0]
  73. updatedList = updatedList[:0]
  74. p.mu.Lock()
  75. for i, scene := range p.scenes {
  76. if !scene.endTime.IsZero() && time.Now().After(scene.endTime) {
  77. deleteList = append(deleteList, i-len(deleteList))
  78. updatedList = append(updatedList, scene.AllDevices()...)
  79. for _, device := range scene.AllDevices() {
  80. p.sceneAssignment[device.ID] = nil
  81. }
  82. continue
  83. }
  84. if scene.Due() {
  85. updatedList = append(updatedList, scene.Run()...)
  86. updatedList = append(updatedList, scene.UnaffectedDevices()...)
  87. }
  88. }
  89. for _, i := range deleteList {
  90. p.scenes = append(p.scenes[:i], p.scenes[i+1:]...)
  91. }
  92. for _, device := range updatedList {
  93. if !p.started[device.BridgeID] {
  94. p.started[device.BridgeID] = true
  95. go p.runBridge(device.BridgeID)
  96. }
  97. p.pending[device.BridgeID] = append(p.pending[device.BridgeID], device)
  98. if p.waiting[device.BridgeID] != nil {
  99. close(p.waiting[device.BridgeID])
  100. p.waiting[device.BridgeID] = nil
  101. }
  102. }
  103. p.mu.Unlock()
  104. }
  105. }
  106. // reassignDevice re-evaluates the device's scene assignment config. It will return whether the scene changed, which
  107. // should trigger an update.
  108. func (p *Publisher) reassignDevice(device models.Device) bool {
  109. var selectedAssignment *models.DeviceSceneAssignment
  110. for _, assignment := range device.SceneAssignments {
  111. duration := time.Duration(assignment.DurationMS) * time.Millisecond
  112. if duration <= 0 || time.Now().Before(assignment.StartTime.Add(duration)) {
  113. selectedAssignment = &assignment
  114. }
  115. }
  116. if selectedAssignment == nil {
  117. if p.sceneAssignment[device.ID] != nil {
  118. p.sceneAssignment[device.ID].RemoveDevice(device)
  119. delete(p.sceneAssignment, device.ID)
  120. // Scene changed
  121. return true
  122. }
  123. // Stop here, no scene should be assigned.
  124. return false
  125. }
  126. if p.sceneAssignment[device.ID] != nil {
  127. scene := p.sceneAssignment[device.ID]
  128. if scene.data.ID == selectedAssignment.SceneID && scene.group == selectedAssignment.Group {
  129. // Current assignment is good.
  130. return false
  131. }
  132. p.sceneAssignment[device.ID].RemoveDevice(device)
  133. delete(p.sceneAssignment, device.ID)
  134. }
  135. for _, scene := range p.scenes {
  136. if scene.data.ID == selectedAssignment.SceneID && scene.group == selectedAssignment.Group {
  137. p.sceneAssignment[device.ID] = scene
  138. return true
  139. }
  140. }
  141. newScene := &Scene{
  142. data: p.sceneData[selectedAssignment.SceneID],
  143. group: selectedAssignment.Group,
  144. startTime: selectedAssignment.StartTime,
  145. endTime: selectedAssignment.StartTime.Add(time.Duration(selectedAssignment.DurationMS) * time.Millisecond),
  146. roleMap: make(map[int]int, 16),
  147. roleList: make(map[int][]models.Device, 16),
  148. due: true,
  149. }
  150. p.sceneAssignment[device.ID] = newScene
  151. p.scenes = append(p.scenes, newScene)
  152. if selectedAssignment.DurationMS <= 0 {
  153. newScene.endTime = time.Time{}
  154. }
  155. return true
  156. }
  157. func (p *Publisher) runBridge(id int) {
  158. defer func() {
  159. p.mu.Lock()
  160. p.started[id] = false
  161. p.mu.Unlock()
  162. }()
  163. bridge, err := config.BridgeRepository().Find(context.Background(), id)
  164. if err != nil {
  165. log.Println("Failed to get bridge data:", err)
  166. return
  167. }
  168. driver, err := config.DriverProvider().Provide(bridge.Driver)
  169. if err != nil {
  170. log.Println("Failed to get bridge driver:", err)
  171. log.Println("Maybe Lucifer needs to be updated.")
  172. return
  173. }
  174. devices := make(map[int]models.Device)
  175. for {
  176. p.mu.Lock()
  177. if len(p.pending[id]) == 0 {
  178. if p.waiting[id] == nil {
  179. p.waiting[id] = make(chan struct{})
  180. }
  181. waitCh := p.waiting[id]
  182. p.mu.Unlock()
  183. <-waitCh
  184. p.mu.Lock()
  185. }
  186. updates := p.pending[id]
  187. p.pending[id] = p.pending[id][:0:0]
  188. p.mu.Unlock()
  189. // Only allow the latest update per device (this avoids slow bridges causing a backlog of cations).
  190. for key := range devices {
  191. delete(devices, key)
  192. }
  193. for _, update := range updates {
  194. devices[update.ID] = update
  195. }
  196. updates = updates[:0]
  197. for _, value := range devices {
  198. updates = append(updates, value)
  199. }
  200. err := driver.Publish(context.Background(), bridge, updates)
  201. if err != nil {
  202. log.Println("Failed to publish to driver:", err)
  203. p.mu.Lock()
  204. p.pending[id] = append(updates, p.pending[id]...)
  205. p.mu.Unlock()
  206. return
  207. }
  208. }
  209. }
  210. var publisher = Publisher{
  211. sceneData: make(map[int]*models.Scene),
  212. scenes: make([]*Scene, 0, 16),
  213. sceneAssignment: make(map[int]*Scene, 16),
  214. started: make(map[int]bool),
  215. pending: make(map[int][]models.Device),
  216. waiting: make(map[int]chan struct{}),
  217. }
  218. func Initialize(ctx context.Context) error {
  219. err :=publisher.ReloadScenes(ctx)
  220. if err != nil {
  221. return err
  222. }
  223. err = publisher.ReloadDevices(ctx)
  224. if err != nil {
  225. return err
  226. }
  227. go publisher.PublishChannel(config.PublishChannel)
  228. go publisher.Run()
  229. return nil
  230. }