package postgres import ( "context" "database/sql" "encoding/json" "errors" "fmt" "git.aiterp.net/rpdata/api/database/postgres/psqlcore" "git.aiterp.net/rpdata/api/models" "strconv" "time" ) type changeRepository struct { insertWithIDs bool db *sql.DB } func (r *changeRepository) Find(ctx context.Context, id string) (*models.Change, error) { row, err := psqlcore.New(r.db).SelectChangeByID(ctx, id) if err != nil { return nil, err } return r.change(row) } func (r *changeRepository) List(ctx context.Context, filter models.ChangeFilter) ([]*models.Change, error) { params := psqlcore.SelectChangesParams{LimitSize: 100} if filter.Keys != nil { params.FilterKeys = true keys := make([]string, len(filter.Keys)) for i, ck := range filter.Keys { keys[i] = ck.String() } params.Keys = keys } if filter.Author != nil { params.FilterAuthor = true params.Author = *filter.Author } if filter.EarliestDate != nil { params.FilterEarliestDate = true params.EarliestDate = filter.EarliestDate.In(time.UTC) } if filter.LatestDate != nil { params.FilterLatestDate = true params.LatestDate = filter.LatestDate.In(time.UTC) } if filter.Limit != nil { if *filter.Limit <= 0 { params.LimitSize = 10000 } else { params.LimitSize = int32(*filter.Limit) } } rows, err := psqlcore.New(r.db).SelectChanges(ctx, params) if err != nil { return nil, err } return r.changes(rows) } func (r *changeRepository) Insert(ctx context.Context, change models.Change) (*models.Change, error) { tx, err := r.db.BeginTx(ctx, nil) if err != nil { return nil, err } defer func() { _ = tx.Rollback() }() q := psqlcore.New(tx) if !r.insertWithIDs || change.ID == "" { next, err := q.IncrementCounter(ctx, "data_change_id") if err != nil { return nil, err } change.ID = fmt.Sprintf("Change_%d", next) } else { if len(change.ID) < 8 { return nil, errors.New("invalid change id") } n, err := strconv.Atoi(change.ID[7:]) if err != nil { return nil, err } err = q.BumpCounter(ctx, psqlcore.BumpCounterParams{ID: "data_change_id", Value: int32(n)}) if err != nil { return nil, err } } data, err := r.dataChange(change) if err != nil { return nil, err } err = q.InsertChange(ctx, psqlcore.InsertChangeParams(*data)) if err != nil { return nil, err } return &change, tx.Commit() } func (r *changeRepository) Remove(ctx context.Context, change models.Change) error { return psqlcore.New(r.db).DeleteChange(ctx, change.ID) } func (r *changeRepository) dataChange(change models.Change) (*psqlcore.DataChange, error) { objectsJSON, err := json.Marshal(change.ChangeObjectSet) if err != nil { return nil, err } keys := make([]string, len(change.Keys)) for i, ck := range change.Keys { keys[i] = ck.String() } return &psqlcore.DataChange{ ID: change.ID, Model: string(change.Model), Op: change.Op, Author: change.Author, Listed: change.Listed, Date: change.Date.In(time.UTC), Keys: keys, Objects: objectsJSON, }, nil } func (r *changeRepository) change(row psqlcore.DataChange) (*models.Change, error) { objects := models.ChangeObjectSet{} err := json.Unmarshal(row.Objects, &objects) if err != nil { return nil, err } keys := make([]models.ChangeKey, len(row.Keys)) for i := range keys { err := keys[i].Decode(row.Keys[i]) if err != nil { return nil, err } } model := models.ChangeModel(row.Model) if !model.IsValid() { return nil, errors.New("invalid model") } return &models.Change{ ID: row.ID, Model: model, Op: row.Op, Author: row.Author, Listed: row.Listed, Date: row.Date, Keys: keys, ChangeObjectSet: objects, }, nil } func (r *changeRepository) changes(rows []psqlcore.DataChange) ([]*models.Change, error) { results := make([]*models.Change, len(rows)) for i, row := range rows { change, err := r.change(row) if err != nil { return nil, err } results[i] = change } return results, nil }