diff --git a/app/api/devices.go b/app/api/devices.go index aa7a78f..0438514 100644 --- a/app/api/devices.go +++ b/app/api/devices.go @@ -3,7 +3,6 @@ package api import ( "context" "git.aiterp.net/lucifer/new-server/app/config" - "git.aiterp.net/lucifer/new-server/app/services/scene" "git.aiterp.net/lucifer/new-server/models" "github.com/gin-gonic/gin" "golang.org/x/sync/errgroup" @@ -56,14 +55,12 @@ func Devices(r gin.IRoutes) { return nil, err } - _ = scene.GlobalManager().UpdateDevice(ctxOf(c), &devices[i], nil) - set[devices[i].ID] = true changed = append(changed, devices[i]) } } - config.PublishChannel <- scene.GlobalManager().FilterUnassigned(changed) + config.PublishChannel <- changed go func() { for _, device := range changed { @@ -126,11 +123,9 @@ func Devices(r gin.IRoutes) { if err != nil { return nil, err } - - _ = scene.GlobalManager().UpdateDevice(ctxOf(c), &devices[i], nil) } - config.PublishChannel <- scene.GlobalManager().FilterUnassigned(devices) + config.PublishChannel <- devices go func() { for _, device := range devices { @@ -217,7 +212,7 @@ func Devices(r gin.IRoutes) { return []models.Device{}, nil } - assignedScene, err := config.SceneRepository().Find(ctxOf(c), body.SceneID) + _, err = config.SceneRepository().Find(ctxOf(c), body.SceneID) if err != nil { return nil, err } @@ -228,17 +223,15 @@ func Devices(r gin.IRoutes) { for i := range devices { devices[i].SceneAssignments = []models.DeviceSceneAssignment{body} - - err := scene.GlobalManager().UpdateDevice(ctxOf(c), &devices[i], assignedScene) - if err != nil { - return nil, err - } } + config.PublishChannel <- devices eg := errgroup.Group{} for i := range devices { + device := devices[i] + eg.Go(func() error { - return config.DeviceRepository().Save(ctxOf(c), &devices[i], 0) + return config.DeviceRepository().Save(ctxOf(c), &device, 0) }) } diff --git a/app/server.go b/app/server.go index 73a7028..ab32941 100644 --- a/app/server.go +++ b/app/server.go @@ -6,19 +6,26 @@ import ( "git.aiterp.net/lucifer/new-server/app/api" "git.aiterp.net/lucifer/new-server/app/config" "git.aiterp.net/lucifer/new-server/app/services" - "git.aiterp.net/lucifer/new-server/app/services/scene" + "git.aiterp.net/lucifer/new-server/app/services/publisher" "github.com/gin-gonic/gin" "log" + "time" ) func StartServer() { + setupCtx, cancel := context.WithTimeout(context.Background(), time.Second * 10) + defer cancel() + + err := publisher.Initialize(setupCtx) + if err != nil { + log.Fatalln("Publish init failed:", err) + return + } + services.StartEventHandler() - services.StartPublisher() services.ConnectToBridges() services.CheckNewDevices() - go scene.GlobalManager().Run(context.Background()) - gin.SetMode(gin.ReleaseMode) ginny := gin.New() diff --git a/app/services/events.go b/app/services/events.go index a1d258c..3079599 100644 --- a/app/services/events.go +++ b/app/services/events.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "git.aiterp.net/lucifer/new-server/app/config" - "git.aiterp.net/lucifer/new-server/app/services/scene" "git.aiterp.net/lucifer/new-server/models" "log" "math" @@ -158,7 +157,7 @@ func handleEvent(event models.Event) (responses []models.Event) { } } - config.PublishChannel <- scene.GlobalManager().FilterUnassigned(allDevices) + config.PublishChannel <- allDevices wg := sync.WaitGroup{} @@ -172,15 +171,6 @@ func handleEvent(event models.Event) (responses []models.Event) { wg.Done() }() - - for _, device := range allDevices { - go func(device models.Device) { - err := scene.GlobalManager().UpdateDevice(context.Background(), &device, nil) - if err != nil { - log.Println("Failed to update devices' scene in resposne to event:", err) - } - }(device) - } } for _, handler := range deadHandlers { diff --git a/app/services/publish.go b/app/services/publish.go deleted file mode 100644 index 11c6ed1..0000000 --- a/app/services/publish.go +++ /dev/null @@ -1,81 +0,0 @@ -package services - -import ( - "context" - "git.aiterp.net/lucifer/new-server/app/config" - "git.aiterp.net/lucifer/new-server/models" - "log" - "sync" - "time" -) - -func StartPublisher() { - ctx := context.Background() - - go func() { - for devices := range config.PublishChannel { - if len(devices) == 0 { - continue - } - - lists := make(map[int][]models.Device, 4) - for _, device := range devices { - lists[device.BridgeID] = append(lists[device.BridgeID], device) - } - - ctx, cancel := context.WithTimeout(ctx, time.Second * 30) - - bridges, err := config.BridgeRepository().FetchAll(ctx) - if err != nil { - log.Println("Publishing error (1): " + err.Error()) - cancel() - continue - } - - wg := sync.WaitGroup{} - for _, devices := range lists { - wg.Add(1) - - go func(devices []models.Device) { - defer wg.Done() - - var bridge models.Bridge - for _, bridge2 := range bridges { - if bridge2.ID == devices[0].BridgeID { - bridge = bridge2 - } - } - if bridge.ID == 0 { - log.Println("Unknown bridge") - return - } - - if bridge.Driver == models.DTLIFX { - return - } - - bridge, err := config.BridgeRepository().Find(ctx, devices[0].BridgeID) - if err != nil { - log.Println("Publishing error (1): " + err.Error()) - return - } - - driver, err := config.DriverProvider().Provide(bridge.Driver) - if err != nil { - log.Println("Publishing error (2): " + err.Error()) - return - } - - err = driver.Publish(ctx, bridge, devices) - if err != nil { - log.Println("Publishing error (3): " + err.Error()) - return - } - }(devices) - } - - wg.Wait() - cancel() - } - }() -} diff --git a/app/services/publisher/publisher.go b/app/services/publisher/publisher.go new file mode 100644 index 0000000..74eedff --- /dev/null +++ b/app/services/publisher/publisher.go @@ -0,0 +1,275 @@ +package publisher + +import ( + "context" + "git.aiterp.net/lucifer/new-server/app/config" + "git.aiterp.net/lucifer/new-server/models" + "log" + "sync" + "time" +) + +type Publisher struct { + mu sync.Mutex + sceneData map[int]*models.Scene + scenes []*Scene + sceneAssignment map[int]*Scene + started map[int]bool + pending map[int][]models.Device + waiting map[int]chan struct{} +} + +func (p *Publisher) ReloadScenes(ctx context.Context) error { + scenes, err := config.SceneRepository().FetchAll(ctx) + if err != nil { + return err + } + + p.mu.Lock() + for _, scene := range scenes { + p.sceneData[scene.ID] = &scene + } + p.mu.Unlock() + + return nil +} + +func (p *Publisher) ReloadDevices(ctx context.Context) error { + devices, err := config.DeviceRepository().FetchByReference(ctx, models.RKAll, "") + if err != nil { + return err + } + + p.Publish(devices...) + + return nil +} + +func (p *Publisher) Publish(devices ...models.Device) { + if len(devices) == 0 { + return + } + + p.mu.Lock() + defer p.mu.Unlock() + + for _, device := range devices { + if !p.started[device.BridgeID] { + p.started[device.BridgeID] = true + go p.runBridge(device.BridgeID) + } + + p.reassignDevice(device) + + if p.sceneAssignment[device.ID] != nil { + p.sceneAssignment[device.ID].UpsertDevice(device) + } else { + p.pending[device.BridgeID] = append(p.pending[device.BridgeID], device) + if p.waiting[device.BridgeID] != nil { + close(p.waiting[device.BridgeID]) + p.waiting[device.BridgeID] = nil + } + } + } +} + +func (p *Publisher) PublishChannel(ch <-chan []models.Device) { + for list := range ch { + p.Publish(list...) + } +} + +func (p *Publisher) Run() { + ticker := time.NewTicker(time.Millisecond * 100) + deleteList := make([]int, 0, 8) + updatedList := make([]models.Device, 0, 16) + + for range ticker.C { + deleteList = deleteList[:0] + updatedList = updatedList[:0] + + p.mu.Lock() + for i, scene := range p.scenes { + if !scene.endTime.IsZero() && time.Now().After(scene.endTime) { + deleteList = append(deleteList, i-len(deleteList)) + updatedList = append(updatedList, scene.AllDevices()...) + + for _, device := range scene.AllDevices() { + p.sceneAssignment[device.ID] = nil + } + continue + } + + if scene.Due() { + updatedList = append(updatedList, scene.Run()...) + updatedList = append(updatedList, scene.UnaffectedDevices()...) + } + } + + for _, i := range deleteList { + p.scenes = append(p.scenes[:i], p.scenes[i+1:]...) + } + + for _, device := range updatedList { + if !p.started[device.BridgeID] { + p.started[device.BridgeID] = true + go p.runBridge(device.BridgeID) + } + + p.pending[device.BridgeID] = append(p.pending[device.BridgeID], device) + if p.waiting[device.BridgeID] != nil { + close(p.waiting[device.BridgeID]) + p.waiting[device.BridgeID] = nil + } + } + p.mu.Unlock() + } +} + +// reassignDevice re-evaluates the device's scene assignment config. It will return whether the scene changed, which +// should trigger an update. +func (p *Publisher) reassignDevice(device models.Device) bool { + var selectedAssignment *models.DeviceSceneAssignment + for _, assignment := range device.SceneAssignments { + duration := time.Duration(assignment.DurationMS) * time.Millisecond + if duration <= 0 || time.Now().Before(assignment.StartTime.Add(duration)) { + selectedAssignment = &assignment + } + } + + if selectedAssignment == nil { + if p.sceneAssignment[device.ID] != nil { + p.sceneAssignment[device.ID].RemoveDevice(device) + delete(p.sceneAssignment, device.ID) + + // Scene changed + return true + } + + // Stop here, no scene should be assigned. + return false + } + + if p.sceneAssignment[device.ID] != nil { + scene := p.sceneAssignment[device.ID] + if scene.data.ID == selectedAssignment.SceneID && scene.group == selectedAssignment.Group { + // Current assignment is good. + return false + } + + p.sceneAssignment[device.ID].RemoveDevice(device) + delete(p.sceneAssignment, device.ID) + } + + for _, scene := range p.scenes { + if scene.data.ID == selectedAssignment.SceneID && scene.group == selectedAssignment.Group { + p.sceneAssignment[device.ID] = scene + return true + } + } + + newScene := &Scene{ + data: p.sceneData[selectedAssignment.SceneID], + group: selectedAssignment.Group, + startTime: selectedAssignment.StartTime, + endTime: selectedAssignment.StartTime.Add(time.Duration(selectedAssignment.DurationMS) * time.Millisecond), + roleMap: make(map[int]int, 16), + roleList: make(map[int][]models.Device, 16), + due: true, + } + p.sceneAssignment[device.ID] = newScene + p.scenes = append(p.scenes, newScene) + + if selectedAssignment.DurationMS <= 0 { + newScene.endTime = time.Time{} + } + + return true +} + +func (p *Publisher) runBridge(id int) { + defer func() { + p.mu.Lock() + p.started[id] = false + p.mu.Unlock() + }() + + bridge, err := config.BridgeRepository().Find(context.Background(), id) + if err != nil { + log.Println("Failed to get bridge data:", err) + return + } + + driver, err := config.DriverProvider().Provide(bridge.Driver) + if err != nil { + log.Println("Failed to get bridge driver:", err) + log.Println("Maybe Lucifer needs to be updated.") + return + } + + devices := make(map[int]models.Device) + + for { + p.mu.Lock() + if len(p.pending[id]) == 0 { + if p.waiting[id] == nil { + p.waiting[id] = make(chan struct{}) + } + waitCh := p.waiting[id] + p.mu.Unlock() + <-waitCh + p.mu.Lock() + } + + updates := p.pending[id] + p.pending[id] = p.pending[id][:0:0] + p.mu.Unlock() + + // Only allow the latest update per device (this avoids slow bridges causing a backlog of cations). + for key := range devices { + delete(devices, key) + } + for _, update := range updates { + devices[update.ID] = update + } + updates = updates[:0] + for _, value := range devices { + updates = append(updates, value) + } + + err := driver.Publish(context.Background(), bridge, updates) + if err != nil { + log.Println("Failed to publish to driver:", err) + + p.mu.Lock() + p.pending[id] = append(updates, p.pending[id]...) + p.mu.Unlock() + return + } + } +} + +var publisher = Publisher{ + sceneData: make(map[int]*models.Scene), + scenes: make([]*Scene, 0, 16), + sceneAssignment: make(map[int]*Scene, 16), + started: make(map[int]bool), + pending: make(map[int][]models.Device), + waiting: make(map[int]chan struct{}), +} + +func Initialize(ctx context.Context) error { + err :=publisher.ReloadScenes(ctx) + if err != nil { + return err + } + err = publisher.ReloadDevices(ctx) + if err != nil { + return err + } + + go publisher.PublishChannel(config.PublishChannel) + go publisher.Run() + + return nil +} diff --git a/app/services/publisher/scene.go b/app/services/publisher/scene.go index 98082ce..2d12ba3 100644 --- a/app/services/publisher/scene.go +++ b/app/services/publisher/scene.go @@ -10,32 +10,34 @@ type Scene struct { group string startTime time.Time endTime time.Time - interval time.Duration roleMap map[int]int roleList map[int][]models.Device - due bool + due bool + lastInterval int64 } // UpdateScene updates the scene data and re-seats all devices. func (s *Scene) UpdateScene(data models.Scene) { devices := make([]models.Device, 0, 16) - for ri, list := range s.roleList { + // Collect all devices into the undefined role (-1) + for _, list := range s.roleList { for _, device := range list { devices = append(devices, device) s.roleMap[device.ID] = -1 } } + s.roleList = map[int][]models.Device{-1: devices} - s.roleList = map[int][]models.Device {-1: devices} - + // Update data and reset devices. + s.data = &data for _, device := range append(devices[:0:0], devices...) { s.UpsertDevice(device) } } -// UpsertDevice moves the device if neccesary and updates its state. +// UpsertDevice moves the device if necessary and updates its state. func (s *Scene) UpsertDevice(device models.Device) { if s.data == nil { s.roleMap[device.ID] = -1 @@ -46,29 +48,23 @@ func (s *Scene) UpsertDevice(device models.Device) { oldIndex, hasOldIndex := s.roleMap[device.ID] newIndex := s.data.RoleIndex(&device) - if hasOldIndex { - if oldIndex == newIndex { - for i, device2 := range s.roleList[newIndex] { - if device2.ID == device.ID { - s.roleList[newIndex][i] = device - s.data.Roles[oldIndex].ApplyOrder(s.roleList[oldIndex]) - break - } - } + s.roleMap[device.ID] = newIndex - return - } else { - for i, device2 := range s.roleList[oldIndex] { - if device2.ID == device.ID { - s.roleList[oldIndex] = append(s.roleList[oldIndex][:i], s.roleList[oldIndex][i+1:]...) - break - } + if hasOldIndex { + for i, device2 := range s.roleList[oldIndex] { + if device2.ID == device.ID { + s.roleList[oldIndex] = append(s.roleList[oldIndex][:i], s.roleList[oldIndex][i+1:]...) + break } } } + s.due = true + s.roleList[newIndex] = append(s.roleList[newIndex], device) - s.data.Roles[newIndex].ApplyOrder(s.roleList[newIndex]) + if newIndex != -1 { + s.data.Roles[newIndex].ApplyOrder(s.roleList[newIndex]) + } } // RemoveDevice finds and remove a device. It's a noop if the device does not exist in this scene. @@ -85,9 +81,37 @@ func (s *Scene) RemoveDevice(device models.Device) { } } + s.due = true + delete(s.roleMap, device.ID) } +func (s *Scene) Due() bool { + if s.due { + return true + } + + if s.data.IntervalMS > 0 { + interval := time.Duration(s.data.IntervalMS) * time.Millisecond + return int64(time.Since(s.startTime)/interval) != s.lastInterval + } + + return false +} + +func (s *Scene) UnaffectedDevices() []models.Device { + return append(s.roleList[-1][:0:0], s.roleList[-1]...) +} + +func (s *Scene) AllDevices() []models.Device { + res := make([]models.Device, 0, 16) + for _, list := range s.roleList { + res = append(res, list...) + } + + return res +} + // Run runs the scene func (s *Scene) Run() []models.Device { if s.data == nil { @@ -96,11 +120,12 @@ func (s *Scene) Run() []models.Device { intervalNumber := int64(0) intervalMax := int64(1) - if s.interval > 0 { - intervalNumber = int64(time.Since(s.startTime) / s.interval) + if s.data.IntervalMS > 0 { + interval := time.Duration(s.data.IntervalMS) * time.Millisecond + intervalNumber = int64(time.Since(s.startTime) / interval) if !s.endTime.IsZero() { - intervalMax = int64(s.endTime.Sub(s.startTime) / s.interval) + intervalMax = int64(s.endTime.Sub(s.startTime) / interval) } else { intervalMax = intervalNumber + 1 } @@ -131,5 +156,8 @@ func (s *Scene) Run() []models.Device { } } + s.due = false + s.lastInterval = intervalNumber + return updatedDevices } diff --git a/internal/mysql/devicerepo.go b/internal/mysql/devicerepo.go index 9932a46..2b17574 100644 --- a/internal/mysql/devicerepo.go +++ b/internal/mysql/devicerepo.go @@ -149,7 +149,8 @@ func (r *DeviceRepo) SaveMany(ctx context.Context, mode models.SaveMode, devices icon = :icon, name = :name, capabilities = :capabilities, - button_names = :button_names + button_names = :button_names, + scene_assignments = :scene_assignments WHERE id=:id `, record) if err != nil { diff --git a/models/device.go b/models/device.go index 0ee9463..bf1abec 100644 --- a/models/device.go +++ b/models/device.go @@ -2,6 +2,7 @@ package models import ( "context" + "math" "strings" "time" ) @@ -189,6 +190,25 @@ func (d *Device) SetState(newState NewDeviceState) error { return nil } +func (s *NewDeviceState) RelativeTo(device Device) NewDeviceState { + n := NewDeviceState{} + + if s.Intensity != nil { + intensity := device.State.Intensity * *s.Intensity + n.Intensity = &intensity + } + if s.Color != nil { + c, err := ParseColorValue(*s.Color) + if err == nil { + c.Hue = math.Mod(device.State.Color.Hue + c.Hue, 360) + c.Saturation *= device.State.Color.Saturation + c.Kelvin += device.State.Color.Kelvin + } + } + + return n +} + func (s *NewDeviceState) Interpolate(other NewDeviceState, fac float64) NewDeviceState { n := NewDeviceState{} diff --git a/models/scene.go b/models/scene.go index a05e3a6..0737836 100644 --- a/models/scene.go +++ b/models/scene.go @@ -69,6 +69,7 @@ type SceneRole struct { TargetKind ReferenceKind `json:"targetKind"` TargetValue string `json:"targetValue"` Interpolate bool `json:"interpolate"` + Relative bool `json:"relative"` Order string `json:"order"` States []NewDeviceState `json:"states"` } @@ -81,7 +82,7 @@ type SceneRunContext struct { } func (d *SceneRunContext) PositionFacShifted() float64 { - shiftFac := float64((int64(d.Index)+d.IntervalNumber)%int64(d.Length)) * (1 / float64(d.Length)) + shiftFac := float64(d.IntervalNumber%int64(d.Length)) / float64(d.Length) return math.Mod(d.PositionFac()+shiftFac, 1) } @@ -179,6 +180,10 @@ func (r *SceneRole) ApplyEffect(device *Device, c SceneRunContext) (newState New newState = r.State(c.IntervalFac()) } + if r.Relative { + newState = newState.RelativeTo(*device) + } + switch r.PowerMode { case SPDevice: newState.Power = nil @@ -196,6 +201,8 @@ func (r *SceneRole) State(fac float64) NewDeviceState { return r.States[0] } + + if r.Interpolate { if len(r.States) == 2 { return r.States[0].Interpolate(r.States[1], fac) @@ -219,8 +226,8 @@ func (r *SceneRole) State(fac float64) NewDeviceState { type ScenePowerMode string const ( - SPDevice ScenePowerMode = "Device" // Device state decides power. Scene state may only power off. - SPScene ScenePowerMode = "Scene" // Scene state decide power. + SPDevice ScenePowerMode = "Device" // Device state decides power. Scene state may only power off. + SPScene ScenePowerMode = "Scene" // Scene state decide power. ) // DeviceSceneAssignment is an entry on the device stack. StartTime and DurationMS are only respected when the scene