// Package publisher keeps track of which scene instance each device belongs to and
// forwards the resulting device updates to per-bridge worker goroutines.
package publisher

import (
	"context"
	"log"
	"sync"
	"time"

	"git.aiterp.net/lucifer/new-server/app/config"
	"git.aiterp.net/lucifer/new-server/models"
)

// Publisher holds the scene bookkeeping and the per-bridge update queues. All fields
// are guarded by mu.
type Publisher struct {
	mu sync.Mutex

	// sceneData holds the most recently seen scene definition, keyed by scene ID.
	sceneData map[int]*models.Scene
	// scenes lists the scene instances that are currently alive.
	scenes []*Scene
	// sceneAssignment maps a device ID to the scene instance it is assigned to.
	sceneAssignment map[int]*Scene

	// started records, per bridge ID, whether a runBridge goroutine has been launched.
	started map[int]bool
	// pending holds, per bridge ID, the device updates not yet picked up by runBridge.
	pending map[int][]models.Device
	// waiting holds, per bridge ID, a channel that is closed to wake an idle runBridge.
	waiting map[int]chan struct{}

	// sensorCache holds the last sensor reading per sensor ID. Publish looks it up by
	// device ID.
	sensorCache map[int]models.SceneSensor
}

// SceneState returns the last state recorded for the device by its assigned scene
// instance, or nil if the device has no scene assigned.
func (p *Publisher) SceneState(deviceID int) *models.DeviceState {
	p.mu.Lock()
	defer p.mu.Unlock()

	if s := p.sceneAssignment[deviceID]; s != nil {
		return s.LastState(deviceID)
	}

	return nil
}

// UpdateScene stores the new scene definition and passes it to every live scene
// instance that was created from a scene with the same ID.
func (p *Publisher) UpdateScene(data models.Scene) {
	p.mu.Lock()
	defer p.mu.Unlock()

	p.sceneData[data.ID] = &data

	for _, scene := range p.scenes {
		if scene.data.ID == data.ID {
			scene.UpdateScene(data)
		}
	}
}

// UpdateSensor forwards the sensor reading to the scene instance assigned under the
// sensor's ID (if any) and caches it so that Publish can replay it later.
func (p *Publisher) UpdateSensor(data models.SceneSensor) {
	p.mu.Lock()
	defer p.mu.Unlock()

	if assignment := p.sceneAssignment[data.ID]; assignment != nil {
		assignment.UpdateSensor(data)
	}

	p.sensorCache[data.ID] = data
}

// ReloadScenes fetches all scenes from the scene repository, replaces the cached
// definitions with them and refreshes every live scene instance. The repository
// error is returned unchanged.
func (p *Publisher) ReloadScenes(ctx context.Context) error {
	scenes, err := config.SceneRepository().FetchAll(ctx)
	if err != nil {
		return err
	}

	p.mu.Lock()
	defer p.mu.Unlock()

	// Point at the slice elements rather than at a loop copy so each entry is distinct.
	for i := range scenes {
		p.sceneData[scenes[i].ID] = &scenes[i]
	}

	for _, scene := range p.scenes {
		scene.UpdateScene(*p.sceneData[scene.data.ID])
	}

	return nil
}

// ReloadDevices fetches every device from the device repository and publishes them.
// The repository error is returned unchanged.
func (p *Publisher) ReloadDevices(ctx context.Context) error {
	devices, err := config.DeviceRepository().FetchByReference(ctx, models.RKAll, "")
	if err != nil {
		return err
	}

	p.Publish(devices...)

	return nil
}

// Publish re-evaluates the scene assignment of each device. Devices that end up with
// a scene are handed to that scene instance (along with any cached sensor reading);
// all others are queued directly for their bridge.
func (p *Publisher) Publish(devices ...models.Device) {
	if len(devices) == 0 {
		return
	}

	p.mu.Lock()
	defer p.mu.Unlock()

	for _, device := range devices {
		p.startBridgeLocked(device.BridgeID)

		p.reassignDevice(device)

		if scene := p.sceneAssignment[device.ID]; scene != nil {
			scene.UpsertDevice(device)
			if cached, ok := p.sensorCache[device.ID]; ok {
				scene.UpdateSensor(cached)
			}
			continue
		}

		p.enqueueLocked(device)
	}
}

