|
|
package mysqldb
import ( "context" "database/sql" "encoding/json" "fmt" lucifer3 "git.aiterp.net/lucifer3/server" "git.aiterp.net/lucifer3/server/commands" "git.aiterp.net/lucifer3/server/effects" "git.aiterp.net/lucifer3/server/events" "git.aiterp.net/lucifer3/server/internal/gentools" "git.aiterp.net/lucifer3/server/services/mysqldb/mysqlgen" "time"
_ "github.com/go-sql-driver/mysql" )
type database struct { db *sql.DB }
func (d *database) Active() bool { return true }
func (d *database) HandleEvent(bus *lucifer3.EventBus, event lucifer3.Event) { q := mysqlgen.New(d.db) timeout, cancel := context.WithTimeout(context.Background(), time.Second/2) defer cancel()
switch event := event.(type) { case events.Started: timeout, cancel := context.WithTimeout(context.Background(), time.Second*10) defer cancel()
// Fetch all aliases first
aliases, err := q.ListDeviceAliases(timeout) if err != nil { bus.RunEvent(events.Log{ Level: "error", Code: "database_could_not_list_aliases", Message: "Database could not list aliases: " + err.Error(), }) }
auths, err := q.ListDeviceAuth(timeout) if err != nil { bus.RunEvent(events.Log{ Level: "error", Code: "database_could_not_list_device_infos", Message: "Hardware State serialization failed: " + err.Error(), }) } for _, auth := range auths { bus.RunCommand(commands.ConnectDevice{ ID: auth.ID, APIKey: auth.ApiKey, }) }
infos, err := q.ListDeviceInfos(timeout) if err != nil { bus.RunEvent(events.Log{ Level: "error", Code: "database_could_not_list_device_infos", Message: "Hardware State serialization failed: " + err.Error(), }) } for _, info := range infos { switch info.Kind { case "hardware_state": staleEvent := events.HardwareState{} err := json.Unmarshal(info.Data, &staleEvent) if err == nil { bus.RunEvent(staleEvent) } case "hardware_metadata": staleEvent := events.HardwareMetadata{} err := json.Unmarshal(info.Data, &staleEvent) if err == nil { bus.RunEvent(staleEvent) } } }
// Run the aliases now
for _, alias := range aliases { bus.RunCommand(commands.AddAlias{ Match: alias.ID, Alias: alias.Alias, }) }
assignments, err := q.ListDeviceAssignments(timeout) if err != nil { bus.RunEvent(events.Log{ Level: "error", Code: "database_could_not_list_assignments", Message: "Database could not list assignments: " + err.Error(), }) } for _, ass := range assignments { effect := effects.Serializable{} err := json.Unmarshal(ass.Effect, &effect) if err != nil { bus.RunEvent(events.Log{ Level: "error", Code: "database_could_not_deserialize_effect", Message: err.Error(), }) continue }
bus.RunCommand(commands.Assign{ ID: gentools.ShallowCopy(&ass.ID), Match: ass.Match, Effect: effect.Effect, }) }
case events.DeviceAccepted: if event.Extras == nil { event.Extras = map[string]string{} }
extras, err := json.Marshal(event.Extras) if err != nil { extras = []byte("{}") }
err = q.ReplaceDeviceAuth(timeout, mysqlgen.ReplaceDeviceAuthParams{ ID: event.ID, ApiKey: event.APIKey, Extras: extras, }) if err != nil { bus.RunEvent(events.Log{ Level: "error", Code: "database_could_not_save_device_auth", Message: "Device auth save failed: " + err.Error(), }) }
case events.HardwareState: if event.Stale { return }
event.Stale = true serialized, err := json.Marshal(event) if err != nil { bus.RunEvent(events.Log{ Level: "error", Code: "database_could_not_serialize_hardware_state", Message: "Hardware State serialization failed: " + err.Error(), }) return }
err = q.ReplaceDeviceInfo(timeout, mysqlgen.ReplaceDeviceInfoParams{ ID: event.ID, Kind: "hardware_state", Data: serialized, }) if err != nil { bus.RunEvent(events.Log{ Level: "error", Code: "database_could_not_save_hardware_state", Message: err.Error(), })
return }
case events.HardwareMetadata: if event.Stale { return }
event.Stale = true serialized, err := json.Marshal(event) if err != nil { bus.RunEvent(events.Log{ Level: "error", Code: "database_could_not_serialize_hardware_state", Message: "Hardware State serialization failed: " + err.Error(), }) return }
err = q.ReplaceDeviceInfo(timeout, mysqlgen.ReplaceDeviceInfoParams{ ID: event.ID, Kind: "hardware_metadata", Data: serialized, }) if err != nil { bus.RunEvent(events.Log{ Level: "error", Code: "database_could_not_save_hardware_state", Message: err.Error(), })
return }
case events.AssignmentCreated: serialized, err := json.Marshal(&effects.Serializable{Effect: event.Effect}) if err != nil { bus.RunEvent(events.Log{ Level: "error", Code: "database_could_not_serialize_effect", Message: err.Error(), }) return }
err = q.ReplaceDeviceAssignment(timeout, mysqlgen.ReplaceDeviceAssignmentParams{ ID: event.ID, CreatedDate: time.Now().UTC(), Match: event.Match, Effect: serialized, }) if err != nil { bus.RunEvent(events.Log{ Level: "error", Code: "database_could_not_save_assignment", Message: err.Error(), }) return }
case events.AssignmentRemoved: err := q.DeleteDeviceAssignment(timeout, event.ID) if err != nil { bus.RunEvent(events.Log{ Level: "error", Code: "database_could_not_remove_assignment", Message: err.Error(), }) return }
case events.AliasAdded: err := q.InsertDeviceAlias(timeout, mysqlgen.InsertDeviceAliasParams{ ID: event.ID, Alias: event.Alias, }) if err != nil { bus.RunEvent(events.Log{ Level: "error", Code: "database_could_not_remove_assignment", Message: err.Error(), }) return }
case events.AliasRemoved: err := q.DeleteDeviceAlias(timeout, mysqlgen.DeleteDeviceAliasParams{ ID: event.ID, Alias: event.Alias, }) if err != nil { bus.RunEvent(events.Log{ Level: "error", Code: "database_could_not_remove_assignment", Message: err.Error(), }) return }
case events.DeviceForgotten: err := q.DeleteDeviceInfoByID(timeout, event.ID) if err != nil { bus.RunEvent(events.Log{ Level: "error", Code: "database_could_not_remove_info", Message: "Failed to remove device info: " + err.Error(), }) } _ = q.DeleteDeviceAuth(timeout, event.ID) err = q.DeleteDeviceAliasByID(timeout, event.ID) if err != nil { bus.RunEvent(events.Log{ Level: "error", Code: "database_could_not_remove_aliases", Message: "Failed to remove device aliases: " + err.Error(), }) } err = q.DeleteDeviceAliasByIDLike(timeout, event.ID+":%") if err != nil { bus.RunEvent(events.Log{ Level: "error", Code: "database_could_not_remove_sub_aliases", Message: "Failed to remove sub-device aliases: " + err.Error(), }) } } }
func (d *database) HandleCommand(*lucifer3.EventBus, lucifer3.Command) {}
func Connect(host string, port int, username, password, dbname string) (lucifer3.ActiveService, error) { db, err := sql.Open("mysql", fmt.Sprintf( "%s:%s@(%s:%d)/%s?parseTime=true", username, password, host, port, dbname, )) if err != nil { return nil, err }
db.SetMaxOpenConns(10) db.SetMaxIdleConns(10) db.SetConnMaxIdleTime(time.Minute)
err = db.Ping() if err != nil { return nil, err }
return &database{db: db}, nil }
|