GraphQL API and utilities for the rpdata project
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 

180 lines
3.9 KiB

package postgres
import (
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
"git.aiterp.net/rpdata/api/database/postgres/psqlcore"
"git.aiterp.net/rpdata/api/models"
"strconv"
"time"
)
// changeRepository holds the database handle and insert settings used by the
// change-related methods in this file.
type changeRepository struct {
// insertWithIDs makes Insert keep a non-empty caller-supplied change ID
// instead of always generating one from the counter.
insertWithIDs bool
// db is the handle used to run queries and to begin the Insert transaction.
db *sql.DB
}
// Find runs the SelectChangeByID query for id and converts the resulting row
// into a models.Change. Query errors and conversion errors are returned as-is.
func (r *changeRepository) Find(ctx context.Context, id string) (*models.Change, error) {
	queries := psqlcore.New(r.db)

	record, err := queries.SelectChangeByID(ctx, id)
	if err != nil {
		return nil, err
	}

	return r.change(record)
}
// List runs the SelectChanges query with the criteria set in filter and
// converts every returned row into a models.Change.
//
// Each optional filter field (Keys, Author, EarliestDate, LatestDate) that is
// non-nil switches on the matching Filter* flag in the query parameters; date
// bounds are converted to UTC before being passed on. A positive Limit is
// forwarded as LimitSize; a nil, zero or negative Limit leaves LimitSize at 0
// (presumably interpreted by the query as "no limit" — confirm against the SQL).
//
// Query errors and row conversion errors are returned unchanged.
func (r *changeRepository) List(ctx context.Context, filter models.ChangeFilter) ([]*models.Change, error) {
	params := psqlcore.SelectChangesParams{LimitSize: 0}

	if filter.Keys != nil {
		keys := make([]string, len(filter.Keys))
		for i, ck := range filter.Keys {
			keys[i] = ck.String()
		}
		params.FilterKeys = true
		params.Keys = keys
	}

	if filter.Author != nil {
		params.FilterAuthor = true
		params.Author = *filter.Author
	}

	if filter.EarliestDate != nil {
		params.FilterEarliestDate = true
		params.EarliestDate = filter.EarliestDate.In(time.UTC)
	}

	if filter.LatestDate != nil {
		params.FilterLatestDate = true
		params.LatestDate = filter.LatestDate.In(time.UTC)
	}

	// Only a positive limit is meaningful. The previous "<= 0" comparison
	// dropped every real limit and forwarded only zero/negative values.
	if filter.Limit != nil && *filter.Limit > 0 {
		params.LimitSize = int32(*filter.Limit)
	}

	rows, err := psqlcore.New(r.db).SelectChanges(ctx, params)
	if err != nil {
		return nil, err
	}

	return r.changes(rows)
}
// Insert stores change inside a single transaction and returns a pointer to
// the stored value, including the ID it ended up with.
//
// When the repository is configured with insertWithIDs and change.ID is
// non-empty, the supplied ID is kept and the "data_change_id" counter is
// bumped with the number parsed from it. Otherwise the counter is incremented
// and the ID becomes "Change_<n>".
//
// Any failure before the commit returns a nil change and the error; the
// deferred rollback then discards the transaction. The result of Commit is
// returned alongside the change pointer.
func (r *changeRepository) Insert(ctx context.Context, change models.Change) (*models.Change, error) {
	tx, err := r.db.BeginTx(ctx, nil)
	if err != nil {
		return nil, err
	}
	// The rollback result is discarded on purpose: once Commit has run,
	// database/sql makes Rollback a no-op that returns ErrTxDone.
	defer func() { _ = tx.Rollback() }()

	queries := psqlcore.New(tx)

	if r.insertWithIDs && change.ID != "" {
		// Everything after the first seven bytes (the length of the "Change_"
		// prefix used for generated IDs) must parse as an integer; the prefix
		// itself is not checked.
		if len(change.ID) < 8 {
			return nil, errors.New("invalid change id")
		}
		number, convErr := strconv.Atoi(change.ID[7:])
		if convErr != nil {
			return nil, convErr
		}
		bump := psqlcore.BumpCounterParams{ID: "data_change_id", Value: int32(number)}
		if bumpErr := queries.BumpCounter(ctx, bump); bumpErr != nil {
			return nil, bumpErr
		}
	} else {
		seq, seqErr := queries.IncrementCounter(ctx, "data_change_id")
		if seqErr != nil {
			return nil, seqErr
		}
		change.ID = fmt.Sprintf("Change_%d", seq)
	}

	record, err := r.dataChange(change)
	if err != nil {
		return nil, err
	}

	if err := queries.InsertChange(ctx, psqlcore.InsertChangeParams(*record)); err != nil {
		return nil, err
	}

	return &change, tx.Commit()
}
// Remove runs the DeleteChange query for change.ID and returns its error.
// Only the ID field of change is used.
func (r *changeRepository) Remove(ctx context.Context, change models.Change) error {
	queries := psqlcore.New(r.db)

	return queries.DeleteChange(ctx, change.ID)
}
// dataChange converts a models.Change into the psqlcore.DataChange row form.
//
// The embedded ChangeObjectSet is serialized to JSON, each key is rendered
// with its String method, and the date is converted to UTC. The only error
// source is the JSON encoding step.
func (r *changeRepository) dataChange(change models.Change) (*psqlcore.DataChange, error) {
	encoded, err := json.Marshal(change.ChangeObjectSet)
	if err != nil {
		return nil, err
	}

	keyStrings := make([]string, 0, len(change.Keys))
	for _, key := range change.Keys {
		keyStrings = append(keyStrings, key.String())
	}

	record := psqlcore.DataChange{
		ID:      change.ID,
		Model:   string(change.Model),
		Op:      change.Op,
		Author:  change.Author,
		Listed:  change.Listed,
		Date:    change.Date.In(time.UTC),
		Keys:    keyStrings,
		Objects: encoded,
	}

	return &record, nil
}
// change converts a psqlcore.DataChange row into a models.Change.
//
// The steps run in this order, and the first failure is returned:
//  1. row.Objects is decoded from JSON into a ChangeObjectSet,
//  2. every entry of row.Keys is decoded into a models.ChangeKey,
//  3. row.Model is checked with IsValid ("invalid model" on failure).
func (r *changeRepository) change(row psqlcore.DataChange) (*models.Change, error) {
	objectSet := models.ChangeObjectSet{}
	if err := json.Unmarshal(row.Objects, &objectSet); err != nil {
		return nil, err
	}

	decoded := make([]models.ChangeKey, len(row.Keys))
	for i, raw := range row.Keys {
		if err := decoded[i].Decode(raw); err != nil {
			return nil, err
		}
	}

	kind := models.ChangeModel(row.Model)
	if !kind.IsValid() {
		return nil, errors.New("invalid model")
	}

	result := models.Change{
		ID:              row.ID,
		Model:           kind,
		Op:              row.Op,
		Author:          row.Author,
		Listed:          row.Listed,
		Date:            row.Date,
		Keys:            decoded,
		ChangeObjectSet: objectSet,
	}

	return &result, nil
}
// changes converts each row with change, preserving order. The first
// conversion error aborts the whole batch and is returned with a nil slice.
func (r *changeRepository) changes(rows []psqlcore.DataChange) ([]*models.Change, error) {
	converted := make([]*models.Change, 0, len(rows))

	for i := range rows {
		item, err := r.change(rows[i])
		if err != nil {
			return nil, err
		}
		converted = append(converted, item)
	}

	return converted, nil
}