You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

321 lines
7.5 KiB

package mysqldb
import (
"context"
"database/sql"
"encoding/json"
"fmt"
lucifer3 "git.aiterp.net/lucifer3/server"
"git.aiterp.net/lucifer3/server/commands"
"git.aiterp.net/lucifer3/server/effects"
"git.aiterp.net/lucifer3/server/events"
"git.aiterp.net/lucifer3/server/internal/gentools"
"git.aiterp.net/lucifer3/server/services/mysqldb/mysqlgen"
"time"
_ "github.com/go-sql-driver/mysql"
)
type database struct {
db *sql.DB
}
func (d *database) Active() bool {
return true
}
func (d *database) HandleEvent(bus *lucifer3.EventBus, event lucifer3.Event) {
q := mysqlgen.New(d.db)
timeout, cancel := context.WithTimeout(context.Background(), time.Second/2)
defer cancel()
switch event := event.(type) {
case events.Started:
timeout, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
// Fetch all aliases first
aliases, err := q.ListDeviceAliases(timeout)
if err != nil {
bus.RunEvent(events.Log{
Level: "error",
Code: "database_could_not_list_aliases",
Message: "Database could not list aliases: " + err.Error(),
})
}
auths, err := q.ListDeviceAuth(timeout)
if err != nil {
bus.RunEvent(events.Log{
Level: "error",
Code: "database_could_not_list_device_infos",
Message: "Hardware State serialization failed: " + err.Error(),
})
}
for _, auth := range auths {
bus.RunCommand(commands.ConnectDevice{
ID: auth.ID,
APIKey: auth.ApiKey,
})
}
infos, err := q.ListDeviceInfos(timeout)
if err != nil {
bus.RunEvent(events.Log{
Level: "error",
Code: "database_could_not_list_device_infos",
Message: "Hardware State serialization failed: " + err.Error(),
})
}
for _, info := range infos {
switch info.Kind {
case "hardware_state":
staleEvent := events.HardwareState{}
err := json.Unmarshal(info.Data, &staleEvent)
if err == nil {
bus.RunEvent(staleEvent)
}
case "hardware_metadata":
staleEvent := events.HardwareMetadata{}
err := json.Unmarshal(info.Data, &staleEvent)
if err == nil {
bus.RunEvent(staleEvent)
}
}
}
// Run the aliases now
for _, alias := range aliases {
bus.RunCommand(commands.AddAlias{
Match: alias.ID,
Alias: alias.Alias,
})
}
assignments, err := q.ListDeviceAssignments(timeout)
if err != nil {
bus.RunEvent(events.Log{
Level: "error",
Code: "database_could_not_list_assignments",
Message: "Database could not list assignments: " + err.Error(),
})
}
for _, ass := range assignments {
effect := effects.Serializable{}
err := json.Unmarshal(ass.Effect, &effect)
if err != nil {
bus.RunEvent(events.Log{
Level: "error",
Code: "database_could_not_deserialize_effect",
Message: err.Error(),
})
continue
}
bus.RunCommand(commands.Assign{
ID: gentools.ShallowCopy(&ass.ID),
Match: ass.Match,
Effect: effect.Effect,
})
}
case events.DeviceAccepted:
if event.Extras == nil {
event.Extras = map[string]string{}
}
extras, err := json.Marshal(event.Extras)
if err != nil {
extras = []byte("{}")
}
err = q.ReplaceDeviceAuth(timeout, mysqlgen.ReplaceDeviceAuthParams{
ID: event.ID,
ApiKey: event.APIKey,
Extras: extras,
})
if err != nil {
bus.RunEvent(events.Log{
Level: "error",
Code: "database_could_not_save_device_auth",
Message: "Device auth save failed: " + err.Error(),
})
}
case events.HardwareState:
if event.Stale {
return
}
event.Stale = true
serialized, err := json.Marshal(event)
if err != nil {
bus.RunEvent(events.Log{
Level: "error",
Code: "database_could_not_serialize_hardware_state",
Message: "Hardware State serialization failed: " + err.Error(),
})
return
}
err = q.ReplaceDeviceInfo(timeout, mysqlgen.ReplaceDeviceInfoParams{
ID: event.ID,
Kind: "hardware_state",
Data: serialized,
})
if err != nil {
bus.RunEvent(events.Log{
Level: "error",
Code: "database_could_not_save_hardware_state",
Message: err.Error(),
})
return
}
case events.HardwareMetadata:
if event.Stale {
return
}
event.Stale = true
serialized, err := json.Marshal(event)
if err != nil {
bus.RunEvent(events.Log{
Level: "error",
Code: "database_could_not_serialize_hardware_state",
Message: "Hardware State serialization failed: " + err.Error(),
})
return
}
err = q.ReplaceDeviceInfo(timeout, mysqlgen.ReplaceDeviceInfoParams{
ID: event.ID,
Kind: "hardware_metadata",
Data: serialized,
})
if err != nil {
bus.RunEvent(events.Log{
Level: "error",
Code: "database_could_not_save_hardware_state",
Message: err.Error(),
})
return
}
case events.AssignmentCreated:
serialized, err := json.Marshal(&effects.Serializable{Effect: event.Effect})
if err != nil {
bus.RunEvent(events.Log{
Level: "error",
Code: "database_could_not_serialize_effect",
Message: err.Error(),
})
return
}
err = q.ReplaceDeviceAssignment(timeout, mysqlgen.ReplaceDeviceAssignmentParams{
ID: event.ID,
CreatedDate: time.Now().UTC(),
Match: event.Match,
Effect: serialized,
})
if err != nil {
bus.RunEvent(events.Log{
Level: "error",
Code: "database_could_not_save_assignment",
Message: err.Error(),
})
return
}
case events.AssignmentRemoved:
err := q.DeleteDeviceAssignment(timeout, event.ID)
if err != nil {
bus.RunEvent(events.Log{
Level: "error",
Code: "database_could_not_remove_assignment",
Message: err.Error(),
})
return
}
case events.AliasAdded:
err := q.InsertDeviceAlias(timeout, mysqlgen.InsertDeviceAliasParams{
ID: event.ID,
Alias: event.Alias,
})
if err != nil {
bus.RunEvent(events.Log{
Level: "error",
Code: "database_could_not_remove_assignment",
Message: err.Error(),
})
return
}
case events.AliasRemoved:
err := q.DeleteDeviceAlias(timeout, mysqlgen.DeleteDeviceAliasParams{
ID: event.ID,
Alias: event.Alias,
})
if err != nil {
bus.RunEvent(events.Log{
Level: "error",
Code: "database_could_not_remove_assignment",
Message: err.Error(),
})
return
}
case events.DeviceForgotten:
err := q.DeleteDeviceInfoByID(timeout, event.ID)
if err != nil {
bus.RunEvent(events.Log{
Level: "error",
Code: "database_could_not_remove_info",
Message: "Failed to remove device info: " + err.Error(),
})
}
_ = q.DeleteDeviceAuth(timeout, event.ID)
err = q.DeleteDeviceAliasByID(timeout, event.ID)
if err != nil {
bus.RunEvent(events.Log{
Level: "error",
Code: "database_could_not_remove_aliases",
Message: "Failed to remove device aliases: " + err.Error(),
})
}
err = q.DeleteDeviceAliasByIDLike(timeout, event.ID+":%")
if err != nil {
bus.RunEvent(events.Log{
Level: "error",
Code: "database_could_not_remove_sub_aliases",
Message: "Failed to remove sub-device aliases: " + err.Error(),
})
}
}
}
func (d *database) HandleCommand(*lucifer3.EventBus, lucifer3.Command) {}
func Connect(host string, port int, username, password, dbname string) (lucifer3.ActiveService, error) {
db, err := sql.Open("mysql", fmt.Sprintf(
"%s:%s@(%s:%d)/%s?parseTime=true", username, password, host, port, dbname,
))
if err != nil {
return nil, err
}
db.SetMaxOpenConns(10)
db.SetMaxIdleConns(10)
db.SetConnMaxIdleTime(time.Minute)
err = db.Ping()
if err != nil {
return nil, err
}
return &database{db: db}, nil
}