// PublishChannel publishes every device list received on ch until ch is closed.
func (p *Publisher) PublishChannel(ch <-chan []models.Device) {
	for list := range ch {
		p.Publish(list...)
	}
}

// Run drives the scene instances on a 100ms tick: it removes instances that have
// ended or are empty, runs the ones that are due, and queues all resulting device
// updates for their bridges. The ticker is never stopped, so Run does not return.
func (p *Publisher) Run() {
	ticker := time.NewTicker(time.Millisecond * 100)

	deleteList := make([]int, 0, 8)
	updatedList := make([]models.Device, 0, 16)

	for range ticker.C {
		deleteList = deleteList[:0]
		updatedList = updatedList[:0]

		p.mu.Lock()

		for i, scene := range p.scenes {
			expired := !scene.endTime.IsZero() && time.Now().After(scene.endTime)
			if expired || scene.Empty() {
				// Store the index as it will be once the earlier deletions have been
				// applied, so the removal loop below can use it directly.
				deleteList = append(deleteList, i-len(deleteList))
				updatedList = append(updatedList, scene.AllDevices()...)

				log.Printf("Removing scene instance for %s (%d)", scene.data.Name, scene.data.ID)

				for _, device := range scene.AllDevices() {
					p.sceneAssignment[device.ID] = nil
				}

				continue
			}

			if scene.Due() {
				updatedList = append(updatedList, scene.Run()...)
				updatedList = append(updatedList, scene.UnaffectedDevices()...)
			}
		}

		for _, i := range deleteList {
			p.scenes = append(p.scenes[:i], p.scenes[i+1:]...)
		}

		for _, device := range updatedList {
			p.startBridgeLocked(device.BridgeID)
			p.enqueueLocked(device)
		}

		p.mu.Unlock()
	}
}

// startBridgeLocked launches the runBridge goroutine for bridgeID unless one is
// already marked as started. The caller must hold p.mu.
func (p *Publisher) startBridgeLocked(bridgeID int) {
	if p.started[bridgeID] {
		return
	}

	p.started[bridgeID] = true
	go p.runBridge(bridgeID)
}

// enqueueLocked appends the device to its bridge's pending list and wakes the bridge
// worker if it is waiting for work. The caller must hold p.mu.
func (p *Publisher) enqueueLocked(device models.Device) {
	bridgeID := device.BridgeID

	p.pending[bridgeID] = append(p.pending[bridgeID], device)

	if ch := p.waiting[bridgeID]; ch != nil {
		close(ch)
		p.waiting[bridgeID] = nil
	}
}

// reassignDevice re-evaluates the device's scene assignment config. It will return whether the scene changed, which
// should trigger an update. The caller must hold p.mu.
func (p *Publisher) reassignDevice(device models.Device) bool {
	var selectedAssignment *models.DeviceSceneAssignment

	now := time.Now()
	for _, assignment := range device.SceneAssignments {
		// Take a per-iteration copy before storing its address. Before Go 1.22 the
		// range variable is shared by all iterations, so &assignment would end up
		// pointing at the last element even when that element is expired.
		assignment := assignment

		duration := time.Duration(assignment.DurationMS) * time.Millisecond
		if duration <= 0 || now.Before(assignment.StartTime.Add(duration)) {
			selectedAssignment = &assignment
		}
	}

	current := p.sceneAssignment[device.ID]

	if selectedAssignment == nil {
		if current != nil {
			current.RemoveDevice(device)
			delete(p.sceneAssignment, device.ID)

			// Scene changed
			return true
		}

		// Stop here, no scene should be assigned.
		return false
	}

	if p.sceneData[selectedAssignment.SceneID] == nil {
		// Freeze until scene becomes available.
		return false
	}

	if current != nil {
		if current.data.ID == selectedAssignment.SceneID && current.group == selectedAssignment.Group {
			// Current assignment is good.
			return false
		}

		current.RemoveDevice(device)
		delete(p.sceneAssignment, device.ID)
	}

	// Join an existing instance of the same scene and group if there is one.
	for _, scene := range p.scenes {
		if scene.data.ID == selectedAssignment.SceneID && scene.group == selectedAssignment.Group {
			p.sceneAssignment[device.ID] = scene
			return true
		}
	}

	// A non-positive duration means the instance has no end time.
	var endTime time.Time
	if selectedAssignment.DurationMS > 0 {
		endTime = selectedAssignment.StartTime.Add(time.Duration(selectedAssignment.DurationMS) * time.Millisecond)
	}

	newScene := &Scene{
		data:       p.sceneData[selectedAssignment.SceneID],
		group:      selectedAssignment.Group,
		startTime:  selectedAssignment.StartTime,
		endTime:    endTime,
		roleMap:    make(map[int]int, 16),
		roleList:   make(map[int][]models.Device, 16),
		lastStates: make(map[int]models.DeviceState, 16),
		due:        true,
	}

	p.sceneAssignment[device.ID] = newScene
	p.scenes = append(p.scenes, newScene)

	return true
}

