You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

491 lines
12 KiB

2 years ago
2 years ago
2 years ago
2 years ago
1 year ago
2 years ago
1 year ago
2 years ago
1 year ago
1 year ago
1 year ago
2 years ago
2 years ago
2 years ago
  1. package mysqldb
  2. import (
  3. "context"
  4. "database/sql"
  5. "encoding/json"
  6. "fmt"
  7. lucifer3 "git.aiterp.net/lucifer3/server"
  8. "git.aiterp.net/lucifer3/server/commands"
  9. "git.aiterp.net/lucifer3/server/effects"
  10. "git.aiterp.net/lucifer3/server/events"
  11. "git.aiterp.net/lucifer3/server/internal/gentools"
  12. "git.aiterp.net/lucifer3/server/services/mysqldb/mysqlgen"
  13. "git.aiterp.net/lucifer3/server/services/script"
  14. "strings"
  15. "time"
  16. _ "github.com/go-sql-driver/mysql"
  17. )
  18. type database struct {
  19. db *sql.DB
  20. scriptVariables map[string]string
  21. }
  22. func (d *database) Active() bool {
  23. return true
  24. }
  25. func (d *database) HandleEvent(bus *lucifer3.EventBus, event lucifer3.Event) {
  26. q := mysqlgen.New(d.db)
  27. timeout, cancel := context.WithTimeout(context.Background(), time.Second*5)
  28. defer cancel()
  29. switch event := event.(type) {
  30. case events.Started:
  31. timeout, cancel := context.WithTimeout(context.Background(), time.Second*90)
  32. defer cancel()
  33. // Fetch all aliases first
  34. aliases, err := q.ListDeviceAliases(timeout)
  35. if err != nil {
  36. bus.RunEvent(events.Log{
  37. Level: "error",
  38. Code: "database_could_not_list_aliases",
  39. Message: "Database could not list aliases: " + err.Error(),
  40. })
  41. }
  42. auths, err := q.ListDeviceAuth(timeout)
  43. if err != nil {
  44. bus.RunEvent(events.Log{
  45. Level: "error",
  46. Code: "database_could_not_list_device_infos",
  47. Message: "Hardware State serialization failed: " + err.Error(),
  48. })
  49. }
  50. for _, auth := range auths {
  51. bus.RunCommand(commands.ConnectDevice{
  52. ID: auth.ID,
  53. APIKey: auth.ApiKey,
  54. })
  55. }
  56. infos, err := q.ListDeviceInfos(timeout)
  57. if err != nil {
  58. bus.RunEvent(events.Log{
  59. Level: "error",
  60. Code: "database_could_not_list_device_infos",
  61. Message: "Hardware State serialization failed: " + err.Error(),
  62. })
  63. }
  64. for _, info := range infos {
  65. switch info.Kind {
  66. case "hardware_state":
  67. staleEvent := events.HardwareState{}
  68. err := json.Unmarshal(info.Data, &staleEvent)
  69. if err == nil {
  70. bus.RunEvent(staleEvent)
  71. }
  72. case "hardware_metadata":
  73. staleEvent := events.HardwareMetadata{}
  74. err := json.Unmarshal(info.Data, &staleEvent)
  75. if err == nil {
  76. bus.RunEvent(staleEvent)
  77. }
  78. }
  79. }
  80. // Run the aliases now
  81. for _, alias := range aliases {
  82. bus.RunCommand(commands.AddAlias{
  83. Match: alias.ID,
  84. Alias: alias.Alias,
  85. })
  86. }
  87. assignments, err := q.ListDeviceAssignments(timeout)
  88. if err != nil {
  89. bus.RunEvent(events.Log{
  90. Level: "error",
  91. Code: "database_could_not_list_assignments",
  92. Message: "Database could not list assignments: " + err.Error(),
  93. })
  94. }
  95. for _, ass := range assignments {
  96. effect := effects.Serializable{}
  97. err := json.Unmarshal(ass.Effect, &effect)
  98. if err != nil {
  99. bus.RunEvent(events.Log{
  100. Level: "error",
  101. Code: "database_could_not_deserialize_effect",
  102. Message: err.Error(),
  103. })
  104. continue
  105. }
  106. bus.RunCommand(commands.Assign{
  107. ID: gentools.ShallowCopy(&ass.ID),
  108. Match: ass.Match,
  109. Effect: effect.Effect,
  110. })
  111. }
  112. scripts, err := q.ListScripts(timeout)
  113. if err != nil {
  114. bus.RunEvent(events.Log{
  115. Level: "error",
  116. Code: "database_could_not_list_scripts",
  117. Message: "Database could not list scripts: " + err.Error(),
  118. })
  119. }
  120. for _, scr := range scripts {
  121. var lines []script.Line
  122. err := json.Unmarshal(scr.Data, &lines)
  123. if err != nil {
  124. bus.RunEvent(events.Log{
  125. Level: "error",
  126. Code: "database_could_not_read_script",
  127. Message: "Database could not read script: " + err.Error(),
  128. })
  129. }
  130. bus.RunCommand(script.Update{
  131. Name: scr.Name,
  132. Lines: lines,
  133. })
  134. }
  135. scriptVariables, err := q.ListScriptVariables(timeout)
  136. if err != nil {
  137. bus.RunEvent(events.Log{
  138. Level: "error",
  139. Code: "database_could_not_list_script_variables",
  140. Message: "Database could not list script variables: " + err.Error(),
  141. })
  142. }
  143. for _, variable := range scriptVariables {
  144. d.scriptVariables[variable.Scope+"::"+variable.Name] = variable.Value
  145. cmd := script.SetVariable{
  146. Key: variable.Name,
  147. Value: variable.Value,
  148. }
  149. scopeKind, scopeValue, _ := strings.Cut(variable.Scope, ":")
  150. switch scopeKind {
  151. case "devices":
  152. cmd.Devices = strings.Split(scopeValue, ";")
  153. case "match":
  154. cmd.Match = gentools.Ptr(scopeValue)
  155. }
  156. bus.RunCommand(cmd)
  157. }
  158. scriptTriggers, err := q.ListScriptTriggers(timeout)
  159. if err != nil {
  160. bus.RunEvent(events.Log{
  161. Level: "error",
  162. Code: "database_could_not_list_script_triggers",
  163. Message: "Database could not list script triggers: " + err.Error(),
  164. })
  165. }
  166. for _, trig := range scriptTriggers {
  167. bus.RunCommand(script.UpdateTrigger{
  168. ID: trig.ID,
  169. Name: trig.Name,
  170. Event: script.TriggerEvent(trig.Event),
  171. DeviceMatch: trig.DeviceMatch,
  172. Parameter: trig.Parameter,
  173. ScriptTarget: trig.ScriptTarget,
  174. ScriptName: trig.ScriptName,
  175. ScriptPre: fromJSON[[]script.Line](trig.ScriptPre),
  176. ScriptPost: fromJSON[[]script.Line](trig.ScriptPost),
  177. })
  178. }
  179. case events.DeviceAccepted:
  180. if event.Extras == nil {
  181. event.Extras = map[string]string{}
  182. }
  183. extras, err := json.Marshal(event.Extras)
  184. if err != nil {
  185. extras = []byte("{}")
  186. }
  187. err = q.ReplaceDeviceAuth(timeout, mysqlgen.ReplaceDeviceAuthParams{
  188. ID: event.ID,
  189. ApiKey: event.APIKey,
  190. Extras: extras,
  191. })
  192. if err != nil {
  193. bus.RunEvent(events.Log{
  194. Level: "error",
  195. Code: "database_could_not_save_device_auth",
  196. Message: "Device auth save failed: " + err.Error(),
  197. })
  198. }
  199. case events.HardwareState:
  200. if event.Stale {
  201. return
  202. }
  203. event.Stale = true
  204. serialized, err := json.Marshal(event)
  205. if err != nil {
  206. bus.RunEvent(events.Log{
  207. Level: "error",
  208. Code: "database_could_not_serialize_hardware_state",
  209. Message: "Hardware State serialization failed: " + err.Error(),
  210. })
  211. return
  212. }
  213. err = q.ReplaceDeviceInfo(timeout, mysqlgen.ReplaceDeviceInfoParams{
  214. ID: event.ID,
  215. Kind: "hardware_state",
  216. Data: serialized,
  217. })
  218. if err != nil {
  219. bus.RunEvent(events.Log{
  220. Level: "error",
  221. Code: "database_could_not_save_hardware_state",
  222. Message: err.Error(),
  223. })
  224. return
  225. }
  226. case events.HardwareMetadata:
  227. if event.Stale {
  228. return
  229. }
  230. event.Stale = true
  231. serialized, err := json.Marshal(event)
  232. if err != nil {
  233. bus.RunEvent(events.Log{
  234. Level: "error",
  235. Code: "database_could_not_serialize_hardware_state",
  236. Message: "Hardware State serialization failed: " + err.Error(),
  237. })
  238. return
  239. }
  240. err = q.ReplaceDeviceInfo(timeout, mysqlgen.ReplaceDeviceInfoParams{
  241. ID: event.ID,
  242. Kind: "hardware_metadata",
  243. Data: serialized,
  244. })
  245. if err != nil {
  246. bus.RunEvent(events.Log{
  247. Level: "error",
  248. Code: "database_could_not_save_hardware_state",
  249. Message: err.Error(),
  250. })
  251. return
  252. }
  253. case events.AssignmentCreated:
  254. serialized, err := json.Marshal(&effects.Serializable{Effect: event.Effect})
  255. if err != nil {
  256. bus.RunEvent(events.Log{
  257. Level: "error",
  258. Code: "database_could_not_serialize_effect",
  259. Message: err.Error(),
  260. })
  261. return
  262. }
  263. err = q.ReplaceDeviceAssignment(timeout, mysqlgen.ReplaceDeviceAssignmentParams{
  264. ID: event.ID,
  265. CreatedDate: time.Now().UTC(),
  266. Match: event.Match,
  267. Effect: serialized,
  268. })
  269. if err != nil {
  270. bus.RunEvent(events.Log{
  271. Level: "error",
  272. Code: "database_could_not_save_assignment",
  273. Message: err.Error(),
  274. })
  275. return
  276. }
  277. case events.AssignmentRemoved:
  278. err := q.DeleteDeviceAssignment(timeout, event.ID)
  279. if err != nil {
  280. bus.RunEvent(events.Log{
  281. Level: "error",
  282. Code: "database_could_not_remove_assignment",
  283. Message: err.Error(),
  284. })
  285. return
  286. }
  287. case events.AliasAdded:
  288. err := q.InsertDeviceAlias(timeout, mysqlgen.InsertDeviceAliasParams{
  289. ID: event.ID,
  290. Alias: event.Alias,
  291. })
  292. if err != nil {
  293. bus.RunEvent(events.Log{
  294. Level: "error",
  295. Code: "database_could_not_remove_assignment",
  296. Message: err.Error(),
  297. })
  298. return
  299. }
  300. case events.AliasRemoved:
  301. err := q.DeleteDeviceAlias(timeout, mysqlgen.DeleteDeviceAliasParams{
  302. ID: event.ID,
  303. Alias: event.Alias,
  304. })
  305. if err != nil {
  306. bus.RunEvent(events.Log{
  307. Level: "error",
  308. Code: "database_could_not_remove_assignment",
  309. Message: err.Error(),
  310. })
  311. return
  312. }
  313. case events.DeviceForgotten:
  314. err := q.DeleteDeviceInfoByID(timeout, event.ID)
  315. if err != nil {
  316. bus.RunEvent(events.Log{
  317. Level: "error",
  318. Code: "database_could_not_remove_info",
  319. Message: "Failed to remove device info: " + err.Error(),
  320. })
  321. }
  322. _ = q.DeleteDeviceAuth(timeout, event.ID)
  323. err = q.DeleteDeviceAliasByID(timeout, event.ID)
  324. if err != nil {
  325. bus.RunEvent(events.Log{
  326. Level: "error",
  327. Code: "database_could_not_remove_aliases",
  328. Message: "Failed to remove device aliases: " + err.Error(),
  329. })
  330. }
  331. err = q.DeleteDeviceAliasByIDLike(timeout, event.ID+":%")
  332. if err != nil {
  333. bus.RunEvent(events.Log{
  334. Level: "error",
  335. Code: "database_could_not_remove_sub_aliases",
  336. Message: "Failed to remove sub-device aliases: " + err.Error(),
  337. })
  338. }
  339. }
  340. }
  341. func (d *database) HandleCommand(bus *lucifer3.EventBus, command lucifer3.Command) {
  342. q := mysqlgen.New(d.db)
  343. timeout, cancel := context.WithTimeout(context.Background(), time.Second/2)
  344. defer cancel()
  345. switch command := command.(type) {
  346. case script.UpdateTrigger:
  347. err := q.ReplaceScriptTrigger(timeout, mysqlgen.ReplaceScriptTriggerParams{
  348. ID: command.ID,
  349. Event: string(command.Event),
  350. DeviceMatch: command.DeviceMatch,
  351. Parameter: command.Parameter,
  352. ScriptTarget: command.ScriptTarget,
  353. ScriptName: command.ScriptName,
  354. ScriptPre: toJSON(command.ScriptPre),
  355. ScriptPost: toJSON(command.ScriptPost),
  356. Name: command.Name,
  357. })
  358. if err != nil {
  359. bus.RunEvent(events.Log{
  360. Level: "error",
  361. Code: "database_could_not_save_trigger",
  362. Message: "Failed to save trigger: " + err.Error(),
  363. })
  364. }
  365. case script.DeleteTrigger:
  366. err := q.DeleteScriptTrigger(timeout, command.ID)
  367. if err != nil {
  368. bus.RunEvent(events.Log{
  369. Level: "error",
  370. Code: "database_could_not_delete_trigger",
  371. Message: "Failed to delete trigger: " + err.Error(),
  372. })
  373. }
  374. case script.SetVariable:
  375. scopeName := "global"
  376. if command.Match != nil {
  377. scopeName = "match:" + *command.Match
  378. } else if command.Devices != nil {
  379. scopeName = "devices:" + strings.Join(command.Devices, ";")
  380. }
  381. if d.scriptVariables[scopeName+"::"+command.Key] != command.Value {
  382. d.scriptVariables[scopeName+"::"+command.Key] = command.Value
  383. err := q.UpdateScriptVariables(timeout, mysqlgen.UpdateScriptVariablesParams{
  384. Scope: scopeName,
  385. Name: command.Key,
  386. Value: command.Value,
  387. })
  388. if err != nil {
  389. bus.RunEvent(events.Log{
  390. Level: "error",
  391. Code: "database_could_not_save_variables",
  392. Message: "Failed to save variable: " + err.Error(),
  393. })
  394. }
  395. }
  396. case script.Update:
  397. j, _ := json.Marshal(command.Lines)
  398. if (len(command.Lines)) > 0 {
  399. err := q.SaveScript(timeout, mysqlgen.SaveScriptParams{
  400. Name: command.Name,
  401. Data: j,
  402. })
  403. if err != nil {
  404. bus.RunEvent(events.Log{
  405. Level: "error",
  406. Code: "database_could_not_save_script",
  407. Message: "Failed to save script: " + err.Error(),
  408. })
  409. }
  410. } else {
  411. err := q.DeleteScript(timeout, command.Name)
  412. if err != nil {
  413. bus.RunEvent(events.Log{
  414. Level: "error",
  415. Code: "database_could_not_delete_script",
  416. Message: "Failed to delete script: " + err.Error(),
  417. })
  418. }
  419. }
  420. }
  421. }
  422. func fromJSON[T any](j []byte) T {
  423. var t T
  424. _ = json.Unmarshal(j, &t)
  425. return t
  426. }
  427. func toJSON[T any](v T) json.RawMessage {
  428. j, _ := json.Marshal(v)
  429. return j
  430. }
  431. func Connect(host string, port int, username, password, dbname string) (lucifer3.ActiveService, error) {
  432. db, err := sql.Open("mysql", fmt.Sprintf(
  433. "%s:%s@(%s:%d)/%s?parseTime=true", username, password, host, port, dbname,
  434. ))
  435. if err != nil {
  436. return nil, err
  437. }
  438. db.SetMaxOpenConns(10)
  439. db.SetMaxIdleConns(10)
  440. db.SetConnMaxIdleTime(time.Minute)
  441. err = db.Ping()
  442. if err != nil {
  443. return nil, err
  444. }
  445. return &database{db: db, scriptVariables: make(map[string]string)}, nil
  446. }