diff --git a/database/mongodb/changes.go b/database/mongodb/changes.go index 07ed572..6c8b988 100644 --- a/database/mongodb/changes.go +++ b/database/mongodb/changes.go @@ -34,6 +34,14 @@ func (r *changeRepository) List(ctx context.Context, filter models.ChangeFilter) if filter.EarliestDate != nil && !filter.EarliestDate.IsZero() { query["date"] = bson.M{"$gte": *filter.EarliestDate} } + if filter.LatestDate != nil && !filter.LatestDate.IsZero() { + dateBson, ok := query["date"].(bson.M) + if !ok { + dateBson = bson.M{} + } + + dateBson["$gte"] = *filter.EarliestDate + } if len(filter.Keys) > 0 { query["keys"] = bson.M{"$in": filter.Keys} } else if !r.restoreIDs { diff --git a/database/postgres/changes.go b/database/postgres/changes.go new file mode 100644 index 0000000..61254c4 --- /dev/null +++ b/database/postgres/changes.go @@ -0,0 +1,184 @@ +package postgres + +import ( + "context" + "database/sql" + "encoding/json" + "errors" + "fmt" + "git.aiterp.net/rpdata/api/database/postgres/psqlcore" + "git.aiterp.net/rpdata/api/models" + "strconv" + "time" +) + +type changeRepository struct { + insertWithIDs bool + db *sql.DB +} + +func (r *changeRepository) Find(ctx context.Context, id string) (*models.Change, error) { + row, err := psqlcore.New(r.db).SelectChangeByID(ctx, id) + if err != nil { + return nil, err + } + + return r.change(row) +} + +func (r *changeRepository) List(ctx context.Context, filter models.ChangeFilter) ([]*models.Change, error) { + params := psqlcore.SelectChangesParams{LimitSize: 100} + if filter.Keys != nil { + params.FilterKeys = true + keys := make([]string, len(filter.Keys)) + for i, ck := range filter.Keys { + keys[i] = ck.String() + } + params.Keys = keys + } + if filter.Author != nil { + params.FilterAuthor = true + params.Author = *filter.Author + } + if filter.EarliestDate != nil { + params.FilterEarliestDate = true + params.EarliestDate = filter.EarliestDate.In(time.UTC) + } + if filter.LatestDate != nil { + params.FilterLatestDate = true + params.LatestDate = filter.LatestDate.In(time.UTC) + } + if filter.Limit != nil { + if *filter.Limit <= 0 { + params.LimitSize = 10000 + } else { + params.LimitSize = int32(*filter.Limit) + } + } + + rows, err := psqlcore.New(r.db).SelectChanges(ctx, params) + if err != nil { + return nil, err + } + + return r.changes(rows) +} + +func (r *changeRepository) Insert(ctx context.Context, change models.Change) (*models.Change, error) { + tx, err := r.db.BeginTx(ctx, nil) + if err != nil { + return nil, err + } + defer func() { _ = tx.Rollback() }() + + q := psqlcore.New(tx) + + if !r.insertWithIDs || change.ID == "" { + next, err := q.IncrementCounter(ctx, "data_change_id") + if err != nil { + return nil, err + } + + change.ID = fmt.Sprintf("Change_%d", next) + } else { + if len(change.ID) < 8 { + return nil, errors.New("invalid change id") + } + + n, err := strconv.Atoi(change.ID[7:]) + if err != nil { + return nil, err + } + + err = q.BumpCounter(ctx, psqlcore.BumpCounterParams{ID: "data_change_id", Value: int32(n)}) + if err != nil { + return nil, err + } + } + + data, err := r.dataChange(change) + if err != nil { + return nil, err + } + + err = q.InsertChange(ctx, psqlcore.InsertChangeParams(*data)) + if err != nil { + return nil, err + } + + return &change, tx.Commit() +} + +func (r *changeRepository) Remove(ctx context.Context, change models.Change) error { + return psqlcore.New(r.db).DeleteChange(ctx, change.ID) +} + +func (r *changeRepository) dataChange(change models.Change) (*psqlcore.DataChange, error) { + objectsJSON, err := json.Marshal(change.ChangeObjectSet) + if err != nil { + return nil, err + } + + keys := make([]string, len(change.Keys)) + for i, ck := range change.Keys { + keys[i] = ck.String() + } + + return &psqlcore.DataChange{ + ID: change.ID, + Model: string(change.Model), + Op: change.Op, + Author: change.Author, + Listed: change.Listed, + Date: change.Date.In(time.UTC), + Keys: keys, + Objects: objectsJSON, + }, nil +} + +func (r *changeRepository) change(row psqlcore.DataChange) (*models.Change, error) { + objects := models.ChangeObjectSet{} + + err := json.Unmarshal(row.Objects, &objects) + if err != nil { + return nil, err + } + + keys := make([]models.ChangeKey, len(row.Keys)) + for i := range keys { + err := keys[i].Decode(row.Keys[i]) + if err != nil { + return nil, err + } + } + + model := models.ChangeModel(row.Model) + if !model.IsValid() { + return nil, errors.New("invalid model") + } + + return &models.Change{ + ID: row.ID, + Model: model, + Op: row.Op, + Author: row.Author, + Listed: row.Listed, + Date: row.Date, + Keys: keys, + ChangeObjectSet: objects, + }, nil +} + +func (r *changeRepository) changes(rows []psqlcore.DataChange) ([]*models.Change, error) { + results := make([]*models.Change, len(rows)) + for i, row := range rows { + change, err := r.change(row) + if err != nil { + return nil, err + } + + results[i] = change + } + + return results, nil +} diff --git a/database/postgres/db.go b/database/postgres/db.go index d484171..62c4558 100644 --- a/database/postgres/db.go +++ b/database/postgres/db.go @@ -34,6 +34,12 @@ func Connect(cfg config.Database) (*DB, error) { if err := q.EnsureCounter(timeout, "data_character_id"); err != nil { return nil, err } + if err := q.EnsureCounter(timeout, "data_change_id"); err != nil { + return nil, err + } + if err := q.EnsureCounter(timeout, "data_log_short_id"); err != nil { + return nil, err + } return &DB{ db: db, @@ -47,7 +53,7 @@ type DB struct { } func (d *DB) Changes() repositories.ChangeRepository { - panic("implement me") + return &changeRepository{insertWithIDs: d.insertWithIDs, db: d.db} } func (d *DB) Channels() repositories.ChannelRepository { diff --git a/database/postgres/migrations/20210322193630_create_table_change.sql b/database/postgres/migrations/20210322193630_create_table_change.sql new file mode 100644 index 0000000..475b3c4 --- /dev/null +++ b/database/postgres/migrations/20210322193630_create_table_change.sql @@ -0,0 +1,18 @@ +-- +goose Up +-- +goose StatementBegin +CREATE TABLE data_change ( + id TEXT NOT NULL PRIMARY KEY, + model TEXT NOT NULL, + op TEXT NOT NULL, + author TEXT NOT NULL, + listed BOOLEAN NOT NULL, + date TIMESTAMP NOT NULL, + keys TEXT[] NOT NULL, + objects JSONB NOT NULL +); +-- +goose StatementEnd + +-- +goose Down +-- +goose StatementBegin +DROP TABLE IF EXISTS data_change; +-- +goose StatementEnd diff --git a/database/postgres/psqlcore/changes.sql.go b/database/postgres/psqlcore/changes.sql.go new file mode 100644 index 0000000..f660043 --- /dev/null +++ b/database/postgres/psqlcore/changes.sql.go @@ -0,0 +1,138 @@ +// Code generated by sqlc. DO NOT EDIT. +// source: changes.sql + +package psqlcore + +import ( + "context" + "encoding/json" + "time" + + "github.com/lib/pq" +) + +const deleteChange = `-- name: DeleteChange :exec +DELETE FROM data_change WHERE id = $1 +` + +func (q *Queries) DeleteChange(ctx context.Context, id string) error { + _, err := q.db.ExecContext(ctx, deleteChange, id) + return err +} + +const insertChange = `-- name: InsertChange :exec +INSERT INTO data_change (id, model, op, author, listed, date, keys, objects) +VALUES ( + $1::text, $2::text, $3::text, $4::text, + $5::boolean, $6::timestamp, $7::text[], + $8::jsonb +) +` + +type InsertChangeParams struct { + ID string `json:"id"` + Model string `json:"model"` + Op string `json:"op"` + Author string `json:"author"` + Listed bool `json:"listed"` + Date time.Time `json:"date"` + Keys []string `json:"keys"` + Objects json.RawMessage `json:"objects"` +} + +func (q *Queries) InsertChange(ctx context.Context, arg InsertChangeParams) error { + _, err := q.db.ExecContext(ctx, insertChange, + arg.ID, + arg.Model, + arg.Op, + arg.Author, + arg.Listed, + arg.Date, + pq.Array(arg.Keys), + arg.Objects, + ) + return err +} + +const selectChangeByID = `-- name: SelectChangeByID :one +SELECT id, model, op, author, listed, date, keys, objects FROM data_change WHERE id = $1 LIMIT 1 +` + +func (q *Queries) SelectChangeByID(ctx context.Context, id string) (DataChange, error) { + row := q.db.QueryRowContext(ctx, selectChangeByID, id) + var i DataChange + err := row.Scan( + &i.ID, + &i.Model, + &i.Op, + &i.Author, + &i.Listed, + &i.Date, + pq.Array(&i.Keys), + &i.Objects, + ) + return i, err +} + +const selectChanges = `-- name: SelectChanges :many +SELECT id, model, op, author, listed, date, keys, objects FROM data_change +WHERE ($1::bool = false OR keys && ($2::text[])) + AND ($3::bool = false OR date >= $4::timestamp) + AND ($5::bool = false OR date <= $6::timestamp) + AND ($7::bool = false OR author = $8::text) +LIMIT $9::int +` + +type SelectChangesParams struct { + FilterKeys bool `json:"filter_keys"` + Keys []string `json:"keys"` + FilterEarliestDate bool `json:"filter_earliest_date"` + EarliestDate time.Time `json:"earliest_date"` + FilterLatestDate bool `json:"filter_latest_date"` + LatestDate time.Time `json:"latest_date"` + FilterAuthor bool `json:"filter_author"` + Author string `json:"author"` + LimitSize int32 `json:"limit_size"` +} + +func (q *Queries) SelectChanges(ctx context.Context, arg SelectChangesParams) ([]DataChange, error) { + rows, err := q.db.QueryContext(ctx, selectChanges, + arg.FilterKeys, + pq.Array(arg.Keys), + arg.FilterEarliestDate, + arg.EarliestDate, + arg.FilterLatestDate, + arg.LatestDate, + arg.FilterAuthor, + arg.Author, + arg.LimitSize, + ) + if err != nil { + return nil, err + } + defer rows.Close() + items := []DataChange{} + for rows.Next() { + var i DataChange + if err := rows.Scan( + &i.ID, + &i.Model, + &i.Op, + &i.Author, + &i.Listed, + &i.Date, + pq.Array(&i.Keys), + &i.Objects, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} diff --git a/database/postgres/psqlcore/models.go b/database/postgres/psqlcore/models.go index f38fd5d..7913511 100644 --- a/database/postgres/psqlcore/models.go +++ b/database/postgres/psqlcore/models.go @@ -4,6 +4,8 @@ package psqlcore import ( "database/sql" + "encoding/json" + "time" ) type CoreCounter struct { @@ -11,6 +13,17 @@ type CoreCounter struct { Value sql.NullInt32 `json:"value"` } +type DataChange struct { + ID string `json:"id"` + Model string `json:"model"` + Op string `json:"op"` + Author string `json:"author"` + Listed bool `json:"listed"` + Date time.Time `json:"date"` + Keys []string `json:"keys"` + Objects json.RawMessage `json:"objects"` +} + type DataChannel struct { Name string `json:"name"` Logged bool `json:"logged"` diff --git a/database/postgres/queries/changes.sql b/database/postgres/queries/changes.sql new file mode 100644 index 0000000..06d8459 --- /dev/null +++ b/database/postgres/queries/changes.sql @@ -0,0 +1,21 @@ +-- name: SelectChangeByID :one +SELECT * FROM data_change WHERE id = $1 LIMIT 1; + +-- name: SelectChanges :many +SELECT * FROM data_change +WHERE (sqlc.arg(filter_keys)::bool = false OR keys && (sqlc.arg(keys)::text[])) + AND (sqlc.arg(filter_earliest_date)::bool = false OR date >= sqlc.arg(earliest_date)::timestamp) + AND (sqlc.arg(filter_latest_date)::bool = false OR date <= sqlc.arg(latest_date)::timestamp) + AND (sqlc.arg(filter_author)::bool = false OR author = sqlc.arg(author)::text) +LIMIT sqlc.arg(limit_size)::int; + +-- name: InsertChange :exec +INSERT INTO data_change (id, model, op, author, listed, date, keys, objects) +VALUES ( + sqlc.arg(id)::text, sqlc.arg(model)::text, sqlc.arg(op)::text, sqlc.arg(author)::text, + sqlc.arg(listed)::boolean, sqlc.arg(date)::timestamp, sqlc.arg(keys)::text[], + sqlc.arg(objects)::jsonb +); + +-- name: DeleteChange :exec +DELETE FROM data_change WHERE id = $1; \ No newline at end of file diff --git a/models/change.go b/models/change.go index da6612e..797335c 100644 --- a/models/change.go +++ b/models/change.go @@ -1,7 +1,9 @@ package models import ( + "fmt" "reflect" + "strings" "time" ) @@ -15,19 +17,23 @@ type Change struct { Keys []ChangeKey `bson:"keys"` Date time.Time `bson:"date"` - Logs []*Log `bson:"logs"` - Characters []*Character `bson:"characters"` - Channels []*Channel `bson:"channels"` - Posts []*Post `bson:"posts"` - Stories []*Story `bson:"stories"` - Tags []*Tag `bson:"tags"` - Chapters []*Chapter `bson:"chapters"` - Comments []*Comment `bson:"comments"` - Files []*File `bson:"files"` + ChangeObjectSet +} + +type ChangeObjectSet struct { + Logs []*Log `bson:"logs" json:"logs,omitempty"` + Characters []*Character `bson:"characters" json:"characters,omitempty"` + Channels []*Channel `bson:"channels" json:"channels,omitempty"` + Posts []*Post `bson:"posts" json:"posts,omitempty"` + Stories []*Story `bson:"stories" json:"stories,omitempty"` + Tags []*Tag `bson:"tags" json:"tags,omitempty"` + Chapters []*Chapter `bson:"chapters" json:"chapters,omitempty"` + Comments []*Comment `bson:"comments" json:"comments,omitempty"` + Files []*File `bson:"files" json:"files,omitempty"` } // AddObject adds the model into the appropriate array. -func (change *Change) AddObject(object interface{}) bool { +func (change *ChangeObjectSet) AddObject(object interface{}) bool { if v := reflect.ValueOf(object); v.Kind() == reflect.Struct { ptr := reflect.PtrTo(v.Type()) ptrValue := reflect.New(ptr.Elem()) @@ -174,10 +180,32 @@ type ChangeKey struct { ID string `bson:"id"` } +func (ck *ChangeKey) String() string { + return ck.Model.String() + ":" + ck.ID +} + +func (ck *ChangeKey) Decode(str string) error { + split := strings.Split(str, ":") + if len(split) != 2 { + return fmt.Errorf("invalid change key: %s", str) + } + + model := ChangeModel(split[0]) + if !model.IsValid() { + return fmt.Errorf("invalid change key model: %s", model) + } + + ck.Model = model + ck.ID = split[1] + + return nil +} + // ChangeFilter is a filter for listing changes. type ChangeFilter struct { Keys []ChangeKey EarliestDate *time.Time + LatestDate *time.Time Author *string Limit *int }