You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

478 lines
12 KiB

2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
  1. package mysqldb
  2. import (
  3. "context"
  4. "database/sql"
  5. "encoding/json"
  6. "fmt"
  7. lucifer3 "git.aiterp.net/lucifer3/server"
  8. "git.aiterp.net/lucifer3/server/commands"
  9. "git.aiterp.net/lucifer3/server/effects"
  10. "git.aiterp.net/lucifer3/server/events"
  11. "git.aiterp.net/lucifer3/server/internal/gentools"
  12. "git.aiterp.net/lucifer3/server/services/mysqldb/mysqlgen"
  13. "git.aiterp.net/lucifer3/server/services/script"
  14. "strings"
  15. "time"
  16. _ "github.com/go-sql-driver/mysql"
  17. )
  // database is the MySQL-backed service in this package. It reacts to bus
  // events and commands by reading from and writing to the wrapped connection
  // pool.
  type database struct {
  	// db is the connection pool handed to mysqlgen.New for every query.
  	db *sql.DB

  	// scriptVariables remembers the last value seen for each script variable,
  	// keyed as "<scope>::<name>", so unchanged values are not written again.
  	scriptVariables map[string]string
  }
  22. func (d *database) Active() bool {
  23. return true
  24. }
  25. func (d *database) HandleEvent(bus *lucifer3.EventBus, event lucifer3.Event) {
  26. q := mysqlgen.New(d.db)
  27. timeout, cancel := context.WithTimeout(context.Background(), time.Second*5)
  28. defer cancel()
  29. switch event := event.(type) {
  30. case events.Started:
  31. timeout, cancel := context.WithTimeout(context.Background(), time.Second*10)
  32. defer cancel()
  33. // Fetch all aliases first
  34. aliases, err := q.ListDeviceAliases(timeout)
  35. if err != nil {
  36. bus.RunEvent(events.Log{
  37. Level: "error",
  38. Code: "database_could_not_list_aliases",
  39. Message: "Database could not list aliases: " + err.Error(),
  40. })
  41. }
  42. auths, err := q.ListDeviceAuth(timeout)
  43. if err != nil {
  44. bus.RunEvent(events.Log{
  45. Level: "error",
  46. Code: "database_could_not_list_device_infos",
  47. Message: "Hardware State serialization failed: " + err.Error(),
  48. })
  49. }
  50. for _, auth := range auths {
  51. bus.RunCommand(commands.ConnectDevice{
  52. ID: auth.ID,
  53. APIKey: auth.ApiKey,
  54. })
  55. }
  56. infos, err := q.ListDeviceInfos(timeout)
  57. if err != nil {
  58. bus.RunEvent(events.Log{
  59. Level: "error",
  60. Code: "database_could_not_list_device_infos",
  61. Message: "Hardware State serialization failed: " + err.Error(),
  62. })
  63. }
  64. for _, info := range infos {
  65. switch info.Kind {
  66. case "hardware_state":
  67. staleEvent := events.HardwareState{}
  68. err := json.Unmarshal(info.Data, &staleEvent)
  69. if err == nil {
  70. bus.RunEvent(staleEvent)
  71. }
  72. case "hardware_metadata":
  73. staleEvent := events.HardwareMetadata{}
  74. err := json.Unmarshal(info.Data, &staleEvent)
  75. if err == nil {
  76. bus.RunEvent(staleEvent)
  77. }
  78. }
  79. }
  80. // Run the aliases now
  81. for _, alias := range aliases {
  82. bus.RunCommand(commands.AddAlias{
  83. Match: alias.ID,
  84. Alias: alias.Alias,
  85. })
  86. }
  87. assignments, err := q.ListDeviceAssignments(timeout)
  88. if err != nil {
  89. bus.RunEvent(events.Log{
  90. Level: "error",
  91. Code: "database_could_not_list_assignments",
  92. Message: "Database could not list assignments: " + err.Error(),
  93. })
  94. }
  95. for _, ass := range assignments {
  96. effect := effects.Serializable{}
  97. err := json.Unmarshal(ass.Effect, &effect)
  98. if err != nil {
  99. bus.RunEvent(events.Log{
  100. Level: "error",
  101. Code: "database_could_not_deserialize_effect",
  102. Message: err.Error(),
  103. })
  104. continue
  105. }
  106. bus.RunCommand(commands.Assign{
  107. ID: gentools.ShallowCopy(&ass.ID),
  108. Match: ass.Match,
  109. Effect: effect.Effect,
  110. })
  111. }
  112. scripts, err := q.ListScripts(timeout)
  113. if err != nil {
  114. bus.RunEvent(events.Log{
  115. Level: "error",
  116. Code: "database_could_not_list_scripts",
  117. Message: "Database could not list scripts: " + err.Error(),
  118. })
  119. }
  120. for _, scr := range scripts {
  121. var lines []script.Line
  122. err := json.Unmarshal(scr.Data, &lines)
  123. if err != nil {
  124. bus.RunEvent(events.Log{
  125. Level: "error",
  126. Code: "database_could_not_read_script",
  127. Message: "Database could not read script: " + err.Error(),
  128. })
  129. }
  130. bus.RunCommand(script.Update{
  131. Name: scr.Name,
  132. Lines: lines,
  133. })
  134. }
  135. scriptVariables, err := q.ListScriptVariables(timeout)
  136. if err != nil {
  137. bus.RunEvent(events.Log{
  138. Level: "error",
  139. Code: "database_could_not_list_script_variables",
  140. Message: "Database could not list script variables: " + err.Error(),
  141. })
  142. }
  143. for _, variable := range scriptVariables {
  144. d.scriptVariables[variable.Scope+"::"+variable.Name] = variable.Value
  145. cmd := script.SetVariable{
  146. Key: variable.Name,
  147. Value: variable.Value,
  148. }
  149. scopeKind, scopeValue, _ := strings.Cut(variable.Scope, ":")
  150. switch scopeKind {
  151. case "devices":
  152. cmd.Devices = strings.Split(scopeValue, ";")
  153. case "match":
  154. cmd.Match = gentools.Ptr(scopeValue)
  155. }
  156. bus.RunCommand(cmd)
  157. }
  158. scriptTriggers, err := q.ListScriptTriggers(timeout)
  159. if err != nil {
  160. bus.RunEvent(events.Log{
  161. Level: "error",
  162. Code: "database_could_not_list_script_triggers",
  163. Message: "Database could not list script triggers: " + err.Error(),
  164. })
  165. }
  166. for _, trig := range scriptTriggers {
  167. bus.RunCommand(script.UpdateTrigger{
  168. ID: trig.ID,
  169. Event: script.TriggerEvent(trig.Event),
  170. DeviceMatch: trig.DeviceMatch,
  171. Parameter: trig.Parameter,
  172. ScriptTarget: trig.ScriptTarget,
  173. ScriptName: trig.ScriptName,
  174. ScriptPre: fromJSON[[]script.Line](trig.ScriptPre),
  175. ScriptPost: fromJSON[[]script.Line](trig.ScriptPost),
  176. })
  177. }
  178. case events.DeviceAccepted:
  179. if event.Extras == nil {
  180. event.Extras = map[string]string{}
  181. }
  182. extras, err := json.Marshal(event.Extras)
  183. if err != nil {
  184. extras = []byte("{}")
  185. }
  186. err = q.ReplaceDeviceAuth(timeout, mysqlgen.ReplaceDeviceAuthParams{
  187. ID: event.ID,
  188. ApiKey: event.APIKey,
  189. Extras: extras,
  190. })
  191. if err != nil {
  192. bus.RunEvent(events.Log{
  193. Level: "error",
  194. Code: "database_could_not_save_device_auth",
  195. Message: "Device auth save failed: " + err.Error(),
  196. })
  197. }
  198. case events.HardwareState:
  199. if event.Stale {
  200. return
  201. }
  202. event.Stale = true
  203. serialized, err := json.Marshal(event)
  204. if err != nil {
  205. bus.RunEvent(events.Log{
  206. Level: "error",
  207. Code: "database_could_not_serialize_hardware_state",
  208. Message: "Hardware State serialization failed: " + err.Error(),
  209. })
  210. return
  211. }
  212. err = q.ReplaceDeviceInfo(timeout, mysqlgen.ReplaceDeviceInfoParams{
  213. ID: event.ID,
  214. Kind: "hardware_state",
  215. Data: serialized,
  216. })
  217. if err != nil {
  218. bus.RunEvent(events.Log{
  219. Level: "error",
  220. Code: "database_could_not_save_hardware_state",
  221. Message: err.Error(),
  222. })
  223. return
  224. }
  225. case events.HardwareMetadata:
  226. if event.Stale {
  227. return
  228. }
  229. event.Stale = true
  230. serialized, err := json.Marshal(event)
  231. if err != nil {
  232. bus.RunEvent(events.Log{
  233. Level: "error",
  234. Code: "database_could_not_serialize_hardware_state",
  235. Message: "Hardware State serialization failed: " + err.Error(),
  236. })
  237. return
  238. }
  239. err = q.ReplaceDeviceInfo(timeout, mysqlgen.ReplaceDeviceInfoParams{
  240. ID: event.ID,
  241. Kind: "hardware_metadata",
  242. Data: serialized,
  243. })
  244. if err != nil {
  245. bus.RunEvent(events.Log{
  246. Level: "error",
  247. Code: "database_could_not_save_hardware_state",
  248. Message: err.Error(),
  249. })
  250. return
  251. }
  252. case events.AssignmentCreated:
  253. serialized, err := json.Marshal(&effects.Serializable{Effect: event.Effect})
  254. if err != nil {
  255. bus.RunEvent(events.Log{
  256. Level: "error",
  257. Code: "database_could_not_serialize_effect",
  258. Message: err.Error(),
  259. })
  260. return
  261. }
  262. err = q.ReplaceDeviceAssignment(timeout, mysqlgen.ReplaceDeviceAssignmentParams{
  263. ID: event.ID,
  264. CreatedDate: time.Now().UTC(),
  265. Match: event.Match,
  266. Effect: serialized,
  267. })
  268. if err != nil {
  269. bus.RunEvent(events.Log{
  270. Level: "error",
  271. Code: "database_could_not_save_assignment",
  272. Message: err.Error(),
  273. })
  274. return
  275. }
  276. case events.AssignmentRemoved:
  277. err := q.DeleteDeviceAssignment(timeout, event.ID)
  278. if err != nil {
  279. bus.RunEvent(events.Log{
  280. Level: "error",
  281. Code: "database_could_not_remove_assignment",
  282. Message: err.Error(),
  283. })
  284. return
  285. }
  286. case events.AliasAdded:
  287. err := q.InsertDeviceAlias(timeout, mysqlgen.InsertDeviceAliasParams{
  288. ID: event.ID,
  289. Alias: event.Alias,
  290. })
  291. if err != nil {
  292. bus.RunEvent(events.Log{
  293. Level: "error",
  294. Code: "database_could_not_remove_assignment",
  295. Message: err.Error(),
  296. })
  297. return
  298. }
  299. case events.AliasRemoved:
  300. err := q.DeleteDeviceAlias(timeout, mysqlgen.DeleteDeviceAliasParams{
  301. ID: event.ID,
  302. Alias: event.Alias,
  303. })
  304. if err != nil {
  305. bus.RunEvent(events.Log{
  306. Level: "error",
  307. Code: "database_could_not_remove_assignment",
  308. Message: err.Error(),
  309. })
  310. return
  311. }
  312. case events.DeviceForgotten:
  313. err := q.DeleteDeviceInfoByID(timeout, event.ID)
  314. if err != nil {
  315. bus.RunEvent(events.Log{
  316. Level: "error",
  317. Code: "database_could_not_remove_info",
  318. Message: "Failed to remove device info: " + err.Error(),
  319. })
  320. }
  321. _ = q.DeleteDeviceAuth(timeout, event.ID)
  322. err = q.DeleteDeviceAliasByID(timeout, event.ID)
  323. if err != nil {
  324. bus.RunEvent(events.Log{
  325. Level: "error",
  326. Code: "database_could_not_remove_aliases",
  327. Message: "Failed to remove device aliases: " + err.Error(),
  328. })
  329. }
  330. err = q.DeleteDeviceAliasByIDLike(timeout, event.ID+":%")
  331. if err != nil {
  332. bus.RunEvent(events.Log{
  333. Level: "error",
  334. Code: "database_could_not_remove_sub_aliases",
  335. Message: "Failed to remove sub-device aliases: " + err.Error(),
  336. })
  337. }
  338. }
  339. }
  340. func (d *database) HandleCommand(bus *lucifer3.EventBus, command lucifer3.Command) {
  341. q := mysqlgen.New(d.db)
  342. timeout, cancel := context.WithTimeout(context.Background(), time.Second/2)
  343. defer cancel()
  344. switch command := command.(type) {
  345. case script.UpdateTrigger:
  346. err := q.ReplaceScriptTrigger(timeout, mysqlgen.ReplaceScriptTriggerParams{
  347. ID: command.ID,
  348. Event: string(command.Event),
  349. DeviceMatch: command.DeviceMatch,
  350. Parameter: command.Parameter,
  351. ScriptTarget: command.ScriptTarget,
  352. ScriptName: command.ScriptName,
  353. ScriptPre: toJSON(command.ScriptPre),
  354. ScriptPost: toJSON(command.ScriptPost),
  355. })
  356. if err != nil {
  357. bus.RunEvent(events.Log{
  358. Level: "error",
  359. Code: "database_could_not_save_trigger",
  360. Message: "Failed to save trigger: " + err.Error(),
  361. })
  362. }
  363. case script.DeleteTrigger:
  364. err := q.DeleteScriptTrigger(timeout, command.ID)
  365. if err != nil {
  366. bus.RunEvent(events.Log{
  367. Level: "error",
  368. Code: "database_could_not_delete_trigger",
  369. Message: "Failed to delete trigger: " + err.Error(),
  370. })
  371. }
  372. case script.SetVariable:
  373. scopeName := "global"
  374. if command.Match != nil {
  375. scopeName = "match:" + *command.Match
  376. } else if command.Devices != nil {
  377. scopeName = "devices:" + strings.Join(command.Devices, ";")
  378. }
  379. if d.scriptVariables[scopeName+"::"+command.Key] != command.Value {
  380. d.scriptVariables[scopeName+"::"+command.Key] = command.Value
  381. err := q.UpdateScriptVariables(timeout, mysqlgen.UpdateScriptVariablesParams{
  382. Scope: scopeName,
  383. Name: command.Key,
  384. Value: command.Value,
  385. })
  386. if err != nil {
  387. bus.RunEvent(events.Log{
  388. Level: "error",
  389. Code: "database_could_not_save_variables",
  390. Message: "Failed to save variable: " + err.Error(),
  391. })
  392. }
  393. }
  394. case script.Update:
  395. j, _ := json.Marshal(command.Lines)
  396. err := q.SaveScript(timeout, mysqlgen.SaveScriptParams{
  397. Name: command.Name,
  398. Data: j,
  399. })
  400. if err != nil {
  401. bus.RunEvent(events.Log{
  402. Level: "error",
  403. Code: "database_could_not_save_script",
  404. Message: "Failed to save script: " + err.Error(),
  405. })
  406. }
  407. }
  408. }
  // fromJSON decodes the JSON document j into a new value of type T and
  // returns it. Decoding is best-effort: the error is discarded and whatever
  // ended up in the value is returned, which is T's zero value when nothing
  // was decoded.
  func fromJSON[T any](j []byte) T {
  	decoded := new(T)
  	_ = json.Unmarshal(j, decoded)
  	return *decoded
  }
  // toJSON encodes v as JSON and returns the raw bytes. Encoding is
  // best-effort: when v cannot be marshalled the error is dropped and a nil
  // message is returned.
  func toJSON[T any](v T) json.RawMessage {
  	encoded, err := json.Marshal(v)
  	if err != nil {
  		return nil
  	}
  	return json.RawMessage(encoded)
  }
  418. func Connect(host string, port int, username, password, dbname string) (lucifer3.ActiveService, error) {
  419. db, err := sql.Open("mysql", fmt.Sprintf(
  420. "%s:%s@(%s:%d)/%s?parseTime=true", username, password, host, port, dbname,
  421. ))
  422. if err != nil {
  423. return nil, err
  424. }
  425. db.SetMaxOpenConns(10)
  426. db.SetMaxIdleConns(10)
  427. db.SetConnMaxIdleTime(time.Minute)
  428. err = db.Ping()
  429. if err != nil {
  430. return nil, err
  431. }
  432. return &database{db: db, scriptVariables: make(map[string]string)}, nil
  433. }