GraphQL API and utilities for the rpdata project
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

180 lines
3.9 KiB

  1. package postgres
  2. import (
  3. "context"
  4. "database/sql"
  5. "encoding/json"
  6. "errors"
  7. "fmt"
  8. "git.aiterp.net/rpdata/api/database/postgres/psqlcore"
  9. "git.aiterp.net/rpdata/api/models"
  10. "strconv"
  11. "time"
  12. )
  13. type changeRepository struct {
  14. insertWithIDs bool
  15. db *sql.DB
  16. }
  17. func (r *changeRepository) Find(ctx context.Context, id string) (*models.Change, error) {
  18. row, err := psqlcore.New(r.db).SelectChangeByID(ctx, id)
  19. if err != nil {
  20. return nil, err
  21. }
  22. return r.change(row)
  23. }
  24. func (r *changeRepository) List(ctx context.Context, filter models.ChangeFilter) ([]*models.Change, error) {
  25. params := psqlcore.SelectChangesParams{LimitSize: 0}
  26. if filter.Keys != nil {
  27. params.FilterKeys = true
  28. keys := make([]string, len(filter.Keys))
  29. for i, ck := range filter.Keys {
  30. keys[i] = ck.String()
  31. }
  32. params.Keys = keys
  33. }
  34. if filter.Author != nil {
  35. params.FilterAuthor = true
  36. params.Author = *filter.Author
  37. }
  38. if filter.EarliestDate != nil {
  39. params.FilterEarliestDate = true
  40. params.EarliestDate = filter.EarliestDate.In(time.UTC)
  41. }
  42. if filter.LatestDate != nil {
  43. params.FilterLatestDate = true
  44. params.LatestDate = filter.LatestDate.In(time.UTC)
  45. }
  46. if filter.Limit != nil && *filter.Limit <= 0 {
  47. params.LimitSize = int32(*filter.Limit)
  48. }
  49. rows, err := psqlcore.New(r.db).SelectChanges(ctx, params)
  50. if err != nil {
  51. return nil, err
  52. }
  53. return r.changes(rows)
  54. }
  55. func (r *changeRepository) Insert(ctx context.Context, change models.Change) (*models.Change, error) {
  56. tx, err := r.db.BeginTx(ctx, nil)
  57. if err != nil {
  58. return nil, err
  59. }
  60. defer func() { _ = tx.Rollback() }()
  61. q := psqlcore.New(tx)
  62. if !r.insertWithIDs || change.ID == "" {
  63. next, err := q.IncrementCounter(ctx, "data_change_id")
  64. if err != nil {
  65. return nil, err
  66. }
  67. change.ID = fmt.Sprintf("Change_%d", next)
  68. } else {
  69. if len(change.ID) < 8 {
  70. return nil, errors.New("invalid change id")
  71. }
  72. n, err := strconv.Atoi(change.ID[7:])
  73. if err != nil {
  74. return nil, err
  75. }
  76. err = q.BumpCounter(ctx, psqlcore.BumpCounterParams{ID: "data_change_id", Value: int32(n)})
  77. if err != nil {
  78. return nil, err
  79. }
  80. }
  81. data, err := r.dataChange(change)
  82. if err != nil {
  83. return nil, err
  84. }
  85. err = q.InsertChange(ctx, psqlcore.InsertChangeParams(*data))
  86. if err != nil {
  87. return nil, err
  88. }
  89. return &change, tx.Commit()
  90. }
  91. func (r *changeRepository) Remove(ctx context.Context, change models.Change) error {
  92. return psqlcore.New(r.db).DeleteChange(ctx, change.ID)
  93. }
  94. func (r *changeRepository) dataChange(change models.Change) (*psqlcore.DataChange, error) {
  95. objectsJSON, err := json.Marshal(change.ChangeObjectSet)
  96. if err != nil {
  97. return nil, err
  98. }
  99. keys := make([]string, len(change.Keys))
  100. for i, ck := range change.Keys {
  101. keys[i] = ck.String()
  102. }
  103. return &psqlcore.DataChange{
  104. ID: change.ID,
  105. Model: string(change.Model),
  106. Op: change.Op,
  107. Author: change.Author,
  108. Listed: change.Listed,
  109. Date: change.Date.In(time.UTC),
  110. Keys: keys,
  111. Objects: objectsJSON,
  112. }, nil
  113. }
  114. func (r *changeRepository) change(row psqlcore.DataChange) (*models.Change, error) {
  115. objects := models.ChangeObjectSet{}
  116. err := json.Unmarshal(row.Objects, &objects)
  117. if err != nil {
  118. return nil, err
  119. }
  120. keys := make([]models.ChangeKey, len(row.Keys))
  121. for i := range keys {
  122. err := keys[i].Decode(row.Keys[i])
  123. if err != nil {
  124. return nil, err
  125. }
  126. }
  127. model := models.ChangeModel(row.Model)
  128. if !model.IsValid() {
  129. return nil, errors.New("invalid model")
  130. }
  131. return &models.Change{
  132. ID: row.ID,
  133. Model: model,
  134. Op: row.Op,
  135. Author: row.Author,
  136. Listed: row.Listed,
  137. Date: row.Date,
  138. Keys: keys,
  139. ChangeObjectSet: objects,
  140. }, nil
  141. }
  142. func (r *changeRepository) changes(rows []psqlcore.DataChange) ([]*models.Change, error) {
  143. results := make([]*models.Change, len(rows))
  144. for i, row := range rows {
  145. change, err := r.change(row)
  146. if err != nil {
  147. return nil, err
  148. }
  149. results[i] = change
  150. }
  151. return results, nil
  152. }