You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
229 lines
4.8 KiB
229 lines
4.8 KiB
package bolt
|
|
|
|
import (
|
|
"context"
|
|
"github.com/gisle/stufflog/database/repositories"
|
|
"github.com/gisle/stufflog/models"
|
|
"github.com/gisle/stufflog/slerrors"
|
|
"github.com/vmihailenco/msgpack/v4"
|
|
"go.etcd.io/bbolt"
|
|
)
|
|
|
|
var bnActivities = []byte("Activity")
|
|
|
|
type activityRepository struct {
|
|
db *bbolt.DB
|
|
userIdIdx *index
|
|
}
|
|
|
|
func (r *activityRepository) FindID(ctx context.Context, id string) (*models.Activity, error) {
|
|
activity := new(models.Activity)
|
|
err := r.db.View(func(tx *bbolt.Tx) error {
|
|
value := tx.Bucket(bnActivities).Get(unsafeStringToBytes(id))
|
|
if value == nil {
|
|
return slerrors.NotFound("Activity")
|
|
}
|
|
err := msgpack.Unmarshal(value, activity)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return activity, nil
|
|
}
|
|
|
|
func (r *activityRepository) List(ctx context.Context) ([]*models.Activity, error) {
|
|
activities := make([]*models.Activity, 0, 16)
|
|
err := r.db.View(func(tx *bbolt.Tx) error {
|
|
cursor := tx.Bucket(bnActivities).Cursor()
|
|
|
|
for key, value := cursor.First(); key != nil; key, value = cursor.Next() {
|
|
activity := new(models.Activity)
|
|
err := msgpack.Unmarshal(value, activity)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
activities = append(activities, activity)
|
|
}
|
|
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return activities, nil
|
|
}
|
|
|
|
func (r *activityRepository) ListUser(ctx context.Context, user models.User) ([]*models.Activity, error) {
|
|
activities := make([]*models.Activity, 0, 16)
|
|
err := r.db.View(func(tx *bbolt.Tx) error {
|
|
bucket := tx.Bucket(bnActivities)
|
|
|
|
ids, err := r.userIdIdx.WithTx(tx).Get(user.ID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, id := range ids {
|
|
value := bucket.Get(id)
|
|
if value == nil {
|
|
continue
|
|
}
|
|
|
|
activity := new(models.Activity)
|
|
err := msgpack.Unmarshal(value, activity)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
activities = append(activities, activity)
|
|
}
|
|
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return activities, nil
|
|
}
|
|
|
|
func (r *activityRepository) Insert(ctx context.Context, activity models.Activity) error {
|
|
value, err := msgpack.Marshal(&activity)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return r.db.Update(func(tx *bbolt.Tx) error {
|
|
err := tx.Bucket(bnActivities).Put(unsafeStringToBytes(activity.ID), value)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = r.userIdIdx.WithTx(tx).Set(unsafeStringToBytes(activity.ID), activity.UserID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
})
|
|
}
|
|
|
|
func (r *activityRepository) Update(ctx context.Context, activity models.Activity, updates []*models.ActivityUpdate) (*models.Activity, error) {
|
|
err := r.db.Update(func(tx *bbolt.Tx) error {
|
|
bucket := tx.Bucket(bnActivities)
|
|
|
|
// Re-Get to guarantee consistency.
|
|
value := bucket.Get(unsafeStringToBytes(activity.ID))
|
|
if value == nil {
|
|
return slerrors.NotFound("Activity")
|
|
}
|
|
err := msgpack.Unmarshal(value, &activity)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Perform updates
|
|
didChange := false
|
|
for _, update := range updates {
|
|
changed, err := activity.ApplyUpdate(*update)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if changed {
|
|
didChange = true
|
|
}
|
|
}
|
|
|
|
// Put back into bucket
|
|
if didChange {
|
|
value, err := msgpack.Marshal(&activity)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = tx.Bucket(bnActivities).Put(unsafeStringToBytes(activity.ID), value)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
} else {
|
|
return errUnchanged
|
|
}
|
|
|
|
return nil
|
|
})
|
|
if err != nil && err != errUnchanged {
|
|
return nil, err
|
|
}
|
|
|
|
return &activity, nil
|
|
}
|
|
|
|
func (r *activityRepository) Remove(ctx context.Context, activity models.Activity) error {
|
|
return r.db.Update(func(tx *bbolt.Tx) error {
|
|
err := tx.Bucket(bnActivities).Delete(unsafeStringToBytes(activity.ID))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = r.userIdIdx.WithTx(tx).Set(unsafeStringToBytes(activity.ID))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
})
|
|
}
|
|
|
|
func (r *activityRepository) reindex() error {
|
|
return r.db.Update(func(tx *bbolt.Tx) error {
|
|
cursor := tx.Bucket(bnActivities).Cursor()
|
|
|
|
err := r.userIdIdx.Reset(tx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
userIdIdxTx := r.userIdIdx.WithTx(tx)
|
|
|
|
for key, value := cursor.First(); key != nil; key, value = cursor.Next() {
|
|
activity := new(models.Activity)
|
|
err := msgpack.Unmarshal(value, activity)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = userIdIdxTx.Set(unsafeStringToBytes(activity.ID), activity.UserID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
})
|
|
}
|
|
|
|
func newActivityRepository(db *bbolt.DB) (repositories.ActivityRepository, error) {
|
|
err := db.Update(func(tx *bbolt.Tx) error {
|
|
_, err := tx.CreateBucketIfNotExists(bnActivities)
|
|
return err
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
userIdIdx, err := newModelIndex(db, "Activity", "UserID")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &activityRepository{
|
|
db: db,
|
|
userIdIdx: userIdIdx,
|
|
}, nil
|
|
}
|