You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

297 lines
6.9 KiB

  1. package publisher
  2. import (
  3. "context"
  4. "git.aiterp.net/lucifer/new-server/app/config"
  5. "git.aiterp.net/lucifer/new-server/models"
  6. "log"
  7. "sync"
  8. "time"
  9. )
  10. type Publisher struct {
  11. mu sync.Mutex
  12. sceneData map[int]*models.Scene
  13. scenes []*Scene
  14. sceneAssignment map[int]*Scene
  15. started map[int]bool
  16. pending map[int][]models.Device
  17. waiting map[int]chan struct{}
  18. }
  19. func (p *Publisher) UpdateScene(data models.Scene) {
  20. p.mu.Lock()
  21. p.sceneData[data.ID] = &data
  22. for _, scene := range p.scenes {
  23. if scene.data.ID == data.ID {
  24. scene.UpdateScene(data)
  25. }
  26. }
  27. p.mu.Unlock()
  28. }
  29. func (p *Publisher) ReloadScenes(ctx context.Context) error {
  30. scenes, err := config.SceneRepository().FetchAll(ctx)
  31. if err != nil {
  32. return err
  33. }
  34. p.mu.Lock()
  35. for i, scene := range scenes {
  36. p.sceneData[scene.ID] = &scenes[i]
  37. }
  38. for _, scene := range p.scenes {
  39. scene.UpdateScene(*p.sceneData[scene.data.ID])
  40. }
  41. p.mu.Unlock()
  42. return nil
  43. }
  44. func (p *Publisher) ReloadDevices(ctx context.Context) error {
  45. devices, err := config.DeviceRepository().FetchByReference(ctx, models.RKAll, "")
  46. if err != nil {
  47. return err
  48. }
  49. p.Publish(devices...)
  50. return nil
  51. }
  52. func (p *Publisher) Publish(devices ...models.Device) {
  53. if len(devices) == 0 {
  54. return
  55. }
  56. p.mu.Lock()
  57. defer p.mu.Unlock()
  58. for _, device := range devices {
  59. if !p.started[device.BridgeID] {
  60. p.started[device.BridgeID] = true
  61. go p.runBridge(device.BridgeID)
  62. }
  63. p.reassignDevice(device)
  64. if p.sceneAssignment[device.ID] != nil {
  65. p.sceneAssignment[device.ID].UpsertDevice(device)
  66. } else {
  67. p.pending[device.BridgeID] = append(p.pending[device.BridgeID], device)
  68. if p.waiting[device.BridgeID] != nil {
  69. close(p.waiting[device.BridgeID])
  70. p.waiting[device.BridgeID] = nil
  71. }
  72. }
  73. }
  74. }
  75. func (p *Publisher) PublishChannel(ch <-chan []models.Device) {
  76. for list := range ch {
  77. p.Publish(list...)
  78. }
  79. }
  80. func (p *Publisher) Run() {
  81. ticker := time.NewTicker(time.Millisecond * 100)
  82. deleteList := make([]int, 0, 8)
  83. updatedList := make([]models.Device, 0, 16)
  84. for range ticker.C {
  85. deleteList = deleteList[:0]
  86. updatedList = updatedList[:0]
  87. p.mu.Lock()
  88. for i, scene := range p.scenes {
  89. if (!scene.endTime.IsZero() && time.Now().After(scene.endTime)) || scene.Empty() {
  90. deleteList = append(deleteList, i-len(deleteList))
  91. updatedList = append(updatedList, scene.AllDevices()...)
  92. log.Printf("Removing scene instance for %s (%d)", scene.data.Name, scene.data.ID)
  93. for _, device := range scene.AllDevices() {
  94. p.sceneAssignment[device.ID] = nil
  95. }
  96. continue
  97. }
  98. if scene.Due() {
  99. updatedList = append(updatedList, scene.Run()...)
  100. updatedList = append(updatedList, scene.UnaffectedDevices()...)
  101. }
  102. }
  103. for _, i := range deleteList {
  104. p.scenes = append(p.scenes[:i], p.scenes[i+1:]...)
  105. }
  106. for _, device := range updatedList {
  107. if !p.started[device.BridgeID] {
  108. p.started[device.BridgeID] = true
  109. go p.runBridge(device.BridgeID)
  110. }
  111. p.pending[device.BridgeID] = append(p.pending[device.BridgeID], device)
  112. if p.waiting[device.BridgeID] != nil {
  113. close(p.waiting[device.BridgeID])
  114. p.waiting[device.BridgeID] = nil
  115. }
  116. }
  117. p.mu.Unlock()
  118. }
  119. }
  120. // reassignDevice re-evaluates the device's scene assignment config. It will return whether the scene changed, which
  121. // should trigger an update.
  122. func (p *Publisher) reassignDevice(device models.Device) bool {
  123. var selectedAssignment *models.DeviceSceneAssignment
  124. for _, assignment := range device.SceneAssignments {
  125. duration := time.Duration(assignment.DurationMS) * time.Millisecond
  126. if duration <= 0 || time.Now().Before(assignment.StartTime.Add(duration)) {
  127. selectedAssignment = &assignment
  128. }
  129. }
  130. if selectedAssignment == nil {
  131. if p.sceneAssignment[device.ID] != nil {
  132. p.sceneAssignment[device.ID].RemoveDevice(device)
  133. delete(p.sceneAssignment, device.ID)
  134. // Scene changed
  135. return true
  136. }
  137. // Stop here, no scene should be assigned.
  138. return false
  139. }
  140. if p.sceneAssignment[device.ID] != nil {
  141. scene := p.sceneAssignment[device.ID]
  142. if scene.data.ID == selectedAssignment.SceneID && scene.group == selectedAssignment.Group {
  143. // Current assignment is good.
  144. return false
  145. }
  146. p.sceneAssignment[device.ID].RemoveDevice(device)
  147. delete(p.sceneAssignment, device.ID)
  148. }
  149. for _, scene := range p.scenes {
  150. if scene.data.ID == selectedAssignment.SceneID && scene.group == selectedAssignment.Group {
  151. p.sceneAssignment[device.ID] = scene
  152. return true
  153. }
  154. }
  155. newScene := &Scene{
  156. data: p.sceneData[selectedAssignment.SceneID],
  157. group: selectedAssignment.Group,
  158. startTime: selectedAssignment.StartTime,
  159. endTime: selectedAssignment.StartTime.Add(time.Duration(selectedAssignment.DurationMS) * time.Millisecond),
  160. roleMap: make(map[int]int, 16),
  161. roleList: make(map[int][]models.Device, 16),
  162. due: true,
  163. }
  164. p.sceneAssignment[device.ID] = newScene
  165. p.scenes = append(p.scenes, newScene)
  166. if selectedAssignment.DurationMS <= 0 {
  167. newScene.endTime = time.Time{}
  168. }
  169. return true
  170. }
  171. func (p *Publisher) runBridge(id int) {
  172. defer func() {
  173. p.mu.Lock()
  174. p.started[id] = false
  175. p.mu.Unlock()
  176. }()
  177. bridge, err := config.BridgeRepository().Find(context.Background(), id)
  178. if err != nil {
  179. log.Println("Failed to get bridge data:", err)
  180. return
  181. }
  182. driver, err := config.DriverProvider().Provide(bridge.Driver)
  183. if err != nil {
  184. log.Println("Failed to get bridge driver:", err)
  185. log.Println("Maybe Lucifer needs to be updated.")
  186. return
  187. }
  188. devices := make(map[int]models.Device)
  189. for {
  190. p.mu.Lock()
  191. if len(p.pending[id]) == 0 {
  192. if p.waiting[id] == nil {
  193. p.waiting[id] = make(chan struct{})
  194. }
  195. waitCh := p.waiting[id]
  196. p.mu.Unlock()
  197. <-waitCh
  198. p.mu.Lock()
  199. }
  200. updates := p.pending[id]
  201. p.pending[id] = p.pending[id][:0:0]
  202. p.mu.Unlock()
  203. // Only allow the latest update per device (this avoids slow bridges causing a backlog of cations).
  204. for key := range devices {
  205. delete(devices, key)
  206. }
  207. for _, update := range updates {
  208. devices[update.ID] = update
  209. }
  210. updates = updates[:0]
  211. for _, value := range devices {
  212. updates = append(updates, value)
  213. }
  214. err := driver.Publish(context.Background(), bridge, updates)
  215. if err != nil {
  216. log.Println("Failed to publish to driver:", err)
  217. p.mu.Lock()
  218. p.pending[id] = append(updates, p.pending[id]...)
  219. p.mu.Unlock()
  220. return
  221. }
  222. }
  223. }
  224. var publisher = Publisher{
  225. sceneData: make(map[int]*models.Scene),
  226. scenes: make([]*Scene, 0, 16),
  227. sceneAssignment: make(map[int]*Scene, 16),
  228. started: make(map[int]bool),
  229. pending: make(map[int][]models.Device),
  230. waiting: make(map[int]chan struct{}),
  231. }
  232. func Global() *Publisher {
  233. return &publisher
  234. }
  235. func Initialize(ctx context.Context) error {
  236. err :=publisher.ReloadScenes(ctx)
  237. if err != nil {
  238. return err
  239. }
  240. err = publisher.ReloadDevices(ctx)
  241. if err != nil {
  242. return err
  243. }
  244. go publisher.PublishChannel(config.PublishChannel)
  245. go publisher.Run()
  246. return nil
  247. }