You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

491 lines
12 KiB

package mysqldb
import (
"context"
"database/sql"
"encoding/json"
"fmt"
lucifer3 "git.aiterp.net/lucifer3/server"
"git.aiterp.net/lucifer3/server/commands"
"git.aiterp.net/lucifer3/server/effects"
"git.aiterp.net/lucifer3/server/events"
"git.aiterp.net/lucifer3/server/internal/gentools"
"git.aiterp.net/lucifer3/server/services/mysqldb/mysqlgen"
"git.aiterp.net/lucifer3/server/services/script"
"strings"
"time"
_ "github.com/go-sql-driver/mysql"
)
type database struct {
db *sql.DB
scriptVariables map[string]string
}
func (d *database) Active() bool {
return true
}
func (d *database) HandleEvent(bus *lucifer3.EventBus, event lucifer3.Event) {
q := mysqlgen.New(d.db)
timeout, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
switch event := event.(type) {
case events.Started:
timeout, cancel := context.WithTimeout(context.Background(), time.Second*90)
defer cancel()
// Fetch all aliases first
aliases, err := q.ListDeviceAliases(timeout)
if err != nil {
bus.RunEvent(events.Log{
Level: "error",
Code: "database_could_not_list_aliases",
Message: "Database could not list aliases: " + err.Error(),
})
}
auths, err := q.ListDeviceAuth(timeout)
if err != nil {
bus.RunEvent(events.Log{
Level: "error",
Code: "database_could_not_list_device_infos",
Message: "Hardware State serialization failed: " + err.Error(),
})
}
for _, auth := range auths {
bus.RunCommand(commands.ConnectDevice{
ID: auth.ID,
APIKey: auth.ApiKey,
})
}
infos, err := q.ListDeviceInfos(timeout)
if err != nil {
bus.RunEvent(events.Log{
Level: "error",
Code: "database_could_not_list_device_infos",
Message: "Hardware State serialization failed: " + err.Error(),
})
}
for _, info := range infos {
switch info.Kind {
case "hardware_state":
staleEvent := events.HardwareState{}
err := json.Unmarshal(info.Data, &staleEvent)
if err == nil {
bus.RunEvent(staleEvent)
}
case "hardware_metadata":
staleEvent := events.HardwareMetadata{}
err := json.Unmarshal(info.Data, &staleEvent)
if err == nil {
bus.RunEvent(staleEvent)
}
}
}
// Run the aliases now
for _, alias := range aliases {
bus.RunCommand(commands.AddAlias{
Match: alias.ID,
Alias: alias.Alias,
})
}
assignments, err := q.ListDeviceAssignments(timeout)
if err != nil {
bus.RunEvent(events.Log{
Level: "error",
Code: "database_could_not_list_assignments",
Message: "Database could not list assignments: " + err.Error(),
})
}
for _, ass := range assignments {
effect := effects.Serializable{}
err := json.Unmarshal(ass.Effect, &effect)
if err != nil {
bus.RunEvent(events.Log{
Level: "error",
Code: "database_could_not_deserialize_effect",
Message: err.Error(),
})
continue
}
bus.RunCommand(commands.Assign{
ID: gentools.ShallowCopy(&ass.ID),
Match: ass.Match,
Effect: effect.Effect,
})
}
scripts, err := q.ListScripts(timeout)
if err != nil {
bus.RunEvent(events.Log{
Level: "error",
Code: "database_could_not_list_scripts",
Message: "Database could not list scripts: " + err.Error(),
})
}
for _, scr := range scripts {
var lines []script.Line
err := json.Unmarshal(scr.Data, &lines)
if err != nil {
bus.RunEvent(events.Log{
Level: "error",
Code: "database_could_not_read_script",
Message: "Database could not read script: " + err.Error(),
})
}
bus.RunCommand(script.Update{
Name: scr.Name,
Lines: lines,
})
}
scriptVariables, err := q.ListScriptVariables(timeout)
if err != nil {
bus.RunEvent(events.Log{
Level: "error",
Code: "database_could_not_list_script_variables",
Message: "Database could not list script variables: " + err.Error(),
})
}
for _, variable := range scriptVariables {
d.scriptVariables[variable.Scope+"::"+variable.Name] = variable.Value
cmd := script.SetVariable{
Key: variable.Name,
Value: variable.Value,
}
scopeKind, scopeValue, _ := strings.Cut(variable.Scope, ":")
switch scopeKind {
case "devices":
cmd.Devices = strings.Split(scopeValue, ";")
case "match":
cmd.Match = gentools.Ptr(scopeValue)
}
bus.RunCommand(cmd)
}
scriptTriggers, err := q.ListScriptTriggers(timeout)
if err != nil {
bus.RunEvent(events.Log{
Level: "error",
Code: "database_could_not_list_script_triggers",
Message: "Database could not list script triggers: " + err.Error(),
})
}
for _, trig := range scriptTriggers {
bus.RunCommand(script.UpdateTrigger{
ID: trig.ID,
Name: trig.Name,
Event: script.TriggerEvent(trig.Event),
DeviceMatch: trig.DeviceMatch,
Parameter: trig.Parameter,
ScriptTarget: trig.ScriptTarget,
ScriptName: trig.ScriptName,
ScriptPre: fromJSON[[]script.Line](trig.ScriptPre),
ScriptPost: fromJSON[[]script.Line](trig.ScriptPost),
})
}
case events.DeviceAccepted:
if event.Extras == nil {
event.Extras = map[string]string{}
}
extras, err := json.Marshal(event.Extras)
if err != nil {
extras = []byte("{}")
}
err = q.ReplaceDeviceAuth(timeout, mysqlgen.ReplaceDeviceAuthParams{
ID: event.ID,
ApiKey: event.APIKey,
Extras: extras,
})
if err != nil {
bus.RunEvent(events.Log{
Level: "error",
Code: "database_could_not_save_device_auth",
Message: "Device auth save failed: " + err.Error(),
})
}
case events.HardwareState:
if event.Stale {
return
}
event.Stale = true
serialized, err := json.Marshal(event)
if err != nil {
bus.RunEvent(events.Log{
Level: "error",
Code: "database_could_not_serialize_hardware_state",
Message: "Hardware State serialization failed: " + err.Error(),
})
return
}
err = q.ReplaceDeviceInfo(timeout, mysqlgen.ReplaceDeviceInfoParams{
ID: event.ID,
Kind: "hardware_state",
Data: serialized,
})
if err != nil {
bus.RunEvent(events.Log{
Level: "error",
Code: "database_could_not_save_hardware_state",
Message: err.Error(),
})
return
}
case events.HardwareMetadata:
if event.Stale {
return
}
event.Stale = true
serialized, err := json.Marshal(event)
if err != nil {
bus.RunEvent(events.Log{
Level: "error",
Code: "database_could_not_serialize_hardware_state",
Message: "Hardware State serialization failed: " + err.Error(),
})
return
}
err = q.ReplaceDeviceInfo(timeout, mysqlgen.ReplaceDeviceInfoParams{
ID: event.ID,
Kind: "hardware_metadata",
Data: serialized,
})
if err != nil {
bus.RunEvent(events.Log{
Level: "error",
Code: "database_could_not_save_hardware_state",
Message: err.Error(),
})
return
}
case events.AssignmentCreated:
serialized, err := json.Marshal(&effects.Serializable{Effect: event.Effect})
if err != nil {
bus.RunEvent(events.Log{
Level: "error",
Code: "database_could_not_serialize_effect",
Message: err.Error(),
})
return
}
err = q.ReplaceDeviceAssignment(timeout, mysqlgen.ReplaceDeviceAssignmentParams{
ID: event.ID,
CreatedDate: time.Now().UTC(),
Match: event.Match,
Effect: serialized,
})
if err != nil {
bus.RunEvent(events.Log{
Level: "error",
Code: "database_could_not_save_assignment",
Message: err.Error(),
})
return
}
case events.AssignmentRemoved:
err := q.DeleteDeviceAssignment(timeout, event.ID)
if err != nil {
bus.RunEvent(events.Log{
Level: "error",
Code: "database_could_not_remove_assignment",
Message: err.Error(),
})
return
}
case events.AliasAdded:
err := q.InsertDeviceAlias(timeout, mysqlgen.InsertDeviceAliasParams{
ID: event.ID,
Alias: event.Alias,
})
if err != nil {
bus.RunEvent(events.Log{
Level: "error",
Code: "database_could_not_remove_assignment",
Message: err.Error(),
})
return
}
case events.AliasRemoved:
err := q.DeleteDeviceAlias(timeout, mysqlgen.DeleteDeviceAliasParams{
ID: event.ID,
Alias: event.Alias,
})
if err != nil {
bus.RunEvent(events.Log{
Level: "error",
Code: "database_could_not_remove_assignment",
Message: err.Error(),
})
return
}
case events.DeviceForgotten:
err := q.DeleteDeviceInfoByID(timeout, event.ID)
if err != nil {
bus.RunEvent(events.Log{
Level: "error",
Code: "database_could_not_remove_info",
Message: "Failed to remove device info: " + err.Error(),
})
}
_ = q.DeleteDeviceAuth(timeout, event.ID)
err = q.DeleteDeviceAliasByID(timeout, event.ID)
if err != nil {
bus.RunEvent(events.Log{
Level: "error",
Code: "database_could_not_remove_aliases",
Message: "Failed to remove device aliases: " + err.Error(),
})
}
err = q.DeleteDeviceAliasByIDLike(timeout, event.ID+":%")
if err != nil {
bus.RunEvent(events.Log{
Level: "error",
Code: "database_could_not_remove_sub_aliases",
Message: "Failed to remove sub-device aliases: " + err.Error(),
})
}
}
}
func (d *database) HandleCommand(bus *lucifer3.EventBus, command lucifer3.Command) {
q := mysqlgen.New(d.db)
timeout, cancel := context.WithTimeout(context.Background(), time.Second/2)
defer cancel()
switch command := command.(type) {
case script.UpdateTrigger:
err := q.ReplaceScriptTrigger(timeout, mysqlgen.ReplaceScriptTriggerParams{
ID: command.ID,
Event: string(command.Event),
DeviceMatch: command.DeviceMatch,
Parameter: command.Parameter,
ScriptTarget: command.ScriptTarget,
ScriptName: command.ScriptName,
ScriptPre: toJSON(command.ScriptPre),
ScriptPost: toJSON(command.ScriptPost),
Name: command.Name,
})
if err != nil {
bus.RunEvent(events.Log{
Level: "error",
Code: "database_could_not_save_trigger",
Message: "Failed to save trigger: " + err.Error(),
})
}
case script.DeleteTrigger:
err := q.DeleteScriptTrigger(timeout, command.ID)
if err != nil {
bus.RunEvent(events.Log{
Level: "error",
Code: "database_could_not_delete_trigger",
Message: "Failed to delete trigger: " + err.Error(),
})
}
case script.SetVariable:
scopeName := "global"
if command.Match != nil {
scopeName = "match:" + *command.Match
} else if command.Devices != nil {
scopeName = "devices:" + strings.Join(command.Devices, ";")
}
if d.scriptVariables[scopeName+"::"+command.Key] != command.Value {
d.scriptVariables[scopeName+"::"+command.Key] = command.Value
err := q.UpdateScriptVariables(timeout, mysqlgen.UpdateScriptVariablesParams{
Scope: scopeName,
Name: command.Key,
Value: command.Value,
})
if err != nil {
bus.RunEvent(events.Log{
Level: "error",
Code: "database_could_not_save_variables",
Message: "Failed to save variable: " + err.Error(),
})
}
}
case script.Update:
j, _ := json.Marshal(command.Lines)
if (len(command.Lines)) > 0 {
err := q.SaveScript(timeout, mysqlgen.SaveScriptParams{
Name: command.Name,
Data: j,
})
if err != nil {
bus.RunEvent(events.Log{
Level: "error",
Code: "database_could_not_save_script",
Message: "Failed to save script: " + err.Error(),
})
}
} else {
err := q.DeleteScript(timeout, command.Name)
if err != nil {
bus.RunEvent(events.Log{
Level: "error",
Code: "database_could_not_delete_script",
Message: "Failed to delete script: " + err.Error(),
})
}
}
}
}
func fromJSON[T any](j []byte) T {
var t T
_ = json.Unmarshal(j, &t)
return t
}
func toJSON[T any](v T) json.RawMessage {
j, _ := json.Marshal(v)
return j
}
func Connect(host string, port int, username, password, dbname string) (lucifer3.ActiveService, error) {
db, err := sql.Open("mysql", fmt.Sprintf(
"%s:%s@(%s:%d)/%s?parseTime=true", username, password, host, port, dbname,
))
if err != nil {
return nil, err
}
db.SetMaxOpenConns(10)
db.SetMaxIdleConns(10)
db.SetConnMaxIdleTime(time.Minute)
err = db.Ping()
if err != nil {
return nil, err
}
return &database{db: db, scriptVariables: make(map[string]string)}, nil
}