You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

326 lines
7.7 KiB

  1. package publisher
  2. import (
  3. "context"
  4. "git.aiterp.net/lucifer/new-server/app/config"
  5. "git.aiterp.net/lucifer/new-server/models"
  6. "log"
  7. "sync"
  8. "time"
  9. )
  10. type Publisher struct {
  11. mu sync.Mutex
  12. sceneData map[int]*models.Scene
  13. scenes []*Scene
  14. sceneAssignment map[int]*Scene
  15. started map[int]bool
  16. pending map[int][]models.Device
  17. waiting map[int]chan struct{}
  18. }
  19. func (p *Publisher) SceneState(deviceID int) *models.DeviceState {
  20. p.mu.Lock()
  21. defer p.mu.Unlock()
  22. if s := p.sceneAssignment[deviceID]; s != nil {
  23. return s.LastState(deviceID)
  24. }
  25. return nil
  26. }
  27. func (p *Publisher) UpdateScene(data models.Scene) {
  28. p.mu.Lock()
  29. p.sceneData[data.ID] = &data
  30. for _, scene := range p.scenes {
  31. if scene.data.ID == data.ID {
  32. scene.UpdateScene(data)
  33. }
  34. }
  35. p.mu.Unlock()
  36. }
  37. func (p *Publisher) ReloadScenes(ctx context.Context) error {
  38. scenes, err := config.SceneRepository().FetchAll(ctx)
  39. if err != nil {
  40. return err
  41. }
  42. p.mu.Lock()
  43. for i, scene := range scenes {
  44. p.sceneData[scene.ID] = &scenes[i]
  45. }
  46. for _, scene := range p.scenes {
  47. scene.UpdateScene(*p.sceneData[scene.data.ID])
  48. }
  49. p.mu.Unlock()
  50. return nil
  51. }
  52. func (p *Publisher) ReloadDevices(ctx context.Context) error {
  53. devices, err := config.DeviceRepository().FetchByReference(ctx, models.RKAll, "")
  54. if err != nil {
  55. return err
  56. }
  57. p.Publish(devices...)
  58. return nil
  59. }
  60. func (p *Publisher) Publish(devices ...models.Device) {
  61. if len(devices) == 0 {
  62. return
  63. }
  64. p.mu.Lock()
  65. defer p.mu.Unlock()
  66. for _, device := range devices {
  67. if !p.started[device.BridgeID] {
  68. p.started[device.BridgeID] = true
  69. go p.runBridge(device.BridgeID)
  70. }
  71. p.reassignDevice(device)
  72. if p.sceneAssignment[device.ID] != nil {
  73. p.sceneAssignment[device.ID].UpsertDevice(device)
  74. } else {
  75. p.pending[device.BridgeID] = append(p.pending[device.BridgeID], device)
  76. if p.waiting[device.BridgeID] != nil {
  77. close(p.waiting[device.BridgeID])
  78. p.waiting[device.BridgeID] = nil
  79. }
  80. }
  81. }
  82. }
  83. func (p *Publisher) PublishChannel(ch <-chan []models.Device) {
  84. for list := range ch {
  85. p.Publish(list...)
  86. }
  87. }
  88. func (p *Publisher) Run() {
  89. ticker := time.NewTicker(time.Millisecond * 100)
  90. deleteList := make([]int, 0, 8)
  91. updatedList := make([]models.Device, 0, 16)
  92. reassignList := make([]models.Device, 0, 16)
  93. for range ticker.C {
  94. deleteList = deleteList[:0]
  95. updatedList = updatedList[:0]
  96. reassignList = reassignList[:0]
  97. p.mu.Lock()
  98. for i, scene := range p.scenes {
  99. if (!scene.endTime.IsZero() && time.Now().After(scene.endTime)) || scene.Empty() {
  100. deleteList = append(deleteList, i-len(deleteList))
  101. updatedList = append(updatedList, scene.AllDevices()...)
  102. reassignList = append(reassignList, scene.AllDevices()...)
  103. log.Printf("Removing scene instance for %s (%d)", scene.data.Name, scene.data.ID)
  104. for _, device := range scene.AllDevices() {
  105. p.sceneAssignment[device.ID] = nil
  106. }
  107. continue
  108. }
  109. if scene.Due() {
  110. updatedList = append(updatedList, scene.Run()...)
  111. updatedList = append(updatedList, scene.UnaffectedDevices()...)
  112. }
  113. }
  114. for _, i := range deleteList {
  115. p.scenes = append(p.scenes[:i], p.scenes[i+1:]...)
  116. }
  117. for _, device := range reassignList {
  118. p.reassignDevice(device)
  119. if p.sceneAssignment[device.ID] != nil {
  120. p.sceneAssignment[device.ID].UpsertDevice(device)
  121. }
  122. }
  123. for _, device := range updatedList {
  124. if !p.started[device.BridgeID] {
  125. p.started[device.BridgeID] = true
  126. go p.runBridge(device.BridgeID)
  127. }
  128. p.pending[device.BridgeID] = append(p.pending[device.BridgeID], device)
  129. if p.waiting[device.BridgeID] != nil {
  130. close(p.waiting[device.BridgeID])
  131. p.waiting[device.BridgeID] = nil
  132. }
  133. }
  134. p.mu.Unlock()
  135. }
  136. }
  137. // reassignDevice re-evaluates the device's scene assignment config. It will return whether the scene changed, which
  138. // should trigger an update.
  139. func (p *Publisher) reassignDevice(device models.Device) bool {
  140. var selectedAssignment *models.DeviceSceneAssignment
  141. for i, assignment := range device.SceneAssignments {
  142. duration := time.Duration(assignment.DurationMS) * time.Millisecond
  143. if duration <= 0 || time.Now().Before(assignment.StartTime.Add(duration)) {
  144. selectedAssignment = &device.SceneAssignments[i]
  145. }
  146. }
  147. if selectedAssignment == nil {
  148. if p.sceneAssignment[device.ID] != nil {
  149. p.sceneAssignment[device.ID].RemoveDevice(device)
  150. delete(p.sceneAssignment, device.ID)
  151. // Scene changed
  152. return true
  153. }
  154. // Stop here, no scene should be assigned.
  155. return false
  156. } else {
  157. if p.sceneData[selectedAssignment.SceneID] == nil {
  158. // Freeze until scene becomes available.
  159. return false
  160. }
  161. }
  162. if p.sceneAssignment[device.ID] != nil {
  163. scene := p.sceneAssignment[device.ID]
  164. if scene.data.ID == selectedAssignment.SceneID && scene.group == selectedAssignment.Group {
  165. // Current assignment is good.
  166. return false
  167. }
  168. p.sceneAssignment[device.ID].RemoveDevice(device)
  169. delete(p.sceneAssignment, device.ID)
  170. }
  171. for _, scene := range p.scenes {
  172. if scene.data.ID == selectedAssignment.SceneID && scene.group == selectedAssignment.Group {
  173. p.sceneAssignment[device.ID] = scene
  174. return true
  175. }
  176. }
  177. newScene := &Scene{
  178. data: p.sceneData[selectedAssignment.SceneID],
  179. group: selectedAssignment.Group,
  180. startTime: selectedAssignment.StartTime,
  181. endTime: selectedAssignment.StartTime.Add(time.Duration(selectedAssignment.DurationMS) * time.Millisecond),
  182. roleMap: make(map[int]int, 16),
  183. roleList: make(map[int][]models.Device, 16),
  184. lastStates: make(map[int]models.DeviceState, 16),
  185. due: true,
  186. }
  187. p.sceneAssignment[device.ID] = newScene
  188. p.scenes = append(p.scenes, newScene)
  189. if selectedAssignment.DurationMS <= 0 {
  190. newScene.endTime = time.Time{}
  191. }
  192. return true
  193. }
  194. func (p *Publisher) runBridge(id int) {
  195. defer func() {
  196. p.mu.Lock()
  197. p.started[id] = false
  198. p.mu.Unlock()
  199. }()
  200. bridge, err := config.BridgeRepository().Find(context.Background(), id)
  201. if err != nil {
  202. log.Println("Failed to get bridge data:", err)
  203. return
  204. }
  205. driver, err := config.DriverProvider().Provide(bridge.Driver)
  206. if err != nil {
  207. log.Println("Failed to get bridge driver:", err)
  208. log.Println("Maybe Lucifer needs to be updated.")
  209. return
  210. }
  211. devices := make(map[int]models.Device)
  212. for {
  213. p.mu.Lock()
  214. if len(p.pending[id]) == 0 {
  215. if p.waiting[id] == nil {
  216. p.waiting[id] = make(chan struct{})
  217. }
  218. waitCh := p.waiting[id]
  219. p.mu.Unlock()
  220. <-waitCh
  221. p.mu.Lock()
  222. }
  223. updates := p.pending[id]
  224. p.pending[id] = p.pending[id][:0:0]
  225. p.mu.Unlock()
  226. // Only allow the latest update per device (this avoids slow bridges causing a backlog of cations).
  227. for key := range devices {
  228. delete(devices, key)
  229. }
  230. for _, update := range updates {
  231. devices[update.ID] = update
  232. }
  233. updates = updates[:0]
  234. for _, value := range devices {
  235. updates = append(updates, value)
  236. }
  237. err := driver.Publish(context.Background(), bridge, updates)
  238. if err != nil {
  239. log.Println("Failed to publish to driver:", err)
  240. p.mu.Lock()
  241. p.pending[id] = append(updates, p.pending[id]...)
  242. p.mu.Unlock()
  243. return
  244. }
  245. }
  246. }
  247. var publisher = Publisher{
  248. sceneData: make(map[int]*models.Scene),
  249. scenes: make([]*Scene, 0, 16),
  250. sceneAssignment: make(map[int]*Scene, 16),
  251. started: make(map[int]bool),
  252. pending: make(map[int][]models.Device),
  253. waiting: make(map[int]chan struct{}),
  254. }
  255. func Global() *Publisher {
  256. return &publisher
  257. }
  258. func Initialize(ctx context.Context) error {
  259. err := publisher.ReloadScenes(ctx)
  260. if err != nil {
  261. return err
  262. }
  263. err = publisher.ReloadDevices(ctx)
  264. if err != nil {
  265. return err
  266. }
  267. go publisher.Run()
  268. time.Sleep(time.Millisecond * 50)
  269. go publisher.PublishChannel(config.PublishChannel)
  270. return nil
  271. }