// runBridge is the worker loop for one bridge. It looks up the bridge and its driver,
// then repeatedly takes the pending updates for the bridge and publishes them through
// the driver. It returns when the lookup or a publish fails; in every case the bridge
// is marked as not started on exit, so a later enqueue launches a new worker.
func (p *Publisher) runBridge(id int) {
	defer func() {
		p.mu.Lock()
		p.started[id] = false
		p.mu.Unlock()
	}()

	bridge, err := config.BridgeRepository().Find(context.Background(), id)
	if err != nil {
		log.Println("Failed to get bridge data:", err)
		return
	}

	driver, err := config.DriverProvider().Provide(bridge.Driver)
	if err != nil {
		log.Println("Failed to get bridge driver:", err)
		log.Println("Maybe Lucifer needs to be updated.")
		return
	}

	devices := make(map[int]models.Device)

	for {
		p.mu.Lock()
		if len(p.pending[id]) == 0 {
			if p.waiting[id] == nil {
				p.waiting[id] = make(chan struct{})
			}
			waitCh := p.waiting[id]

			// Wait without holding the lock; the channel is closed by whoever enqueues.
			p.mu.Unlock()
			<-waitCh
			p.mu.Lock()
		}

		// Take ownership of the queued updates. The zero-capacity reslice makes later
		// appends to pending allocate a new array instead of writing into this one.
		updates := p.pending[id]
		p.pending[id] = p.pending[id][:0:0]
		p.mu.Unlock()

		// Only allow the latest update per device (this avoids slow bridges causing a backlog of actions).
		for key := range devices {
			delete(devices, key)
		}
		for _, update := range updates {
			devices[update.ID] = update
		}
		updates = updates[:0]
		for _, value := range devices {
			updates = append(updates, value)
		}

		if err := driver.Publish(context.Background(), bridge, updates); err != nil {
			log.Println("Failed to publish to driver:", err)

			// Put the failed updates back in front of anything queued in the meantime.
			p.mu.Lock()
			p.pending[id] = append(updates, p.pending[id]...)
			p.mu.Unlock()

			return
		}
	}
}

// publisher is the process-wide Publisher returned by Global.
var publisher = Publisher{
	sceneData:       make(map[int]*models.Scene),
	scenes:          make([]*Scene, 0, 16),
	sceneAssignment: make(map[int]*Scene, 16),
	started:         make(map[int]bool),
	pending:         make(map[int][]models.Device),
	waiting:         make(map[int]chan struct{}),
	sensorCache:     make(map[int]models.SceneSensor),
}

// Global returns the process-wide Publisher.
func Global() *Publisher {
	return &publisher
}

// Initialize loads scenes and devices into the global Publisher, then starts its Run
// loop and begins consuming config.PublishChannel. It returns the first load error.
func Initialize(ctx context.Context) error {
	err := publisher.ReloadScenes(ctx)
	if err != nil {
		return err
	}

	err = publisher.ReloadDevices(ctx)
	if err != nil {
		return err
	}

	go publisher.Run()

	// NOTE(review): presumably this gives Run a head start before channel input is
	// consumed; confirm whether the delay is still needed.
	time.Sleep(time.Millisecond * 50)

	go publisher.PublishChannel(config.PublishChannel)

	return nil
}