|
|
package mysqldb
import ( "context" "database/sql" "encoding/json" "fmt" lucifer3 "git.aiterp.net/lucifer3/server" "git.aiterp.net/lucifer3/server/commands" "git.aiterp.net/lucifer3/server/effects" "git.aiterp.net/lucifer3/server/events" "git.aiterp.net/lucifer3/server/internal/gentools" "git.aiterp.net/lucifer3/server/services/mysqldb/mysqlgen" "git.aiterp.net/lucifer3/server/services/script" "strings" "time"
_ "github.com/go-sql-driver/mysql" )
// database is the MySQL-backed service in this package. Connect returns it as
// a lucifer3.ActiveService.
type database struct {
	// db is the connection pool every query in this service runs against.
	db *sql.DB
	// scriptVariables caches the last known value of each script variable,
	// keyed by "<scope>::<name>". HandleCommand consults it to skip writes
	// when a variable is set to the value it already has.
	scriptVariables map[string]string
}
// Active reports whether the service is active. The database service has no
// inactive state, so the answer is unconditionally true.
func (d *database) Active() bool {
	return true
}
func (d *database) HandleEvent(bus *lucifer3.EventBus, event lucifer3.Event) { q := mysqlgen.New(d.db) timeout, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel()
switch event := event.(type) { case events.Started: timeout, cancel := context.WithTimeout(context.Background(), time.Second*90) defer cancel()
// Fetch all aliases first
aliases, err := q.ListDeviceAliases(timeout) if err != nil { bus.RunEvent(events.Log{ Level: "error", Code: "database_could_not_list_aliases", Message: "Database could not list aliases: " + err.Error(), }) }
auths, err := q.ListDeviceAuth(timeout) if err != nil { bus.RunEvent(events.Log{ Level: "error", Code: "database_could_not_list_device_infos", Message: "Hardware State serialization failed: " + err.Error(), }) } for _, auth := range auths { bus.RunCommand(commands.ConnectDevice{ ID: auth.ID, APIKey: auth.ApiKey, }) }
infos, err := q.ListDeviceInfos(timeout) if err != nil { bus.RunEvent(events.Log{ Level: "error", Code: "database_could_not_list_device_infos", Message: "Hardware State serialization failed: " + err.Error(), }) } for _, info := range infos { switch info.Kind { case "hardware_state": staleEvent := events.HardwareState{} err := json.Unmarshal(info.Data, &staleEvent) if err == nil { bus.RunEvent(staleEvent) } case "hardware_metadata": staleEvent := events.HardwareMetadata{} err := json.Unmarshal(info.Data, &staleEvent) if err == nil { bus.RunEvent(staleEvent) } } }
// Run the aliases now
for _, alias := range aliases { bus.RunCommand(commands.AddAlias{ Match: alias.ID, Alias: alias.Alias, }) }
assignments, err := q.ListDeviceAssignments(timeout) if err != nil { bus.RunEvent(events.Log{ Level: "error", Code: "database_could_not_list_assignments", Message: "Database could not list assignments: " + err.Error(), }) } for _, ass := range assignments { effect := effects.Serializable{} err := json.Unmarshal(ass.Effect, &effect) if err != nil { bus.RunEvent(events.Log{ Level: "error", Code: "database_could_not_deserialize_effect", Message: err.Error(), }) continue }
bus.RunCommand(commands.Assign{ ID: gentools.ShallowCopy(&ass.ID), Match: ass.Match, Effect: effect.Effect, }) }
scripts, err := q.ListScripts(timeout) if err != nil { bus.RunEvent(events.Log{ Level: "error", Code: "database_could_not_list_scripts", Message: "Database could not list scripts: " + err.Error(), }) } for _, scr := range scripts { var lines []script.Line err := json.Unmarshal(scr.Data, &lines) if err != nil { bus.RunEvent(events.Log{ Level: "error", Code: "database_could_not_read_script", Message: "Database could not read script: " + err.Error(), }) }
bus.RunCommand(script.Update{ Name: scr.Name, Lines: lines, }) }
scriptVariables, err := q.ListScriptVariables(timeout) if err != nil { bus.RunEvent(events.Log{ Level: "error", Code: "database_could_not_list_script_variables", Message: "Database could not list script variables: " + err.Error(), }) } for _, variable := range scriptVariables { d.scriptVariables[variable.Scope+"::"+variable.Name] = variable.Value
cmd := script.SetVariable{ Key: variable.Name, Value: variable.Value, }
scopeKind, scopeValue, _ := strings.Cut(variable.Scope, ":") switch scopeKind { case "devices": cmd.Devices = strings.Split(scopeValue, ";") case "match": cmd.Match = gentools.Ptr(scopeValue) }
bus.RunCommand(cmd) }
scriptTriggers, err := q.ListScriptTriggers(timeout) if err != nil { bus.RunEvent(events.Log{ Level: "error", Code: "database_could_not_list_script_triggers", Message: "Database could not list script triggers: " + err.Error(), }) } for _, trig := range scriptTriggers { bus.RunCommand(script.UpdateTrigger{ ID: trig.ID, Name: trig.Name, Event: script.TriggerEvent(trig.Event), DeviceMatch: trig.DeviceMatch, Parameter: trig.Parameter, ScriptTarget: trig.ScriptTarget, ScriptName: trig.ScriptName, ScriptPre: fromJSON[[]script.Line](trig.ScriptPre), ScriptPost: fromJSON[[]script.Line](trig.ScriptPost), }) } case events.DeviceAccepted: if event.Extras == nil { event.Extras = map[string]string{} }
extras, err := json.Marshal(event.Extras) if err != nil { extras = []byte("{}") }
err = q.ReplaceDeviceAuth(timeout, mysqlgen.ReplaceDeviceAuthParams{ ID: event.ID, ApiKey: event.APIKey, Extras: extras, }) if err != nil { bus.RunEvent(events.Log{ Level: "error", Code: "database_could_not_save_device_auth", Message: "Device auth save failed: " + err.Error(), }) }
case events.HardwareState: if event.Stale { return }
event.Stale = true serialized, err := json.Marshal(event) if err != nil { bus.RunEvent(events.Log{ Level: "error", Code: "database_could_not_serialize_hardware_state", Message: "Hardware State serialization failed: " + err.Error(), }) return }
err = q.ReplaceDeviceInfo(timeout, mysqlgen.ReplaceDeviceInfoParams{ ID: event.ID, Kind: "hardware_state", Data: serialized, }) if err != nil { bus.RunEvent(events.Log{ Level: "error", Code: "database_could_not_save_hardware_state", Message: err.Error(), })
return }
case events.HardwareMetadata: if event.Stale { return }
event.Stale = true serialized, err := json.Marshal(event) if err != nil { bus.RunEvent(events.Log{ Level: "error", Code: "database_could_not_serialize_hardware_state", Message: "Hardware State serialization failed: " + err.Error(), }) return }
err = q.ReplaceDeviceInfo(timeout, mysqlgen.ReplaceDeviceInfoParams{ ID: event.ID, Kind: "hardware_metadata", Data: serialized, }) if err != nil { bus.RunEvent(events.Log{ Level: "error", Code: "database_could_not_save_hardware_state", Message: err.Error(), })
return }
case events.AssignmentCreated: serialized, err := json.Marshal(&effects.Serializable{Effect: event.Effect}) if err != nil { bus.RunEvent(events.Log{ Level: "error", Code: "database_could_not_serialize_effect", Message: err.Error(), }) return }
err = q.ReplaceDeviceAssignment(timeout, mysqlgen.ReplaceDeviceAssignmentParams{ ID: event.ID, CreatedDate: time.Now().UTC(), Match: event.Match, Effect: serialized, }) if err != nil { bus.RunEvent(events.Log{ Level: "error", Code: "database_could_not_save_assignment", Message: err.Error(), }) return }
case events.AssignmentRemoved: err := q.DeleteDeviceAssignment(timeout, event.ID) if err != nil { bus.RunEvent(events.Log{ Level: "error", Code: "database_could_not_remove_assignment", Message: err.Error(), }) return }
case events.AliasAdded: err := q.InsertDeviceAlias(timeout, mysqlgen.InsertDeviceAliasParams{ ID: event.ID, Alias: event.Alias, }) if err != nil { bus.RunEvent(events.Log{ Level: "error", Code: "database_could_not_remove_assignment", Message: err.Error(), }) return }
case events.AliasRemoved: err := q.DeleteDeviceAlias(timeout, mysqlgen.DeleteDeviceAliasParams{ ID: event.ID, Alias: event.Alias, }) if err != nil { bus.RunEvent(events.Log{ Level: "error", Code: "database_could_not_remove_assignment", Message: err.Error(), }) return }
case events.DeviceForgotten: err := q.DeleteDeviceInfoByID(timeout, event.ID) if err != nil { bus.RunEvent(events.Log{ Level: "error", Code: "database_could_not_remove_info", Message: "Failed to remove device info: " + err.Error(), }) } _ = q.DeleteDeviceAuth(timeout, event.ID) err = q.DeleteDeviceAliasByID(timeout, event.ID) if err != nil { bus.RunEvent(events.Log{ Level: "error", Code: "database_could_not_remove_aliases", Message: "Failed to remove device aliases: " + err.Error(), }) } err = q.DeleteDeviceAliasByIDLike(timeout, event.ID+":%") if err != nil { bus.RunEvent(events.Log{ Level: "error", Code: "database_could_not_remove_sub_aliases", Message: "Failed to remove sub-device aliases: " + err.Error(), }) } } }
func (d *database) HandleCommand(bus *lucifer3.EventBus, command lucifer3.Command) { q := mysqlgen.New(d.db) timeout, cancel := context.WithTimeout(context.Background(), time.Second/2) defer cancel()
switch command := command.(type) { case script.UpdateTrigger: err := q.ReplaceScriptTrigger(timeout, mysqlgen.ReplaceScriptTriggerParams{ ID: command.ID, Event: string(command.Event), DeviceMatch: command.DeviceMatch, Parameter: command.Parameter, ScriptTarget: command.ScriptTarget, ScriptName: command.ScriptName, ScriptPre: toJSON(command.ScriptPre), ScriptPost: toJSON(command.ScriptPost), Name: command.Name, }) if err != nil { bus.RunEvent(events.Log{ Level: "error", Code: "database_could_not_save_trigger", Message: "Failed to save trigger: " + err.Error(), }) } case script.DeleteTrigger: err := q.DeleteScriptTrigger(timeout, command.ID) if err != nil { bus.RunEvent(events.Log{ Level: "error", Code: "database_could_not_delete_trigger", Message: "Failed to delete trigger: " + err.Error(), }) } case script.SetVariable: scopeName := "global" if command.Match != nil { scopeName = "match:" + *command.Match } else if command.Devices != nil { scopeName = "devices:" + strings.Join(command.Devices, ";") }
if d.scriptVariables[scopeName+"::"+command.Key] != command.Value { d.scriptVariables[scopeName+"::"+command.Key] = command.Value err := q.UpdateScriptVariables(timeout, mysqlgen.UpdateScriptVariablesParams{ Scope: scopeName, Name: command.Key, Value: command.Value, }) if err != nil { bus.RunEvent(events.Log{ Level: "error", Code: "database_could_not_save_variables", Message: "Failed to save variable: " + err.Error(), }) } } case script.Update: j, _ := json.Marshal(command.Lines)
if (len(command.Lines)) > 0 { err := q.SaveScript(timeout, mysqlgen.SaveScriptParams{ Name: command.Name, Data: j, }) if err != nil { bus.RunEvent(events.Log{ Level: "error", Code: "database_could_not_save_script", Message: "Failed to save script: " + err.Error(), }) } } else { err := q.DeleteScript(timeout, command.Name) if err != nil { bus.RunEvent(events.Log{ Level: "error", Code: "database_could_not_delete_script", Message: "Failed to delete script: " + err.Error(), }) } } } }
// fromJSON decodes j into a fresh value of type T on a best-effort basis.
// Decoding errors are deliberately dropped: the caller receives whatever
// json.Unmarshal managed to fill in, which is the zero value of T when
// nothing could be decoded.
func fromJSON[T any](j []byte) T {
	decoded := new(T)
	_ = json.Unmarshal(j, decoded)
	return *decoded
}
// toJSON encodes v as JSON on a best-effort basis. When v cannot be
// marshalled the error is dropped and a nil json.RawMessage is returned.
func toJSON[T any](v T) json.RawMessage {
	encoded, err := json.Marshal(v)
	if err != nil {
		return nil
	}
	return json.RawMessage(encoded)
}
func Connect(host string, port int, username, password, dbname string) (lucifer3.ActiveService, error) { db, err := sql.Open("mysql", fmt.Sprintf( "%s:%s@(%s:%d)/%s?parseTime=true", username, password, host, port, dbname, )) if err != nil { return nil, err }
db.SetMaxOpenConns(10) db.SetMaxIdleConns(10) db.SetConnMaxIdleTime(time.Minute)
err = db.Ping() if err != nil { return nil, err }
return &database{db: db, scriptVariables: make(map[string]string)}, nil }
|