From ffa757f742b981f55ac5f98098c488b01d6e147c Mon Sep 17 00:00:00 2001 From: Gisle Aune Date: Sun, 26 Sep 2021 22:15:49 +0200 Subject: [PATCH] rewrite of publishing mechanism to make scenes first-class. --- app/services/publisher/scene.go | 116 ++++++++++++++++ app/services/scene/manager.go | 238 -------------------------------- app/services/scene/scene.go | 175 ----------------------- models/scene.go | 13 +- 4 files changed, 120 insertions(+), 422 deletions(-) create mode 100644 app/services/publisher/scene.go delete mode 100644 app/services/scene/manager.go delete mode 100644 app/services/scene/scene.go diff --git a/app/services/publisher/scene.go b/app/services/publisher/scene.go new file mode 100644 index 0000000..0b7a8ea --- /dev/null +++ b/app/services/publisher/scene.go @@ -0,0 +1,116 @@ +package publisher + +import ( + "git.aiterp.net/lucifer/new-server/models" + "time" +) + +type Scene struct { + data *models.Scene + group string + startTime time.Time + endTime time.Time + interval time.Duration + roleMap map[int]int + roleList map[int][]models.Device + + due bool +} + +func (s *Scene) UpdateScene(data models.Scene) { + +} + +func (s *Scene) UpsertDevice(device models.Device) { + if s.data == nil { + s.roleMap[device.ID] = -1 + s.roleList[-1] = append(s.roleList[-1], device) + return + } + + oldIndex, hasOldIndex := s.roleMap[device.ID] + newIndex := s.data.RoleIndex(&device) + + if hasOldIndex { + if oldIndex == newIndex { + for i, device2 := range s.roleList[newIndex] { + if device2.ID == device.ID { + s.roleList[newIndex][i] = device + s.data.Roles[oldIndex].ApplyOrder(s.roleList[oldIndex]) + break + } + } + + return + } else { + for i, device2 := range s.roleList[oldIndex] { + if device2.ID == device.ID { + s.roleList[oldIndex] = append(s.roleList[oldIndex][:i], s.roleList[oldIndex][i+1:]...) + break + } + } + } + } + + s.roleList[newIndex] = append(s.roleList[newIndex], device) + s.data.Roles[newIndex].ApplyOrder(s.roleList[newIndex]) +} + +func (s *Scene) RemoveDevice(device models.Device) { + roleIndex, hasRoleIndex := s.roleMap[device.ID] + if !hasRoleIndex { + return + } + + for i, device2 := range s.roleList[roleIndex] { + if device2.ID == device.ID { + s.roleList[roleIndex] = append(s.roleList[roleIndex][:i], s.roleList[roleIndex][i+1:]...) + break + } + } +} + +func (s *Scene) Run() []models.Device { + if s.data == nil { + return []models.Device{} + } + + intervalNumber := int64(0) + intervalMax := int64(1) + if s.interval > 0 { + intervalNumber = int64(time.Since(s.startTime) / s.interval) + + if !s.endTime.IsZero() { + intervalMax = int64(s.endTime.Sub(s.startTime) / s.interval) + } else { + intervalMax = intervalNumber + 1 + } + } + + updatedDevices := make([]models.Device, 0, 16) + for i, list := range s.roleList { + if i == -1 { + continue + } + + role := s.data.Roles[i] + + for j, device := range list { + newState := role.ApplyEffect(&device, models.SceneRunContext{ + Index: j, + Length: len(list), + IntervalNumber: intervalNumber, + IntervalMax: intervalMax, + }) + + err := device.SetState(newState) + if err != nil { + continue + } + + updatedDevices = append(updatedDevices, device) + } + } + + return updatedDevices +} diff --git a/app/services/scene/manager.go b/app/services/scene/manager.go deleted file mode 100644 index e6eeb1f..0000000 --- a/app/services/scene/manager.go +++ /dev/null @@ -1,238 +0,0 @@ -package scene - -import ( - "context" - "git.aiterp.net/lucifer/new-server/app/config" - "git.aiterp.net/lucifer/new-server/models" - "log" - "sync" - "time" -) - -type Manager struct { - mu sync.Mutex - allocations map[int]*Scene - devices map[int]*models.Device - scenes []*Scene - - notifyCh chan struct{} -} - -func (mgr *Manager) UpdateDevice(ctx context.Context, device *models.Device, data *models.Scene) error { - mgr.mu.Lock() - defer mgr.mu.Unlock() - - scene, err := mgr.findScene(ctx, device.SceneAssignments, data) - if err != nil { - return err - } - - if mgr.allocations[device.ID] != nil && mgr.allocations[device.ID] != scene { - mgr.allocations[device.ID].RemoveDevice(*device) - } - - if mgr.allocations[device.ID] != scene { - if mgr.allocations == nil { - mgr.allocations = make(map[int]*Scene, 8) - } - - mgr.allocations[device.ID] = scene - - if scene != nil { - scene.due = true - } - } - - if scene != nil { - scene.UpsertDevice(*device) - - if mgr.devices == nil { - mgr.devices = make(map[int]*models.Device) - } - mgr.devices[device.ID] = device - } else { - config.PublishChannel <- []models.Device{*device} - } - - if mgr.notifyCh != nil { - close(mgr.notifyCh) - mgr.notifyCh = nil - } - - return nil -} - -func (mgr *Manager) UpdateScene(data *models.Scene) { - mgr.mu.Lock() - for _, scene := range mgr.scenes { - if scene.data.ID == data.ID { - scene.UpdateScene(scene.data) - } - } - mgr.mu.Unlock() -} - -func (mgr *Manager) FilterUnassigned(devices []models.Device) []models.Device { - mgr.mu.Lock() - defer mgr.mu.Unlock() - - res := make([]models.Device, 0, len(devices)) - for _, device := range devices { - if mgr.allocations[device.ID] == nil { - res = append(res, device) - } - } - - return res -} - -func (mgr *Manager) Run(ctx context.Context) { - devices, err := config.DeviceRepository().FetchByReference(ctx, models.RKAll, "") - if err == nil { - for _, device := range devices { - _ = mgr.UpdateDevice(ctx, &device, nil) - } - } - - ticker := time.NewTicker(time.Millisecond * 101) - defer ticker.Stop() - - mgr.mu.Lock() - mgr.notifyCh = make(chan struct{}) - mgr.mu.Unlock() - - for { - mgr.mu.Lock() - if mgr.notifyCh == nil { - mgr.notifyCh = make(chan struct{}) - } - notifyCh := mgr.notifyCh - mgr.mu.Unlock() - - select { - case <-ticker.C: - case <-notifyCh: - case <-ctx.Done(): - return - } - - timeout, cancel := context.WithTimeout(ctx, time.Millisecond*100) - - var updates []models.Device - mgr.mu.Lock() - for _, device := range mgr.devices { - if device == nil { - continue - } - - if mgr.reScene(timeout, device) { - updates = append(updates, *device) - } - } - - for _, scene := range mgr.scenes { - if scene.Due() { - scene.Run() - } - - updates = append(updates, scene.Flush()...) - } - mgr.mu.Unlock() - - cancel() - - if len(updates) > 0 { - log.Println("Updating", len(updates), "devices") - config.PublishChannel <- updates - } - } -} - -func (mgr *Manager) reScene(ctx context.Context, device *models.Device) bool { - scene, err := mgr.findScene(ctx, device.SceneAssignments, nil) - if err != nil { - return false - } - - if mgr.allocations[device.ID] != nil && mgr.allocations[device.ID] != scene { - mgr.allocations[device.ID].RemoveDevice(*device) - } - if mgr.allocations[device.ID] != scene { - if mgr.allocations == nil { - mgr.allocations = make(map[int]*Scene, 8) - } - - mgr.allocations[device.ID] = scene - - if scene == nil { - return true - } - - mgr.allocations[device.ID].UpsertDevice(*device) - } - - return false -} - -func (mgr *Manager) findScene(ctx context.Context, assignments []models.DeviceSceneAssignment, data *models.Scene) (*Scene, error) { - if len(assignments) == 0 { - return nil, nil - } - - var selected *models.DeviceSceneAssignment - for _, assignment := range assignments { - pos := int64(time.Since(assignment.StartTime) / time.Millisecond) - if assignment.DurationMS == 0 || (pos >= -1 && pos < assignment.DurationMS) { - selected = &assignment - } - } - if selected == nil { - return nil, nil - } - - for _, scene := range mgr.scenes { - if scene.group == selected.Group && scene.startTime.Equal(selected.StartTime) && selected.DurationMS == scene.duration.Milliseconds() { - return scene, nil - } - } - - if data == nil { - for i := range mgr.scenes { - if mgr.scenes[i].data.ID == selected.SceneID { - data = &mgr.scenes[i].data - break - } - } - } - - if data == nil { - var err error - data, err = config.SceneRepository().Find(ctx, selected.SceneID) - if err != nil { - return nil, err - } - } - - scene := &Scene{ - startTime: selected.StartTime, - duration: time.Duration(selected.DurationMS) * time.Millisecond, - lastRun: 0, - due: true, - group: selected.Group, - data: *data, - roleList: make([][]models.Device, len(data.Roles)), - deviceMap: make(map[int]*models.Device, 8), - roleMap: make(map[int]int, 8), - changes: make(map[int]models.NewDeviceState, 8), - } - - mgr.scenes = append(mgr.scenes, scene) - - return scene, nil -} - -var globalManager = &Manager{} - -func GlobalManager() *Manager { - return globalManager -} diff --git a/app/services/scene/scene.go b/app/services/scene/scene.go deleted file mode 100644 index 87d3fb0..0000000 --- a/app/services/scene/scene.go +++ /dev/null @@ -1,175 +0,0 @@ -package scene - -import ( - "context" - "git.aiterp.net/lucifer/new-server/app/config" - "git.aiterp.net/lucifer/new-server/models" - "math" - "time" -) - -type Scene struct { - startTime time.Time - duration time.Duration - lastRun int64 - due bool - group string - data models.Scene - roleList [][]models.Device - deviceMap map[int]*models.Device - roleMap map[int]int - changes map[int]models.NewDeviceState -} - -func (s *Scene) UpdateScene(scene models.Scene) { - s.data = scene - s.roleMap = make(map[int]int) - - for _, device := range s.deviceMap { - s.moveDevice(*device, false) - } - - for i, role := range s.data.Roles { - role.ApplyOrder(s.roleList[i]) - } - - s.Run() -} - -func (s *Scene) UpsertDevice(device models.Device) { - s.deviceMap[device.ID] = &device - s.moveDevice(device, true) -} - -func (s *Scene) RemoveDevice(device models.Device) { - roleIndex, hasRole := s.roleMap[device.ID] - - delete(s.deviceMap, device.ID) - delete(s.roleMap, device.ID) - - if hasRole { - s.runRole(roleIndex) - } -} - -func (s *Scene) Due() bool { - return s.due || s.intervalNumber() != s.lastRun -} - -func (s *Scene) Flush() []models.Device { - res := make([]models.Device, 0, 8) - for key, change := range s.changes { - if s.deviceMap[key] != nil { - device := *s.deviceMap[key] - err := device.SetState(change) - if err != nil { - continue - } - - res = append(res, device) - } - - delete(s.changes, key) - } - - return res -} - -func (s *Scene) Run() { - s.lastRun = s.intervalNumber() - s.due = false - - for index := range s.roleList { - s.runRole(index) - } -} - -func (s *Scene) runRole(index int) { - role := s.data.Roles[index] - devices := s.roleList[index] - - intervalNumber := s.intervalNumber() - intervalMax := s.intervalMax() - - for i, device := range devices { - newState, changed := role.ApplyEffect(&device, models.SceneRunContext{ - Index: i, - Length: len(devices), - IntervalNumber: intervalNumber, - IntervalMax: intervalMax, - }) - - if !changed { - go func() { - _ = config.DeviceRepository().Save(context.Background(), &device, models.SMState) - }() - } - - s.changes[device.ID] = newState - } -} - -func (s *Scene) moveDevice(device models.Device, run bool) { - oldRole, hasOldRole := s.roleMap[device.ID] - newRole := s.data.RoleIndex(&device) - - // If it doesn't move between roles, just update and reorder it. - if hasOldRole && oldRole == newRole { - for i, device2 := range s.roleList[newRole] { - if device2.ID == device.ID { - s.roleList[newRole][i] = device - } - } - - if run { - s.data.Roles[newRole].ApplyOrder(s.roleList[newRole]) - s.runRole(newRole) - } - - return - } - - // Take it out of the old role. - if hasOldRole { - for i, device2 := range s.roleList[oldRole] { - if device2.ID == device.ID { - s.roleList[oldRole] = append(s.roleList[oldRole][:i], s.roleList[oldRole][i+1:]...) - break - } - } - - if run { - s.runRole(oldRole) - } - } - - if newRole != -1 { - s.roleMap[device.ID] = newRole - s.roleList[newRole] = append(s.roleList[newRole], device) - - if run { - s.data.Roles[newRole].ApplyOrder(s.roleList[newRole]) - s.runRole(newRole) - } - } - - s.due = true -} - -func (s *Scene) intervalNumber() int64 { - intervalDuration := time.Duration(s.data.IntervalMS) * time.Millisecond - if intervalDuration == 0 { - return 0 - } - - return int64(math.Round(float64(time.Since(s.startTime)) / float64(intervalDuration))) -} - -func (s *Scene) intervalMax() int64 { - intervalDuration := time.Duration(s.data.IntervalMS) * time.Millisecond - if intervalDuration == 0 { - return 1 - } - - return int64(s.duration / intervalDuration) -} diff --git a/models/scene.go b/models/scene.go index 37edfc3..a05e3a6 100644 --- a/models/scene.go +++ b/models/scene.go @@ -113,7 +113,7 @@ func (r *SceneRole) Validate() error { } switch r.PowerMode { - case SPScene, SPDevice, SPOverride: + case SPScene, SPDevice: default: return ErrSceneRoleUnknownPowerMode } @@ -165,7 +165,7 @@ func (r *SceneRole) ApplyOrder(devices []Device) { } } -func (r *SceneRole) ApplyEffect(device *Device, c SceneRunContext) (newState NewDeviceState, deviceChanged bool) { +func (r *SceneRole) ApplyEffect(device *Device, c SceneRunContext) (newState NewDeviceState) { switch r.Effect { case SEStatic: newState = r.States[0] @@ -184,11 +184,6 @@ func (r *SceneRole) ApplyEffect(device *Device, c SceneRunContext) (newState New newState.Power = nil case SPScene: // Do nothing - case SPOverride: - if newState.Power != nil && device.State.Power != *newState.Power { - device.State.Power = *newState.Power - deviceChanged = true - } } return @@ -226,10 +221,10 @@ type ScenePowerMode string const ( SPDevice ScenePowerMode = "Device" // Device state decides power. Scene state may only power off. SPScene ScenePowerMode = "Scene" // Scene state decide power. - SPOverride ScenePowerMode = "Override" // Same as above, but Scene state set Device state' power. This is good for "wake up" scenes. ) -// DeviceSceneAssignment is an entry on the device stack. +// DeviceSceneAssignment is an entry on the device stack. StartTime and DurationMS are only respected when the scene +// instance is created for the group. type DeviceSceneAssignment struct { SceneID int `json:"sceneId"` Group string `json:"group"`