You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

315 lines
7.3 KiB

  1. package publisher
  2. import (
  3. "context"
  4. "git.aiterp.net/lucifer/new-server/app/config"
  5. "git.aiterp.net/lucifer/new-server/models"
  6. "log"
  7. "sync"
  8. "time"
  9. )
  10. type Publisher struct {
  11. mu sync.Mutex
  12. sceneData map[int]*models.Scene
  13. scenes []*Scene
  14. sceneAssignment map[int]*Scene
  15. started map[int]bool
  16. pending map[int][]models.Device
  17. waiting map[int]chan struct{}
  18. }
  19. func (p *Publisher) SceneState(deviceID int) *models.DeviceState {
  20. p.mu.Lock()
  21. defer p.mu.Unlock()
  22. if s := p.sceneAssignment[deviceID]; s != nil {
  23. return s.LastState(deviceID)
  24. }
  25. return nil
  26. }
  27. func (p *Publisher) UpdateScene(data models.Scene) {
  28. p.mu.Lock()
  29. p.sceneData[data.ID] = &data
  30. for _, scene := range p.scenes {
  31. if scene.data.ID == data.ID {
  32. scene.UpdateScene(data)
  33. }
  34. }
  35. p.mu.Unlock()
  36. }
  37. func (p *Publisher) ReloadScenes(ctx context.Context) error {
  38. scenes, err := config.SceneRepository().FetchAll(ctx)
  39. if err != nil {
  40. return err
  41. }
  42. p.mu.Lock()
  43. for i, scene := range scenes {
  44. p.sceneData[scene.ID] = &scenes[i]
  45. }
  46. for _, scene := range p.scenes {
  47. scene.UpdateScene(*p.sceneData[scene.data.ID])
  48. }
  49. p.mu.Unlock()
  50. return nil
  51. }
  52. func (p *Publisher) ReloadDevices(ctx context.Context) error {
  53. devices, err := config.DeviceRepository().FetchByReference(ctx, models.RKAll, "")
  54. if err != nil {
  55. return err
  56. }
  57. p.Publish(devices...)
  58. return nil
  59. }
  60. func (p *Publisher) Publish(devices ...models.Device) {
  61. if len(devices) == 0 {
  62. return
  63. }
  64. p.mu.Lock()
  65. defer p.mu.Unlock()
  66. for _, device := range devices {
  67. if !p.started[device.BridgeID] {
  68. p.started[device.BridgeID] = true
  69. go p.runBridge(device.BridgeID)
  70. }
  71. p.reassignDevice(device)
  72. if p.sceneAssignment[device.ID] != nil {
  73. p.sceneAssignment[device.ID].UpsertDevice(device)
  74. } else {
  75. p.pending[device.BridgeID] = append(p.pending[device.BridgeID], device)
  76. if p.waiting[device.BridgeID] != nil {
  77. close(p.waiting[device.BridgeID])
  78. p.waiting[device.BridgeID] = nil
  79. }
  80. }
  81. }
  82. }
  83. func (p *Publisher) PublishChannel(ch <-chan []models.Device) {
  84. for list := range ch {
  85. p.Publish(list...)
  86. }
  87. }
  88. func (p *Publisher) Run() {
  89. ticker := time.NewTicker(time.Millisecond * 100)
  90. deleteList := make([]int, 0, 8)
  91. updatedList := make([]models.Device, 0, 16)
  92. for range ticker.C {
  93. deleteList = deleteList[:0]
  94. updatedList = updatedList[:0]
  95. p.mu.Lock()
  96. for i, scene := range p.scenes {
  97. if (!scene.endTime.IsZero() && time.Now().After(scene.endTime)) || scene.Empty() {
  98. deleteList = append(deleteList, i-len(deleteList))
  99. updatedList = append(updatedList, scene.AllDevices()...)
  100. log.Printf("Removing scene instance for %s (%d)", scene.data.Name, scene.data.ID)
  101. for _, device := range scene.AllDevices() {
  102. p.sceneAssignment[device.ID] = nil
  103. }
  104. continue
  105. }
  106. if scene.Due() {
  107. updatedList = append(updatedList, scene.Run()...)
  108. updatedList = append(updatedList, scene.UnaffectedDevices()...)
  109. }
  110. }
  111. for _, i := range deleteList {
  112. p.scenes = append(p.scenes[:i], p.scenes[i+1:]...)
  113. }
  114. for _, device := range updatedList {
  115. if !p.started[device.BridgeID] {
  116. p.started[device.BridgeID] = true
  117. go p.runBridge(device.BridgeID)
  118. }
  119. p.pending[device.BridgeID] = append(p.pending[device.BridgeID], device)
  120. if p.waiting[device.BridgeID] != nil {
  121. close(p.waiting[device.BridgeID])
  122. p.waiting[device.BridgeID] = nil
  123. }
  124. }
  125. p.mu.Unlock()
  126. }
  127. }
  128. // reassignDevice re-evaluates the device's scene assignment config. It will return whether the scene changed, which
  129. // should trigger an update.
  130. func (p *Publisher) reassignDevice(device models.Device) bool {
  131. var selectedAssignment *models.DeviceSceneAssignment
  132. for _, assignment := range device.SceneAssignments {
  133. duration := time.Duration(assignment.DurationMS) * time.Millisecond
  134. if duration <= 0 || time.Now().Before(assignment.StartTime.Add(duration)) {
  135. selectedAssignment = &assignment
  136. }
  137. }
  138. if selectedAssignment == nil {
  139. if p.sceneAssignment[device.ID] != nil {
  140. p.sceneAssignment[device.ID].RemoveDevice(device)
  141. delete(p.sceneAssignment, device.ID)
  142. // Scene changed
  143. return true
  144. }
  145. // Stop here, no scene should be assigned.
  146. return false
  147. } else {
  148. if p.sceneData[selectedAssignment.SceneID] == nil {
  149. // Freeze until scene becomes available.
  150. return false
  151. }
  152. }
  153. if p.sceneAssignment[device.ID] != nil {
  154. scene := p.sceneAssignment[device.ID]
  155. if scene.data.ID == selectedAssignment.SceneID && scene.group == selectedAssignment.Group {
  156. // Current assignment is good.
  157. return false
  158. }
  159. p.sceneAssignment[device.ID].RemoveDevice(device)
  160. delete(p.sceneAssignment, device.ID)
  161. }
  162. for _, scene := range p.scenes {
  163. if scene.data.ID == selectedAssignment.SceneID && scene.group == selectedAssignment.Group {
  164. p.sceneAssignment[device.ID] = scene
  165. return true
  166. }
  167. }
  168. newScene := &Scene{
  169. data: p.sceneData[selectedAssignment.SceneID],
  170. group: selectedAssignment.Group,
  171. startTime: selectedAssignment.StartTime,
  172. endTime: selectedAssignment.StartTime.Add(time.Duration(selectedAssignment.DurationMS) * time.Millisecond),
  173. roleMap: make(map[int]int, 16),
  174. roleList: make(map[int][]models.Device, 16),
  175. lastStates: make(map[int]models.DeviceState, 16),
  176. due: true,
  177. }
  178. p.sceneAssignment[device.ID] = newScene
  179. p.scenes = append(p.scenes, newScene)
  180. if selectedAssignment.DurationMS <= 0 {
  181. newScene.endTime = time.Time{}
  182. }
  183. return true
  184. }
  185. func (p *Publisher) runBridge(id int) {
  186. defer func() {
  187. p.mu.Lock()
  188. p.started[id] = false
  189. p.mu.Unlock()
  190. }()
  191. bridge, err := config.BridgeRepository().Find(context.Background(), id)
  192. if err != nil {
  193. log.Println("Failed to get bridge data:", err)
  194. return
  195. }
  196. driver, err := config.DriverProvider().Provide(bridge.Driver)
  197. if err != nil {
  198. log.Println("Failed to get bridge driver:", err)
  199. log.Println("Maybe Lucifer needs to be updated.")
  200. return
  201. }
  202. devices := make(map[int]models.Device)
  203. for {
  204. p.mu.Lock()
  205. if len(p.pending[id]) == 0 {
  206. if p.waiting[id] == nil {
  207. p.waiting[id] = make(chan struct{})
  208. }
  209. waitCh := p.waiting[id]
  210. p.mu.Unlock()
  211. <-waitCh
  212. p.mu.Lock()
  213. }
  214. updates := p.pending[id]
  215. p.pending[id] = p.pending[id][:0:0]
  216. p.mu.Unlock()
  217. // Only allow the latest update per device (this avoids slow bridges causing a backlog of cations).
  218. for key := range devices {
  219. delete(devices, key)
  220. }
  221. for _, update := range updates {
  222. devices[update.ID] = update
  223. }
  224. updates = updates[:0]
  225. for _, value := range devices {
  226. updates = append(updates, value)
  227. }
  228. err := driver.Publish(context.Background(), bridge, updates)
  229. if err != nil {
  230. log.Println("Failed to publish to driver:", err)
  231. p.mu.Lock()
  232. p.pending[id] = append(updates, p.pending[id]...)
  233. p.mu.Unlock()
  234. return
  235. }
  236. }
  237. }
  238. var publisher = Publisher{
  239. sceneData: make(map[int]*models.Scene),
  240. scenes: make([]*Scene, 0, 16),
  241. sceneAssignment: make(map[int]*Scene, 16),
  242. started: make(map[int]bool),
  243. pending: make(map[int][]models.Device),
  244. waiting: make(map[int]chan struct{}),
  245. }
  246. func Global() *Publisher {
  247. return &publisher
  248. }
  249. func Initialize(ctx context.Context) error {
  250. err := publisher.ReloadScenes(ctx)
  251. if err != nil {
  252. return err
  253. }
  254. err = publisher.ReloadDevices(ctx)
  255. if err != nil {
  256. return err
  257. }
  258. go publisher.Run()
  259. time.Sleep(time.Millisecond * 50)
  260. go publisher.PublishChannel(config.PublishChannel)
  261. return nil
  262. }