diff --git a/app/services/publisher/publisher.go b/app/services/publisher/publisher.go index a0f315c..d5ac6d0 100644 --- a/app/services/publisher/publisher.go +++ b/app/services/publisher/publisher.go @@ -87,6 +87,8 @@ func (p *Publisher) Publish(devices ...models.Device) { return } + unwaitMap := make(map[int]bool) + p.mu.Lock() defer p.mu.Unlock() @@ -106,10 +108,14 @@ func (p *Publisher) Publish(devices ...models.Device) { } } else { p.pending[device.BridgeID] = append(p.pending[device.BridgeID], device) - if p.waiting[device.BridgeID] != nil { - close(p.waiting[device.BridgeID]) - p.waiting[device.BridgeID] = nil - } + unwaitMap[device.BridgeID] = true + } + } + + for bridgeID := range unwaitMap { + if p.waiting[bridgeID] != nil { + close(p.waiting[bridgeID]) + p.waiting[bridgeID] = nil } } } @@ -124,11 +130,16 @@ func (p *Publisher) Run() { ticker := time.NewTicker(time.Millisecond * 100) deleteList := make([]int, 0, 8) updatedList := make([]models.Device, 0, 16) + unwaitMap := make(map[int]bool) for range ticker.C { deleteList = deleteList[:0] updatedList = updatedList[:0] + for key := range unwaitMap { + delete(unwaitMap, key) + } + p.mu.Lock() for i, scene := range p.scenes { if (!scene.endTime.IsZero() && time.Now().After(scene.endTime)) || scene.Empty() { @@ -160,9 +171,13 @@ func (p *Publisher) Run() { } p.pending[device.BridgeID] = append(p.pending[device.BridgeID], device) - if p.waiting[device.BridgeID] != nil { - close(p.waiting[device.BridgeID]) - p.waiting[device.BridgeID] = nil + unwaitMap[device.BridgeID] = true + } + + for bridgeID := range unwaitMap { + if p.waiting[bridgeID] != nil { + close(p.waiting[bridgeID]) + p.waiting[bridgeID] = nil } } p.mu.Unlock() @@ -270,8 +285,8 @@ func (p *Publisher) runBridge(id int) { p.mu.Lock() } - updates := p.pending[id] - p.pending[id] = p.pending[id][:0:0] + updates := append(p.pending[id][:0:0], p.pending[id]...) + p.pending[id] = p.pending[id][:0] p.mu.Unlock() // Only allow the latest update per device (this avoids slow bridges causing a backlog of cations). diff --git a/internal/drivers/hue2/bridge.go b/internal/drivers/hue2/bridge.go index 767c2f5..aada532 100644 --- a/internal/drivers/hue2/bridge.go +++ b/internal/drivers/hue2/bridge.go @@ -45,7 +45,7 @@ func (b *Bridge) Run(ctx context.Context, eventCh chan<- models.Event) error { if err != nil { return err } - log.Println(fmt.Sprintf("[Bridge %d]", b.externalID), "Updated", updated, "hue services at startup.") + log.Println(fmt.Sprintf("[Bridge %d]", b.externalID), "Updated", updated, "hue services (startup)") lightRefreshTimer := time.NewTicker(time.Second * 5) defer lightRefreshTimer.Stop() @@ -222,7 +222,7 @@ func (b *Bridge) Update(devices ...models.Device) { } func (b *Bridge) MakeCongruent(ctx context.Context) (int, error) { - // Eat all event if there's a pending update. + // Exhaust the channel to avoid more updates. exhausted := false for !exhausted { select { @@ -294,14 +294,26 @@ func (b *Bridge) MakeCongruent(ctx context.Context) (int, error) { } } } + + if len(updates) > 0 { + // Optimistically apply the updates to the states, so that the driver assumes they are set until + // proven otherwise by the SSE client. + newResources := make(map[string]*ResourceData, len(b.resources)) + for key, value := range b.resources { + newResources[key] = value + } + for key, update := range updates { + id := strings.SplitN(key, "/", 2)[1] + newResources[id] = newResources[id].WithUpdate(update) + } + b.resources = newResources + } b.mu.Unlock() if len(updates) == 0 { return 0, nil } - log.Println(fmt.Sprintf("[Bridge %d]", b.externalID), "Updating", len(updates), "hue services...") - eg, ctx := errgroup.WithContext(ctx) for key := range updates { update := updates[key] @@ -315,23 +327,10 @@ func (b *Bridge) MakeCongruent(ctx context.Context) (int, error) { err := eg.Wait() if err != nil { - return len(updates), err - } + // Try to restore light states. + _ = b.Refresh(ctx, "light") - if len(updates) > 0 { - // Optimistically apply the updates to the states, so that the driver assumes they are set until - // proven otherwise by the SSE client. - b.mu.Lock() - newResources := make(map[string]*ResourceData, len(b.resources)) - for key, value := range b.resources { - newResources[key] = value - } - for key, update := range updates { - id := strings.SplitN(key, "/", 2)[1] - newResources[id] = newResources[id].WithUpdate(update) - } - b.resources = newResources - b.mu.Unlock() + return len(updates), err } return len(updates), nil diff --git a/internal/drivers/hue2/data.go b/internal/drivers/hue2/data.go index a48bcdd..7f7134b 100644 --- a/internal/drivers/hue2/data.go +++ b/internal/drivers/hue2/data.go @@ -102,6 +102,12 @@ func (res *ResourceData) WithUpdate(update ResourceUpdate) *ResourceData { cp := *resCopy.Color resCopy.Color = &cp resCopy.Color.XY = *update.ColorXY + + if resCopy.ColorTemperature != nil { + cp := *resCopy.ColorTemperature + resCopy.ColorTemperature = &cp + resCopy.ColorTemperature.Mirek = nil + } } if update.Mirek != nil { cp := *resCopy.ColorTemperature