You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

330 lines
7.7 KiB

  1. package publisher
  2. import (
  3. "context"
  4. "git.aiterp.net/lucifer/new-server/app/config"
  5. "git.aiterp.net/lucifer/new-server/models"
  6. "log"
  7. "sync"
  8. "time"
  9. )
  10. type Publisher struct {
  11. mu sync.Mutex
  12. sceneData map[int]*models.Scene
  13. scenes []*Scene
  14. sceneAssignment map[int]*Scene
  15. started map[int]bool
  16. pending map[int][]models.Device
  17. waiting map[int]chan struct{}
  18. sensorCache map[int]models.SceneSensor
  19. }
  20. func (p *Publisher) SceneState(deviceID int) *models.DeviceState {
  21. p.mu.Lock()
  22. defer p.mu.Unlock()
  23. if s := p.sceneAssignment[deviceID]; s != nil {
  24. return s.LastState(deviceID)
  25. }
  26. return nil
  27. }
  28. func (p *Publisher) UpdateScene(data models.Scene) {
  29. p.mu.Lock()
  30. p.sceneData[data.ID] = &data
  31. for _, scene := range p.scenes {
  32. if scene.data.ID == data.ID {
  33. scene.UpdateScene(data)
  34. }
  35. }
  36. p.mu.Unlock()
  37. }
  38. func (p *Publisher) UpdateSensor(data models.SceneSensor) {
  39. p.mu.Lock()
  40. if assignment := p.sceneAssignment[data.ID]; assignment != nil {
  41. assignment.UpdateSensor(data)
  42. }
  43. p.sensorCache[data.ID] = data
  44. p.mu.Unlock()
  45. }
  46. func (p *Publisher) ReloadScenes(ctx context.Context) error {
  47. scenes, err := config.SceneRepository().FetchAll(ctx)
  48. if err != nil {
  49. return err
  50. }
  51. p.mu.Lock()
  52. for i, scene := range scenes {
  53. p.sceneData[scene.ID] = &scenes[i]
  54. }
  55. for _, scene := range p.scenes {
  56. scene.UpdateScene(*p.sceneData[scene.data.ID])
  57. }
  58. p.mu.Unlock()
  59. return nil
  60. }
  61. func (p *Publisher) ReloadDevices(ctx context.Context) error {
  62. devices, err := config.DeviceRepository().FetchByReference(ctx, models.RKAll, "")
  63. if err != nil {
  64. return err
  65. }
  66. p.Publish(devices...)
  67. return nil
  68. }
  69. func (p *Publisher) Publish(devices ...models.Device) {
  70. if len(devices) == 0 {
  71. return
  72. }
  73. p.mu.Lock()
  74. defer p.mu.Unlock()
  75. for _, device := range devices {
  76. if !p.started[device.BridgeID] {
  77. p.started[device.BridgeID] = true
  78. go p.runBridge(device.BridgeID)
  79. }
  80. p.reassignDevice(device)
  81. if p.sceneAssignment[device.ID] != nil {
  82. p.sceneAssignment[device.ID].UpsertDevice(device)
  83. if cache, ok := p.sensorCache[device.ID]; ok {
  84. p.sceneAssignment[device.ID].UpdateSensor(cache)
  85. }
  86. } else {
  87. p.pending[device.BridgeID] = append(p.pending[device.BridgeID], device)
  88. if p.waiting[device.BridgeID] != nil {
  89. close(p.waiting[device.BridgeID])
  90. p.waiting[device.BridgeID] = nil
  91. }
  92. }
  93. }
  94. }
  95. func (p *Publisher) PublishChannel(ch <-chan []models.Device) {
  96. for list := range ch {
  97. p.Publish(list...)
  98. }
  99. }
  100. func (p *Publisher) Run() {
  101. ticker := time.NewTicker(time.Millisecond * 100)
  102. deleteList := make([]int, 0, 8)
  103. updatedList := make([]models.Device, 0, 16)
  104. for range ticker.C {
  105. deleteList = deleteList[:0]
  106. updatedList = updatedList[:0]
  107. p.mu.Lock()
  108. for i, scene := range p.scenes {
  109. if (!scene.endTime.IsZero() && time.Now().After(scene.endTime)) || scene.Empty() {
  110. deleteList = append(deleteList, i-len(deleteList))
  111. updatedList = append(updatedList, scene.AllDevices()...)
  112. log.Printf("Removing scene instance for %s (%d)", scene.data.Name, scene.data.ID)
  113. for _, device := range scene.AllDevices() {
  114. p.sceneAssignment[device.ID] = nil
  115. }
  116. continue
  117. }
  118. if scene.Due() {
  119. updatedList = append(updatedList, scene.Run()...)
  120. updatedList = append(updatedList, scene.UnaffectedDevices()...)
  121. }
  122. }
  123. for _, i := range deleteList {
  124. p.scenes = append(p.scenes[:i], p.scenes[i+1:]...)
  125. }
  126. for _, device := range updatedList {
  127. if !p.started[device.BridgeID] {
  128. p.started[device.BridgeID] = true
  129. go p.runBridge(device.BridgeID)
  130. }
  131. p.pending[device.BridgeID] = append(p.pending[device.BridgeID], device)
  132. if p.waiting[device.BridgeID] != nil {
  133. close(p.waiting[device.BridgeID])
  134. p.waiting[device.BridgeID] = nil
  135. }
  136. }
  137. p.mu.Unlock()
  138. }
  139. }
  140. // reassignDevice re-evaluates the device's scene assignment config. It will return whether the scene changed, which
  141. // should trigger an update.
  142. func (p *Publisher) reassignDevice(device models.Device) bool {
  143. var selectedAssignment *models.DeviceSceneAssignment
  144. for _, assignment := range device.SceneAssignments {
  145. duration := time.Duration(assignment.DurationMS) * time.Millisecond
  146. if duration <= 0 || time.Now().Before(assignment.StartTime.Add(duration)) {
  147. selectedAssignment = &assignment
  148. }
  149. }
  150. if selectedAssignment == nil {
  151. if p.sceneAssignment[device.ID] != nil {
  152. p.sceneAssignment[device.ID].RemoveDevice(device)
  153. delete(p.sceneAssignment, device.ID)
  154. // Scene changed
  155. return true
  156. }
  157. // Stop here, no scene should be assigned.
  158. return false
  159. } else {
  160. if p.sceneData[selectedAssignment.SceneID] == nil {
  161. // Freeze until scene becomes available.
  162. return false
  163. }
  164. }
  165. if p.sceneAssignment[device.ID] != nil {
  166. scene := p.sceneAssignment[device.ID]
  167. if scene.data.ID == selectedAssignment.SceneID && scene.group == selectedAssignment.Group {
  168. // Current assignment is good.
  169. return false
  170. }
  171. p.sceneAssignment[device.ID].RemoveDevice(device)
  172. delete(p.sceneAssignment, device.ID)
  173. }
  174. for _, scene := range p.scenes {
  175. if scene.data.ID == selectedAssignment.SceneID && scene.group == selectedAssignment.Group {
  176. p.sceneAssignment[device.ID] = scene
  177. return true
  178. }
  179. }
  180. newScene := &Scene{
  181. data: p.sceneData[selectedAssignment.SceneID],
  182. group: selectedAssignment.Group,
  183. startTime: selectedAssignment.StartTime,
  184. endTime: selectedAssignment.StartTime.Add(time.Duration(selectedAssignment.DurationMS) * time.Millisecond),
  185. roleMap: make(map[int]int, 16),
  186. roleList: make(map[int][]models.Device, 16),
  187. lastStates: make(map[int]models.DeviceState, 16),
  188. due: true,
  189. }
  190. p.sceneAssignment[device.ID] = newScene
  191. p.scenes = append(p.scenes, newScene)
  192. if selectedAssignment.DurationMS <= 0 {
  193. newScene.endTime = time.Time{}
  194. }
  195. return true
  196. }
  197. func (p *Publisher) runBridge(id int) {
  198. defer func() {
  199. p.mu.Lock()
  200. p.started[id] = false
  201. p.mu.Unlock()
  202. }()
  203. bridge, err := config.BridgeRepository().Find(context.Background(), id)
  204. if err != nil {
  205. log.Println("Failed to get bridge data:", err)
  206. return
  207. }
  208. driver, err := config.DriverProvider().Provide(bridge.Driver)
  209. if err != nil {
  210. log.Println("Failed to get bridge driver:", err)
  211. log.Println("Maybe Lucifer needs to be updated.")
  212. return
  213. }
  214. devices := make(map[int]models.Device)
  215. for {
  216. p.mu.Lock()
  217. if len(p.pending[id]) == 0 {
  218. if p.waiting[id] == nil {
  219. p.waiting[id] = make(chan struct{})
  220. }
  221. waitCh := p.waiting[id]
  222. p.mu.Unlock()
  223. <-waitCh
  224. p.mu.Lock()
  225. }
  226. updates := p.pending[id]
  227. p.pending[id] = p.pending[id][:0:0]
  228. p.mu.Unlock()
  229. // Only allow the latest update per device (this avoids slow bridges causing a backlog of cations).
  230. for key := range devices {
  231. delete(devices, key)
  232. }
  233. for _, update := range updates {
  234. devices[update.ID] = update
  235. }
  236. updates = updates[:0]
  237. for _, value := range devices {
  238. updates = append(updates, value)
  239. }
  240. err := driver.Publish(context.Background(), bridge, updates)
  241. if err != nil {
  242. log.Println("Failed to publish to driver:", err)
  243. p.mu.Lock()
  244. p.pending[id] = append(updates, p.pending[id]...)
  245. p.mu.Unlock()
  246. return
  247. }
  248. }
  249. }
  250. var publisher = Publisher{
  251. sceneData: make(map[int]*models.Scene),
  252. scenes: make([]*Scene, 0, 16),
  253. sceneAssignment: make(map[int]*Scene, 16),
  254. started: make(map[int]bool),
  255. pending: make(map[int][]models.Device),
  256. waiting: make(map[int]chan struct{}),
  257. sensorCache: make(map[int]models.SceneSensor),
  258. }
  259. func Global() *Publisher {
  260. return &publisher
  261. }
  262. func Initialize(ctx context.Context) error {
  263. err := publisher.ReloadScenes(ctx)
  264. if err != nil {
  265. return err
  266. }
  267. err = publisher.ReloadDevices(ctx)
  268. if err != nil {
  269. return err
  270. }
  271. go publisher.Run()
  272. time.Sleep(time.Millisecond * 50)
  273. go publisher.PublishChannel(config.PublishChannel)
  274. return nil
  275. }