You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

345 lines
8.0 KiB

  1. package publisher
  2. import (
  3. "context"
  4. "git.aiterp.net/lucifer/new-server/app/config"
  5. "git.aiterp.net/lucifer/new-server/models"
  6. "log"
  7. "sync"
  8. "time"
  9. )
  10. type Publisher struct {
  11. mu sync.Mutex
  12. sceneData map[int]*models.Scene
  13. scenes []*Scene
  14. sceneAssignment map[int]*Scene
  15. started map[int]bool
  16. pending map[int][]models.Device
  17. waiting map[int]chan struct{}
  18. sensorCache map[int]models.SceneSensor
  19. }
  20. func (p *Publisher) SceneState(deviceID int) *models.DeviceState {
  21. p.mu.Lock()
  22. defer p.mu.Unlock()
  23. if s := p.sceneAssignment[deviceID]; s != nil {
  24. return s.LastState(deviceID)
  25. }
  26. return nil
  27. }
  28. func (p *Publisher) UpdateScene(data models.Scene) {
  29. p.mu.Lock()
  30. p.sceneData[data.ID] = &data
  31. for _, scene := range p.scenes {
  32. if scene.data.ID == data.ID {
  33. scene.UpdateScene(data)
  34. }
  35. }
  36. p.mu.Unlock()
  37. }
  38. func (p *Publisher) UpdateSensor(data models.SceneSensor) {
  39. p.mu.Lock()
  40. if assignment := p.sceneAssignment[data.ID]; assignment != nil {
  41. assignment.UpdateSensor(data)
  42. }
  43. p.sensorCache[data.ID] = data
  44. p.mu.Unlock()
  45. }
  46. func (p *Publisher) ReloadScenes(ctx context.Context) error {
  47. scenes, err := config.SceneRepository().FetchAll(ctx)
  48. if err != nil {
  49. return err
  50. }
  51. p.mu.Lock()
  52. for i, scene := range scenes {
  53. p.sceneData[scene.ID] = &scenes[i]
  54. }
  55. for _, scene := range p.scenes {
  56. scene.UpdateScene(*p.sceneData[scene.data.ID])
  57. }
  58. p.mu.Unlock()
  59. return nil
  60. }
  61. func (p *Publisher) ReloadDevices(ctx context.Context) error {
  62. devices, err := config.DeviceRepository().FetchByReference(ctx, models.RKAll, "")
  63. if err != nil {
  64. return err
  65. }
  66. p.Publish(devices...)
  67. return nil
  68. }
  69. func (p *Publisher) Publish(devices ...models.Device) {
  70. if len(devices) == 0 {
  71. return
  72. }
  73. unwaitMap := make(map[int]bool)
  74. p.mu.Lock()
  75. defer p.mu.Unlock()
  76. for _, device := range devices {
  77. if !p.started[device.BridgeID] {
  78. p.started[device.BridgeID] = true
  79. go p.runBridge(device.BridgeID)
  80. }
  81. p.reassignDevice(device)
  82. if p.sceneAssignment[device.ID] != nil {
  83. p.sceneAssignment[device.ID].UpsertDevice(device)
  84. if cache, ok := p.sensorCache[device.ID]; ok {
  85. p.sceneAssignment[device.ID].UpdateSensor(cache)
  86. }
  87. } else {
  88. p.pending[device.BridgeID] = append(p.pending[device.BridgeID], device)
  89. unwaitMap[device.BridgeID] = true
  90. }
  91. }
  92. for bridgeID := range unwaitMap {
  93. if p.waiting[bridgeID] != nil {
  94. close(p.waiting[bridgeID])
  95. p.waiting[bridgeID] = nil
  96. }
  97. }
  98. }
  99. func (p *Publisher) PublishChannel(ch <-chan []models.Device) {
  100. for list := range ch {
  101. p.Publish(list...)
  102. }
  103. }
  104. func (p *Publisher) Run() {
  105. ticker := time.NewTicker(time.Millisecond * 100)
  106. deleteList := make([]int, 0, 8)
  107. updatedList := make([]models.Device, 0, 16)
  108. unwaitMap := make(map[int]bool)
  109. for range ticker.C {
  110. deleteList = deleteList[:0]
  111. updatedList = updatedList[:0]
  112. for key := range unwaitMap {
  113. delete(unwaitMap, key)
  114. }
  115. p.mu.Lock()
  116. for i, scene := range p.scenes {
  117. if (!scene.endTime.IsZero() && time.Now().After(scene.endTime)) || scene.Empty() {
  118. deleteList = append(deleteList, i-len(deleteList))
  119. updatedList = append(updatedList, scene.AllDevices()...)
  120. log.Printf("Removing scene instance for %s (%d)", scene.data.Name, scene.data.ID)
  121. for _, device := range scene.AllDevices() {
  122. p.sceneAssignment[device.ID] = nil
  123. }
  124. continue
  125. }
  126. if scene.Due() {
  127. updatedList = append(updatedList, scene.Run()...)
  128. updatedList = append(updatedList, scene.UnaffectedDevices()...)
  129. }
  130. }
  131. for _, i := range deleteList {
  132. p.scenes = append(p.scenes[:i], p.scenes[i+1:]...)
  133. }
  134. for _, device := range updatedList {
  135. if !p.started[device.BridgeID] {
  136. p.started[device.BridgeID] = true
  137. go p.runBridge(device.BridgeID)
  138. }
  139. p.pending[device.BridgeID] = append(p.pending[device.BridgeID], device)
  140. unwaitMap[device.BridgeID] = true
  141. }
  142. for bridgeID := range unwaitMap {
  143. if p.waiting[bridgeID] != nil {
  144. close(p.waiting[bridgeID])
  145. p.waiting[bridgeID] = nil
  146. }
  147. }
  148. p.mu.Unlock()
  149. }
  150. }
  151. // reassignDevice re-evaluates the device's scene assignment config. It will return whether the scene changed, which
  152. // should trigger an update.
  153. func (p *Publisher) reassignDevice(device models.Device) bool {
  154. var selectedAssignment *models.DeviceSceneAssignment
  155. for _, assignment := range device.SceneAssignments {
  156. duration := time.Duration(assignment.DurationMS) * time.Millisecond
  157. if duration <= 0 || time.Now().Before(assignment.StartTime.Add(duration)) {
  158. selectedAssignment = &assignment
  159. }
  160. }
  161. if selectedAssignment == nil {
  162. if p.sceneAssignment[device.ID] != nil {
  163. p.sceneAssignment[device.ID].RemoveDevice(device)
  164. delete(p.sceneAssignment, device.ID)
  165. // Scene changed
  166. return true
  167. }
  168. // Stop here, no scene should be assigned.
  169. return false
  170. } else {
  171. if p.sceneData[selectedAssignment.SceneID] == nil {
  172. // Freeze until scene becomes available.
  173. return false
  174. }
  175. }
  176. if p.sceneAssignment[device.ID] != nil {
  177. scene := p.sceneAssignment[device.ID]
  178. if scene.data.ID == selectedAssignment.SceneID && scene.group == selectedAssignment.Group {
  179. // Current assignment is good.
  180. return false
  181. }
  182. p.sceneAssignment[device.ID].RemoveDevice(device)
  183. delete(p.sceneAssignment, device.ID)
  184. }
  185. for _, scene := range p.scenes {
  186. if scene.data.ID == selectedAssignment.SceneID && scene.group == selectedAssignment.Group {
  187. p.sceneAssignment[device.ID] = scene
  188. return true
  189. }
  190. }
  191. newScene := &Scene{
  192. data: p.sceneData[selectedAssignment.SceneID],
  193. group: selectedAssignment.Group,
  194. startTime: selectedAssignment.StartTime,
  195. endTime: selectedAssignment.StartTime.Add(time.Duration(selectedAssignment.DurationMS) * time.Millisecond),
  196. roleMap: make(map[int]int, 16),
  197. roleList: make(map[int][]models.Device, 16),
  198. lastStates: make(map[int]models.DeviceState, 16),
  199. due: true,
  200. }
  201. p.sceneAssignment[device.ID] = newScene
  202. p.scenes = append(p.scenes, newScene)
  203. if selectedAssignment.DurationMS <= 0 {
  204. newScene.endTime = time.Time{}
  205. }
  206. return true
  207. }
  208. func (p *Publisher) runBridge(id int) {
  209. defer func() {
  210. p.mu.Lock()
  211. p.started[id] = false
  212. p.mu.Unlock()
  213. }()
  214. bridge, err := config.BridgeRepository().Find(context.Background(), id)
  215. if err != nil {
  216. log.Println("Failed to get bridge data:", err)
  217. return
  218. }
  219. driver, err := config.DriverProvider().Provide(bridge.Driver)
  220. if err != nil {
  221. log.Println("Failed to get bridge driver:", err)
  222. log.Println("Maybe Lucifer needs to be updated.")
  223. return
  224. }
  225. devices := make(map[int]models.Device)
  226. for {
  227. p.mu.Lock()
  228. if len(p.pending[id]) == 0 {
  229. if p.waiting[id] == nil {
  230. p.waiting[id] = make(chan struct{})
  231. }
  232. waitCh := p.waiting[id]
  233. p.mu.Unlock()
  234. <-waitCh
  235. p.mu.Lock()
  236. }
  237. updates := append(p.pending[id][:0:0], p.pending[id]...)
  238. p.pending[id] = p.pending[id][:0]
  239. p.mu.Unlock()
  240. // Only allow the latest update per device (this avoids slow bridges causing a backlog of cations).
  241. for key := range devices {
  242. delete(devices, key)
  243. }
  244. for _, update := range updates {
  245. devices[update.ID] = update
  246. }
  247. updates = updates[:0]
  248. for _, value := range devices {
  249. updates = append(updates, value)
  250. }
  251. err := driver.Publish(context.Background(), bridge, updates)
  252. if err != nil {
  253. log.Println("Failed to publish to driver:", err)
  254. p.mu.Lock()
  255. p.pending[id] = append(updates, p.pending[id]...)
  256. p.mu.Unlock()
  257. return
  258. }
  259. }
  260. }
  261. var publisher = Publisher{
  262. sceneData: make(map[int]*models.Scene),
  263. scenes: make([]*Scene, 0, 16),
  264. sceneAssignment: make(map[int]*Scene, 16),
  265. started: make(map[int]bool),
  266. pending: make(map[int][]models.Device),
  267. waiting: make(map[int]chan struct{}),
  268. sensorCache: make(map[int]models.SceneSensor),
  269. }
  270. func Global() *Publisher {
  271. return &publisher
  272. }
  273. func Initialize(ctx context.Context) error {
  274. err := publisher.ReloadScenes(ctx)
  275. if err != nil {
  276. return err
  277. }
  278. err = publisher.ReloadDevices(ctx)
  279. if err != nil {
  280. return err
  281. }
  282. go publisher.Run()
  283. time.Sleep(time.Millisecond * 50)
  284. go publisher.PublishChannel(config.PublishChannel)
  285. return nil
  286. }