You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

310 lines
7.2 KiB

  1. package publisher
  2. import (
  3. "context"
  4. "git.aiterp.net/lucifer/new-server/app/config"
  5. "git.aiterp.net/lucifer/new-server/models"
  6. "log"
  7. "sync"
  8. "time"
  9. )
  10. type Publisher struct {
  11. mu sync.Mutex
  12. sceneData map[int]*models.Scene
  13. scenes []*Scene
  14. sceneAssignment map[int]*Scene
  15. started map[int]bool
  16. pending map[int][]models.Device
  17. waiting map[int]chan struct{}
  18. }
  19. func (p *Publisher) SceneState(deviceID int) *models.DeviceState {
  20. p.mu.Lock()
  21. defer p.mu.Unlock()
  22. if s := p.sceneAssignment[deviceID]; s != nil {
  23. return s.LastState(deviceID)
  24. }
  25. return nil
  26. }
  27. func (p *Publisher) UpdateScene(data models.Scene) {
  28. p.mu.Lock()
  29. p.sceneData[data.ID] = &data
  30. for _, scene := range p.scenes {
  31. if scene.data.ID == data.ID {
  32. scene.UpdateScene(data)
  33. }
  34. }
  35. p.mu.Unlock()
  36. }
  37. func (p *Publisher) ReloadScenes(ctx context.Context) error {
  38. scenes, err := config.SceneRepository().FetchAll(ctx)
  39. if err != nil {
  40. return err
  41. }
  42. p.mu.Lock()
  43. for i, scene := range scenes {
  44. p.sceneData[scene.ID] = &scenes[i]
  45. }
  46. for _, scene := range p.scenes {
  47. scene.UpdateScene(*p.sceneData[scene.data.ID])
  48. }
  49. p.mu.Unlock()
  50. return nil
  51. }
  52. func (p *Publisher) ReloadDevices(ctx context.Context) error {
  53. devices, err := config.DeviceRepository().FetchByReference(ctx, models.RKAll, "")
  54. if err != nil {
  55. return err
  56. }
  57. p.Publish(devices...)
  58. return nil
  59. }
  60. func (p *Publisher) Publish(devices ...models.Device) {
  61. if len(devices) == 0 {
  62. return
  63. }
  64. p.mu.Lock()
  65. defer p.mu.Unlock()
  66. for _, device := range devices {
  67. if !p.started[device.BridgeID] {
  68. p.started[device.BridgeID] = true
  69. go p.runBridge(device.BridgeID)
  70. }
  71. p.reassignDevice(device)
  72. if p.sceneAssignment[device.ID] != nil {
  73. p.sceneAssignment[device.ID].UpsertDevice(device)
  74. } else {
  75. p.pending[device.BridgeID] = append(p.pending[device.BridgeID], device)
  76. if p.waiting[device.BridgeID] != nil {
  77. close(p.waiting[device.BridgeID])
  78. p.waiting[device.BridgeID] = nil
  79. }
  80. }
  81. }
  82. }
  83. func (p *Publisher) PublishChannel(ch <-chan []models.Device) {
  84. for list := range ch {
  85. p.Publish(list...)
  86. }
  87. }
  88. func (p *Publisher) Run() {
  89. ticker := time.NewTicker(time.Millisecond * 100)
  90. deleteList := make([]int, 0, 8)
  91. updatedList := make([]models.Device, 0, 16)
  92. for range ticker.C {
  93. deleteList = deleteList[:0]
  94. updatedList = updatedList[:0]
  95. p.mu.Lock()
  96. for i, scene := range p.scenes {
  97. if (!scene.endTime.IsZero() && time.Now().After(scene.endTime)) || scene.Empty() {
  98. deleteList = append(deleteList, i-len(deleteList))
  99. updatedList = append(updatedList, scene.AllDevices()...)
  100. log.Printf("Removing scene instance for %s (%d)", scene.data.Name, scene.data.ID)
  101. for _, device := range scene.AllDevices() {
  102. p.sceneAssignment[device.ID] = nil
  103. }
  104. continue
  105. }
  106. if scene.Due() {
  107. updatedList = append(updatedList, scene.Run()...)
  108. updatedList = append(updatedList, scene.UnaffectedDevices()...)
  109. }
  110. }
  111. for _, i := range deleteList {
  112. p.scenes = append(p.scenes[:i], p.scenes[i+1:]...)
  113. }
  114. for _, device := range updatedList {
  115. if !p.started[device.BridgeID] {
  116. p.started[device.BridgeID] = true
  117. go p.runBridge(device.BridgeID)
  118. }
  119. p.pending[device.BridgeID] = append(p.pending[device.BridgeID], device)
  120. if p.waiting[device.BridgeID] != nil {
  121. close(p.waiting[device.BridgeID])
  122. p.waiting[device.BridgeID] = nil
  123. }
  124. }
  125. p.mu.Unlock()
  126. }
  127. }
  128. // reassignDevice re-evaluates the device's scene assignment config. It will return whether the scene changed, which
  129. // should trigger an update.
  130. func (p *Publisher) reassignDevice(device models.Device) bool {
  131. var selectedAssignment *models.DeviceSceneAssignment
  132. for _, assignment := range device.SceneAssignments {
  133. duration := time.Duration(assignment.DurationMS) * time.Millisecond
  134. if duration <= 0 || time.Now().Before(assignment.StartTime.Add(duration)) {
  135. selectedAssignment = &assignment
  136. }
  137. }
  138. if selectedAssignment == nil {
  139. if p.sceneAssignment[device.ID] != nil {
  140. p.sceneAssignment[device.ID].RemoveDevice(device)
  141. delete(p.sceneAssignment, device.ID)
  142. // Scene changed
  143. return true
  144. }
  145. // Stop here, no scene should be assigned.
  146. return false
  147. }
  148. if p.sceneAssignment[device.ID] != nil {
  149. scene := p.sceneAssignment[device.ID]
  150. if scene.data.ID == selectedAssignment.SceneID && scene.group == selectedAssignment.Group {
  151. // Current assignment is good.
  152. return false
  153. }
  154. p.sceneAssignment[device.ID].RemoveDevice(device)
  155. delete(p.sceneAssignment, device.ID)
  156. }
  157. for _, scene := range p.scenes {
  158. if scene.data.ID == selectedAssignment.SceneID && scene.group == selectedAssignment.Group {
  159. p.sceneAssignment[device.ID] = scene
  160. return true
  161. }
  162. }
  163. newScene := &Scene{
  164. data: p.sceneData[selectedAssignment.SceneID],
  165. group: selectedAssignment.Group,
  166. startTime: selectedAssignment.StartTime,
  167. endTime: selectedAssignment.StartTime.Add(time.Duration(selectedAssignment.DurationMS) * time.Millisecond),
  168. roleMap: make(map[int]int, 16),
  169. roleList: make(map[int][]models.Device, 16),
  170. lastStates: make(map[int]models.DeviceState, 16),
  171. due: true,
  172. }
  173. p.sceneAssignment[device.ID] = newScene
  174. p.scenes = append(p.scenes, newScene)
  175. if selectedAssignment.DurationMS <= 0 {
  176. newScene.endTime = time.Time{}
  177. }
  178. return true
  179. }
  180. func (p *Publisher) runBridge(id int) {
  181. defer func() {
  182. p.mu.Lock()
  183. p.started[id] = false
  184. p.mu.Unlock()
  185. }()
  186. bridge, err := config.BridgeRepository().Find(context.Background(), id)
  187. if err != nil {
  188. log.Println("Failed to get bridge data:", err)
  189. return
  190. }
  191. driver, err := config.DriverProvider().Provide(bridge.Driver)
  192. if err != nil {
  193. log.Println("Failed to get bridge driver:", err)
  194. log.Println("Maybe Lucifer needs to be updated.")
  195. return
  196. }
  197. devices := make(map[int]models.Device)
  198. for {
  199. p.mu.Lock()
  200. if len(p.pending[id]) == 0 {
  201. if p.waiting[id] == nil {
  202. p.waiting[id] = make(chan struct{})
  203. }
  204. waitCh := p.waiting[id]
  205. p.mu.Unlock()
  206. <-waitCh
  207. p.mu.Lock()
  208. }
  209. updates := p.pending[id]
  210. p.pending[id] = p.pending[id][:0:0]
  211. p.mu.Unlock()
  212. // Only allow the latest update per device (this avoids slow bridges causing a backlog of cations).
  213. for key := range devices {
  214. delete(devices, key)
  215. }
  216. for _, update := range updates {
  217. devices[update.ID] = update
  218. }
  219. updates = updates[:0]
  220. for _, value := range devices {
  221. updates = append(updates, value)
  222. }
  223. err := driver.Publish(context.Background(), bridge, updates)
  224. if err != nil {
  225. log.Println("Failed to publish to driver:", err)
  226. p.mu.Lock()
  227. p.pending[id] = append(updates, p.pending[id]...)
  228. p.mu.Unlock()
  229. return
  230. }
  231. }
  232. }
  233. var publisher = Publisher{
  234. sceneData: make(map[int]*models.Scene),
  235. scenes: make([]*Scene, 0, 16),
  236. sceneAssignment: make(map[int]*Scene, 16),
  237. started: make(map[int]bool),
  238. pending: make(map[int][]models.Device),
  239. waiting: make(map[int]chan struct{}),
  240. }
  241. func Global() *Publisher {
  242. return &publisher
  243. }
  244. func Initialize(ctx context.Context) error {
  245. err := publisher.ReloadScenes(ctx)
  246. if err != nil {
  247. return err
  248. }
  249. err = publisher.ReloadDevices(ctx)
  250. if err != nil {
  251. return err
  252. }
  253. go publisher.Run()
  254. time.Sleep(time.Millisecond * 50)
  255. go publisher.PublishChannel(config.PublishChannel)
  256. return nil
  257. }