You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

326 lines
7.7 KiB

package publisher
import (
"context"
"git.aiterp.net/lucifer/new-server/app/config"
"git.aiterp.net/lucifer/new-server/models"
"log"
"sync"
"time"
)
type Publisher struct {
mu sync.Mutex
sceneData map[int]*models.Scene
scenes []*Scene
sceneAssignment map[int]*Scene
started map[int]bool
pending map[int][]models.Device
waiting map[int]chan struct{}
}
func (p *Publisher) SceneState(deviceID int) *models.DeviceState {
p.mu.Lock()
defer p.mu.Unlock()
if s := p.sceneAssignment[deviceID]; s != nil {
return s.LastState(deviceID)
}
return nil
}
func (p *Publisher) UpdateScene(data models.Scene) {
p.mu.Lock()
p.sceneData[data.ID] = &data
for _, scene := range p.scenes {
if scene.data.ID == data.ID {
scene.UpdateScene(data)
}
}
p.mu.Unlock()
}
func (p *Publisher) ReloadScenes(ctx context.Context) error {
scenes, err := config.SceneRepository().FetchAll(ctx)
if err != nil {
return err
}
p.mu.Lock()
for i, scene := range scenes {
p.sceneData[scene.ID] = &scenes[i]
}
for _, scene := range p.scenes {
scene.UpdateScene(*p.sceneData[scene.data.ID])
}
p.mu.Unlock()
return nil
}
func (p *Publisher) ReloadDevices(ctx context.Context) error {
devices, err := config.DeviceRepository().FetchByReference(ctx, models.RKAll, "")
if err != nil {
return err
}
p.Publish(devices...)
return nil
}
func (p *Publisher) Publish(devices ...models.Device) {
if len(devices) == 0 {
return
}
p.mu.Lock()
defer p.mu.Unlock()
for _, device := range devices {
if !p.started[device.BridgeID] {
p.started[device.BridgeID] = true
go p.runBridge(device.BridgeID)
}
p.reassignDevice(device)
if p.sceneAssignment[device.ID] != nil {
p.sceneAssignment[device.ID].UpsertDevice(device)
} else {
p.pending[device.BridgeID] = append(p.pending[device.BridgeID], device)
if p.waiting[device.BridgeID] != nil {
close(p.waiting[device.BridgeID])
p.waiting[device.BridgeID] = nil
}
}
}
}
func (p *Publisher) PublishChannel(ch <-chan []models.Device) {
for list := range ch {
p.Publish(list...)
}
}
func (p *Publisher) Run() {
ticker := time.NewTicker(time.Millisecond * 100)
deleteList := make([]int, 0, 8)
updatedList := make([]models.Device, 0, 16)
reassignList := make([]models.Device, 0, 16)
for range ticker.C {
deleteList = deleteList[:0]
updatedList = updatedList[:0]
reassignList = reassignList[:0]
p.mu.Lock()
for i, scene := range p.scenes {
if (!scene.endTime.IsZero() && time.Now().After(scene.endTime)) || scene.Empty() {
deleteList = append(deleteList, i-len(deleteList))
updatedList = append(updatedList, scene.AllDevices()...)
reassignList = append(reassignList, scene.AllDevices()...)
log.Printf("Removing scene instance for %s (%d)", scene.data.Name, scene.data.ID)
for _, device := range scene.AllDevices() {
p.sceneAssignment[device.ID] = nil
}
continue
}
if scene.Due() {
updatedList = append(updatedList, scene.Run()...)
updatedList = append(updatedList, scene.UnaffectedDevices()...)
}
}
for _, i := range deleteList {
p.scenes = append(p.scenes[:i], p.scenes[i+1:]...)
}
for _, device := range reassignList {
p.reassignDevice(device)
if p.sceneAssignment[device.ID] != nil {
p.sceneAssignment[device.ID].UpsertDevice(device)
}
}
for _, device := range updatedList {
if !p.started[device.BridgeID] {
p.started[device.BridgeID] = true
go p.runBridge(device.BridgeID)
}
p.pending[device.BridgeID] = append(p.pending[device.BridgeID], device)
if p.waiting[device.BridgeID] != nil {
close(p.waiting[device.BridgeID])
p.waiting[device.BridgeID] = nil
}
}
p.mu.Unlock()
}
}
// reassignDevice re-evaluates the device's scene assignment config. It will return whether the scene changed, which
// should trigger an update.
func (p *Publisher) reassignDevice(device models.Device) bool {
var selectedAssignment *models.DeviceSceneAssignment
for i, assignment := range device.SceneAssignments {
duration := time.Duration(assignment.DurationMS) * time.Millisecond
if duration <= 0 || time.Now().Before(assignment.StartTime.Add(duration)) {
selectedAssignment = &device.SceneAssignments[i]
}
}
if selectedAssignment == nil {
if p.sceneAssignment[device.ID] != nil {
p.sceneAssignment[device.ID].RemoveDevice(device)
delete(p.sceneAssignment, device.ID)
// Scene changed
return true
}
// Stop here, no scene should be assigned.
return false
} else {
if p.sceneData[selectedAssignment.SceneID] == nil {
// Freeze until scene becomes available.
return false
}
}
if p.sceneAssignment[device.ID] != nil {
scene := p.sceneAssignment[device.ID]
if scene.data.ID == selectedAssignment.SceneID && scene.group == selectedAssignment.Group {
// Current assignment is good.
return false
}
p.sceneAssignment[device.ID].RemoveDevice(device)
delete(p.sceneAssignment, device.ID)
}
for _, scene := range p.scenes {
if scene.data.ID == selectedAssignment.SceneID && scene.group == selectedAssignment.Group {
p.sceneAssignment[device.ID] = scene
return true
}
}
newScene := &Scene{
data: p.sceneData[selectedAssignment.SceneID],
group: selectedAssignment.Group,
startTime: selectedAssignment.StartTime,
endTime: selectedAssignment.StartTime.Add(time.Duration(selectedAssignment.DurationMS) * time.Millisecond),
roleMap: make(map[int]int, 16),
roleList: make(map[int][]models.Device, 16),
lastStates: make(map[int]models.DeviceState, 16),
due: true,
}
p.sceneAssignment[device.ID] = newScene
p.scenes = append(p.scenes, newScene)
if selectedAssignment.DurationMS <= 0 {
newScene.endTime = time.Time{}
}
return true
}
func (p *Publisher) runBridge(id int) {
defer func() {
p.mu.Lock()
p.started[id] = false
p.mu.Unlock()
}()
bridge, err := config.BridgeRepository().Find(context.Background(), id)
if err != nil {
log.Println("Failed to get bridge data:", err)
return
}
driver, err := config.DriverProvider().Provide(bridge.Driver)
if err != nil {
log.Println("Failed to get bridge driver:", err)
log.Println("Maybe Lucifer needs to be updated.")
return
}
devices := make(map[int]models.Device)
for {
p.mu.Lock()
if len(p.pending[id]) == 0 {
if p.waiting[id] == nil {
p.waiting[id] = make(chan struct{})
}
waitCh := p.waiting[id]
p.mu.Unlock()
<-waitCh
p.mu.Lock()
}
updates := p.pending[id]
p.pending[id] = p.pending[id][:0:0]
p.mu.Unlock()
// Only allow the latest update per device (this avoids slow bridges causing a backlog of cations).
for key := range devices {
delete(devices, key)
}
for _, update := range updates {
devices[update.ID] = update
}
updates = updates[:0]
for _, value := range devices {
updates = append(updates, value)
}
err := driver.Publish(context.Background(), bridge, updates)
if err != nil {
log.Println("Failed to publish to driver:", err)
p.mu.Lock()
p.pending[id] = append(updates, p.pending[id]...)
p.mu.Unlock()
return
}
}
}
var publisher = Publisher{
sceneData: make(map[int]*models.Scene),
scenes: make([]*Scene, 0, 16),
sceneAssignment: make(map[int]*Scene, 16),
started: make(map[int]bool),
pending: make(map[int][]models.Device),
waiting: make(map[int]chan struct{}),
}
func Global() *Publisher {
return &publisher
}
func Initialize(ctx context.Context) error {
err := publisher.ReloadScenes(ctx)
if err != nil {
return err
}
err = publisher.ReloadDevices(ctx)
if err != nil {
return err
}
go publisher.Run()
time.Sleep(time.Millisecond * 50)
go publisher.PublishChannel(config.PublishChannel)
return nil
}