Browse Source

reduce Hue2 'update storms'..

feature-colorvalue2 3.7.0
Gisle Aune 3 years ago
parent
commit
8a20bab709
  1. 33
      app/services/publisher/publisher.go
  2. 39
      internal/drivers/hue2/bridge.go
  3. 6
      internal/drivers/hue2/data.go

33
app/services/publisher/publisher.go

@ -87,6 +87,8 @@ func (p *Publisher) Publish(devices ...models.Device) {
return return
} }
unwaitMap := make(map[int]bool)
p.mu.Lock() p.mu.Lock()
defer p.mu.Unlock() defer p.mu.Unlock()
@ -106,10 +108,14 @@ func (p *Publisher) Publish(devices ...models.Device) {
} }
} else { } else {
p.pending[device.BridgeID] = append(p.pending[device.BridgeID], device) p.pending[device.BridgeID] = append(p.pending[device.BridgeID], device)
if p.waiting[device.BridgeID] != nil {
close(p.waiting[device.BridgeID])
p.waiting[device.BridgeID] = nil
}
unwaitMap[device.BridgeID] = true
}
}
for bridgeID := range unwaitMap {
if p.waiting[bridgeID] != nil {
close(p.waiting[bridgeID])
p.waiting[bridgeID] = nil
} }
} }
} }
@ -124,11 +130,16 @@ func (p *Publisher) Run() {
ticker := time.NewTicker(time.Millisecond * 100) ticker := time.NewTicker(time.Millisecond * 100)
deleteList := make([]int, 0, 8) deleteList := make([]int, 0, 8)
updatedList := make([]models.Device, 0, 16) updatedList := make([]models.Device, 0, 16)
unwaitMap := make(map[int]bool)
for range ticker.C { for range ticker.C {
deleteList = deleteList[:0] deleteList = deleteList[:0]
updatedList = updatedList[:0] updatedList = updatedList[:0]
for key := range unwaitMap {
delete(unwaitMap, key)
}
p.mu.Lock() p.mu.Lock()
for i, scene := range p.scenes { for i, scene := range p.scenes {
if (!scene.endTime.IsZero() && time.Now().After(scene.endTime)) || scene.Empty() { if (!scene.endTime.IsZero() && time.Now().After(scene.endTime)) || scene.Empty() {
@ -160,9 +171,13 @@ func (p *Publisher) Run() {
} }
p.pending[device.BridgeID] = append(p.pending[device.BridgeID], device) p.pending[device.BridgeID] = append(p.pending[device.BridgeID], device)
if p.waiting[device.BridgeID] != nil {
close(p.waiting[device.BridgeID])
p.waiting[device.BridgeID] = nil
unwaitMap[device.BridgeID] = true
}
for bridgeID := range unwaitMap {
if p.waiting[bridgeID] != nil {
close(p.waiting[bridgeID])
p.waiting[bridgeID] = nil
} }
} }
p.mu.Unlock() p.mu.Unlock()
@ -270,8 +285,8 @@ func (p *Publisher) runBridge(id int) {
p.mu.Lock() p.mu.Lock()
} }
updates := p.pending[id]
p.pending[id] = p.pending[id][:0:0]
updates := append(p.pending[id][:0:0], p.pending[id]...)
p.pending[id] = p.pending[id][:0]
p.mu.Unlock() p.mu.Unlock()
// Only allow the latest update per device (this avoids slow bridges causing a backlog of cations). // Only allow the latest update per device (this avoids slow bridges causing a backlog of cations).

39
internal/drivers/hue2/bridge.go

@ -45,7 +45,7 @@ func (b *Bridge) Run(ctx context.Context, eventCh chan<- models.Event) error {
if err != nil { if err != nil {
return err return err
} }
log.Println(fmt.Sprintf("[Bridge %d]", b.externalID), "Updated", updated, "hue services at startup.")
log.Println(fmt.Sprintf("[Bridge %d]", b.externalID), "Updated", updated, "hue services (startup)")
lightRefreshTimer := time.NewTicker(time.Second * 5) lightRefreshTimer := time.NewTicker(time.Second * 5)
defer lightRefreshTimer.Stop() defer lightRefreshTimer.Stop()
@ -222,7 +222,7 @@ func (b *Bridge) Update(devices ...models.Device) {
} }
func (b *Bridge) MakeCongruent(ctx context.Context) (int, error) { func (b *Bridge) MakeCongruent(ctx context.Context) (int, error) {
// Eat all event if there's a pending update.
// Exhaust the channel to avoid more updates.
exhausted := false exhausted := false
for !exhausted { for !exhausted {
select { select {
@ -294,14 +294,26 @@ func (b *Bridge) MakeCongruent(ctx context.Context) (int, error) {
} }
} }
} }
if len(updates) > 0 {
// Optimistically apply the updates to the states, so that the driver assumes they are set until
// proven otherwise by the SSE client.
newResources := make(map[string]*ResourceData, len(b.resources))
for key, value := range b.resources {
newResources[key] = value
}
for key, update := range updates {
id := strings.SplitN(key, "/", 2)[1]
newResources[id] = newResources[id].WithUpdate(update)
}
b.resources = newResources
}
b.mu.Unlock() b.mu.Unlock()
if len(updates) == 0 { if len(updates) == 0 {
return 0, nil return 0, nil
} }
log.Println(fmt.Sprintf("[Bridge %d]", b.externalID), "Updating", len(updates), "hue services...")
eg, ctx := errgroup.WithContext(ctx) eg, ctx := errgroup.WithContext(ctx)
for key := range updates { for key := range updates {
update := updates[key] update := updates[key]
@ -315,23 +327,10 @@ func (b *Bridge) MakeCongruent(ctx context.Context) (int, error) {
err := eg.Wait() err := eg.Wait()
if err != nil { if err != nil {
return len(updates), err
}
// Try to restore light states.
_ = b.Refresh(ctx, "light")
if len(updates) > 0 {
// Optimistically apply the updates to the states, so that the driver assumes they are set until
// proven otherwise by the SSE client.
b.mu.Lock()
newResources := make(map[string]*ResourceData, len(b.resources))
for key, value := range b.resources {
newResources[key] = value
}
for key, update := range updates {
id := strings.SplitN(key, "/", 2)[1]
newResources[id] = newResources[id].WithUpdate(update)
}
b.resources = newResources
b.mu.Unlock()
return len(updates), err
} }
return len(updates), nil return len(updates), nil

6
internal/drivers/hue2/data.go

@ -102,6 +102,12 @@ func (res *ResourceData) WithUpdate(update ResourceUpdate) *ResourceData {
cp := *resCopy.Color cp := *resCopy.Color
resCopy.Color = &cp resCopy.Color = &cp
resCopy.Color.XY = *update.ColorXY resCopy.Color.XY = *update.ColorXY
if resCopy.ColorTemperature != nil {
cp := *resCopy.ColorTemperature
resCopy.ColorTemperature = &cp
resCopy.ColorTemperature.Mirek = nil
}
} }
if update.Mirek != nil { if update.Mirek != nil {
cp := *resCopy.ColorTemperature cp := *resCopy.ColorTemperature

Loading…
Cancel
Save