|
|
package publisher
import ( "context" "git.aiterp.net/lucifer/new-server/app/config" "git.aiterp.net/lucifer/new-server/models" "log" "sync" "time" )
type Publisher struct { mu sync.Mutex sceneData map[int]*models.Scene scenes []*Scene sceneAssignment map[int]*Scene started map[int]bool pending map[int][]models.Device waiting map[int]chan struct{} }
func (p *Publisher) SceneState(deviceID int) *models.DeviceState { p.mu.Lock() defer p.mu.Unlock()
if s := p.sceneAssignment[deviceID]; s != nil { return s.LastState(deviceID) }
return nil }
func (p *Publisher) UpdateScene(data models.Scene) { p.mu.Lock() p.sceneData[data.ID] = &data
for _, scene := range p.scenes { if scene.data.ID == data.ID { scene.UpdateScene(data) } } p.mu.Unlock() }
func (p *Publisher) ReloadScenes(ctx context.Context) error { scenes, err := config.SceneRepository().FetchAll(ctx) if err != nil { return err }
p.mu.Lock() for i, scene := range scenes { p.sceneData[scene.ID] = &scenes[i] }
for _, scene := range p.scenes { scene.UpdateScene(*p.sceneData[scene.data.ID]) } p.mu.Unlock()
return nil }
func (p *Publisher) ReloadDevices(ctx context.Context) error { devices, err := config.DeviceRepository().FetchByReference(ctx, models.RKAll, "") if err != nil { return err }
p.Publish(devices...)
return nil }
func (p *Publisher) Publish(devices ...models.Device) { if len(devices) == 0 { return }
p.mu.Lock() defer p.mu.Unlock()
for _, device := range devices { if !p.started[device.BridgeID] { p.started[device.BridgeID] = true go p.runBridge(device.BridgeID) }
p.reassignDevice(device)
if p.sceneAssignment[device.ID] != nil { p.sceneAssignment[device.ID].UpsertDevice(device) } else { p.pending[device.BridgeID] = append(p.pending[device.BridgeID], device) if p.waiting[device.BridgeID] != nil { close(p.waiting[device.BridgeID]) p.waiting[device.BridgeID] = nil } } } }
// PublishChannel publishes every device list received on ch and returns once
// ch has been closed.
func (p *Publisher) PublishChannel(ch <-chan []models.Device) {
	for {
		batch, open := <-ch
		if !open {
			return
		}

		p.Publish(batch...)
	}
}
func (p *Publisher) Run() { ticker := time.NewTicker(time.Millisecond * 100) deleteList := make([]int, 0, 8) updatedList := make([]models.Device, 0, 16) reassignList := make([]models.Device, 0, 16)
for range ticker.C { deleteList = deleteList[:0] updatedList = updatedList[:0] reassignList = reassignList[:0]
p.mu.Lock() for i, scene := range p.scenes { if (!scene.endTime.IsZero() && time.Now().After(scene.endTime)) || scene.Empty() { deleteList = append(deleteList, i-len(deleteList)) updatedList = append(updatedList, scene.AllDevices()...) reassignList = append(reassignList, scene.AllDevices()...)
log.Printf("Removing scene instance for %s (%d)", scene.data.Name, scene.data.ID)
for _, device := range scene.AllDevices() { p.sceneAssignment[device.ID] = nil } continue }
if scene.Due() { updatedList = append(updatedList, scene.Run()...) updatedList = append(updatedList, scene.UnaffectedDevices()...) } }
for _, i := range deleteList { p.scenes = append(p.scenes[:i], p.scenes[i+1:]...) }
for _, device := range reassignList { p.reassignDevice(device)
if p.sceneAssignment[device.ID] != nil { p.sceneAssignment[device.ID].UpsertDevice(device) } }
for _, device := range updatedList { if !p.started[device.BridgeID] { p.started[device.BridgeID] = true go p.runBridge(device.BridgeID) }
p.pending[device.BridgeID] = append(p.pending[device.BridgeID], device) if p.waiting[device.BridgeID] != nil { close(p.waiting[device.BridgeID]) p.waiting[device.BridgeID] = nil } } p.mu.Unlock() } }
// reassignDevice re-evaluates the device's scene assignment config. It will return whether the scene changed, which
// should trigger an update.
func (p *Publisher) reassignDevice(device models.Device) bool { var selectedAssignment *models.DeviceSceneAssignment for i, assignment := range device.SceneAssignments { duration := time.Duration(assignment.DurationMS) * time.Millisecond if duration <= 0 || time.Now().Before(assignment.StartTime.Add(duration)) { selectedAssignment = &device.SceneAssignments[i] } }
if selectedAssignment == nil { if p.sceneAssignment[device.ID] != nil { p.sceneAssignment[device.ID].RemoveDevice(device) delete(p.sceneAssignment, device.ID)
// Scene changed
return true }
// Stop here, no scene should be assigned.
return false } else { if p.sceneData[selectedAssignment.SceneID] == nil { // Freeze until scene becomes available.
return false } }
if p.sceneAssignment[device.ID] != nil { scene := p.sceneAssignment[device.ID] if scene.data.ID == selectedAssignment.SceneID && scene.group == selectedAssignment.Group { // Current assignment is good.
return false }
p.sceneAssignment[device.ID].RemoveDevice(device) delete(p.sceneAssignment, device.ID) }
for _, scene := range p.scenes { if scene.data.ID == selectedAssignment.SceneID && scene.group == selectedAssignment.Group { p.sceneAssignment[device.ID] = scene return true } }
newScene := &Scene{ data: p.sceneData[selectedAssignment.SceneID], group: selectedAssignment.Group, startTime: selectedAssignment.StartTime, endTime: selectedAssignment.StartTime.Add(time.Duration(selectedAssignment.DurationMS) * time.Millisecond), roleMap: make(map[int]int, 16), roleList: make(map[int][]models.Device, 16), lastStates: make(map[int]models.DeviceState, 16), due: true, } p.sceneAssignment[device.ID] = newScene p.scenes = append(p.scenes, newScene)
if selectedAssignment.DurationMS <= 0 { newScene.endTime = time.Time{} }
return true }
func (p *Publisher) runBridge(id int) { defer func() { p.mu.Lock() p.started[id] = false p.mu.Unlock() }()
bridge, err := config.BridgeRepository().Find(context.Background(), id) if err != nil { log.Println("Failed to get bridge data:", err) return }
driver, err := config.DriverProvider().Provide(bridge.Driver) if err != nil { log.Println("Failed to get bridge driver:", err) log.Println("Maybe Lucifer needs to be updated.") return }
devices := make(map[int]models.Device)
for { p.mu.Lock() if len(p.pending[id]) == 0 { if p.waiting[id] == nil { p.waiting[id] = make(chan struct{}) } waitCh := p.waiting[id] p.mu.Unlock() <-waitCh p.mu.Lock() }
updates := p.pending[id] p.pending[id] = p.pending[id][:0:0] p.mu.Unlock()
// Only allow the latest update per device (this avoids slow bridges causing a backlog of cations).
for key := range devices { delete(devices, key) } for _, update := range updates { devices[update.ID] = update } updates = updates[:0] for _, value := range devices { updates = append(updates, value) }
err := driver.Publish(context.Background(), bridge, updates) if err != nil { log.Println("Failed to publish to driver:", err)
p.mu.Lock() p.pending[id] = append(updates, p.pending[id]...) p.mu.Unlock() return } } }
// publisher is the package-level Publisher returned by Global. Its maps are
// allocated here so that methods can write to them right away.
var publisher = Publisher{
	sceneData:       make(map[int]*models.Scene),
	scenes:          make([]*Scene, 0, 16),
	sceneAssignment: make(map[int]*Scene, 16),
	started:         make(map[int]bool),
	pending:         make(map[int][]models.Device),
	waiting:         make(map[int]chan struct{}),
}
// Global returns a pointer to the package-level Publisher. Every call returns
// the same instance.
func Global() *Publisher {
	return &publisher
}
func Initialize(ctx context.Context) error { err := publisher.ReloadScenes(ctx) if err != nil { return err } err = publisher.ReloadDevices(ctx) if err != nil { return err }
go publisher.Run() time.Sleep(time.Millisecond * 50) go publisher.PublishChannel(config.PublishChannel)
return nil }
|