Plan stuff. Log stuff.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 

229 lines
4.8 KiB

package bolt
import (
"context"
"github.com/gisle/stufflog/database/repositories"
"github.com/gisle/stufflog/models"
"github.com/gisle/stufflog/slerrors"
"github.com/vmihailenco/msgpack/v4"
"go.etcd.io/bbolt"
)
// bnActivities is the name of the bbolt bucket in which activities are
// stored, keyed by activity ID.
var bnActivities = []byte("Activity")
// activityRepository stores models.Activity values as msgpack-encoded
// entries in the bnActivities bucket of a bbolt database.
type activityRepository struct {
// db is the bbolt database all View/Update transactions run against.
db *bbolt.DB
// userIdIdx is the index created for ("Activity", "UserID"); it is queried
// with a user ID in ListUser and written with (activity ID, user ID) on
// Insert and reindex.
userIdIdx *index
}
// FindID loads the activity stored under id.
//
// The lookup runs in a read-only transaction. When the bucket has no value
// for the key, the result of slerrors.NotFound("Activity") is returned; a
// msgpack decode failure is returned as-is. On any error the returned
// activity is nil. ctx is accepted but not consulted by this method.
func (r *activityRepository) FindID(ctx context.Context, id string) (*models.Activity, error) {
	var found models.Activity

	viewErr := r.db.View(func(tx *bbolt.Tx) error {
		raw := tx.Bucket(bnActivities).Get(unsafeStringToBytes(id))
		if raw == nil {
			return slerrors.NotFound("Activity")
		}
		// Decode while the transaction is still open.
		return msgpack.Unmarshal(raw, &found)
	})
	if viewErr != nil {
		return nil, viewErr
	}

	return &found, nil
}
// List returns every activity in the bucket, in cursor order.
//
// Each value is decoded from msgpack inside a single read-only transaction.
// The first decode error aborts the scan and is returned with a nil slice.
// With an empty bucket the result is an empty, non-nil slice. ctx is accepted
// but not consulted by this method.
func (r *activityRepository) List(ctx context.Context) ([]*models.Activity, error) {
	result := make([]*models.Activity, 0, 16)

	viewErr := r.db.View(func(tx *bbolt.Tx) error {
		cur := tx.Bucket(bnActivities).Cursor()

		k, v := cur.First()
		for k != nil {
			item := new(models.Activity)
			if err := msgpack.Unmarshal(v, item); err != nil {
				return err
			}
			result = append(result, item)

			k, v = cur.Next()
		}
		return nil
	})
	if viewErr != nil {
		return nil, viewErr
	}

	return result, nil
}
// ListUser returns the activities that the user-ID index associates with
// user.ID.
//
// The index is queried inside a read-only transaction and every returned ID
// is looked up in the activity bucket. IDs with no value in the bucket are
// skipped silently. An index error or a msgpack decode error aborts the
// listing and is returned with a nil slice. ctx is accepted but not consulted
// by this method.
func (r *activityRepository) ListUser(ctx context.Context, user models.User) ([]*models.Activity, error) {
	result := make([]*models.Activity, 0, 16)

	viewErr := r.db.View(func(tx *bbolt.Tx) error {
		activityBucket := tx.Bucket(bnActivities)

		activityIDs, err := r.userIdIdx.WithTx(tx).Get(user.ID)
		if err != nil {
			return err
		}

		for _, activityID := range activityIDs {
			raw := activityBucket.Get(activityID)
			if raw == nil {
				// Index entry with no stored activity; ignore it.
				continue
			}

			item := new(models.Activity)
			if err := msgpack.Unmarshal(raw, item); err != nil {
				return err
			}
			result = append(result, item)
		}
		return nil
	})
	if viewErr != nil {
		return nil, viewErr
	}

	return result, nil
}
// Insert writes activity to the bucket under its ID and records its UserID in
// the user-ID index.
//
// The activity is msgpack-encoded before the transaction is opened, so an
// encoding failure returns without touching the database. The bucket write
// and the index write share one read-write transaction; an error from either
// is returned from the transaction function. ctx is accepted but not
// consulted by this method.
func (r *activityRepository) Insert(ctx context.Context, activity models.Activity) error {
	encoded, encodeErr := msgpack.Marshal(&activity)
	if encodeErr != nil {
		return encodeErr
	}

	return r.db.Update(func(tx *bbolt.Tx) error {
		if err := tx.Bucket(bnActivities).Put(unsafeStringToBytes(activity.ID), encoded); err != nil {
			return err
		}
		return r.userIdIdx.WithTx(tx).Set(unsafeStringToBytes(activity.ID), activity.UserID)
	})
}
// Update applies updates to the stored copy of activity and writes the result
// back if anything changed.
//
// Inside one read-write transaction the activity is re-read by activity.ID
// and decoded over the passed-in value, so the updates are applied to what is
// actually stored. Each update is applied in order via ApplyUpdate. If none
// reports a change, the transaction function returns errUnchanged; that
// sentinel is not surfaced to the caller, who still receives the decoded
// activity. Otherwise the activity is re-encoded and put back under its ID.
//
// Returns the result of slerrors.NotFound("Activity") when the ID is not in
// the bucket, or the first decode, ApplyUpdate, encode or write error. On
// error the returned activity is nil. ctx is accepted but not consulted by
// this method.
func (r *activityRepository) Update(ctx context.Context, activity models.Activity, updates []*models.ActivityUpdate) (*models.Activity, error) {
	txErr := r.db.Update(func(tx *bbolt.Tx) error {
		bucket := tx.Bucket(bnActivities)

		// Reload the stored state so the updates apply to current data.
		stored := bucket.Get(unsafeStringToBytes(activity.ID))
		if stored == nil {
			return slerrors.NotFound("Activity")
		}
		if err := msgpack.Unmarshal(stored, &activity); err != nil {
			return err
		}

		// Apply each update, remembering whether any of them had an effect.
		anyChanged := false
		for _, pending := range updates {
			changed, err := activity.ApplyUpdate(*pending)
			if err != nil {
				return err
			}
			if changed {
				anyChanged = true
			}
		}

		// Nothing to persist: signal it with the sentinel filtered out below.
		if !anyChanged {
			return errUnchanged
		}

		encoded, err := msgpack.Marshal(&activity)
		if err != nil {
			return err
		}
		return bucket.Put(unsafeStringToBytes(activity.ID), encoded)
	})

	if txErr != nil && txErr != errUnchanged {
		return nil, txErr
	}
	return &activity, nil
}
// Remove deletes the activity's entry from the bucket and then calls the
// user-ID index's Set with only the activity ID (no values).
//
// NOTE(review): calling Set without values presumably clears the index entry
// for this activity; confirm against the index implementation.
//
// Both steps share one read-write transaction; an error from either is
// returned from the transaction function. ctx is accepted but not consulted
// by this method.
func (r *activityRepository) Remove(ctx context.Context, activity models.Activity) error {
	return r.db.Update(func(tx *bbolt.Tx) error {
		if err := tx.Bucket(bnActivities).Delete(unsafeStringToBytes(activity.ID)); err != nil {
			return err
		}
		return r.userIdIdx.WithTx(tx).Set(unsafeStringToBytes(activity.ID))
	})
}
// reindex rebuilds the user-ID index from the contents of the activity
// bucket.
//
// In one read-write transaction the index is reset, then every stored
// activity is decoded and its (ID, UserID) pair is written to the index. The
// first reset, decode or index error stops the rebuild and is returned.
func (r *activityRepository) reindex() error {
	return r.db.Update(func(tx *bbolt.Tx) error {
		cur := tx.Bucket(bnActivities).Cursor()

		if err := r.userIdIdx.Reset(tx); err != nil {
			return err
		}
		idx := r.userIdIdx.WithTx(tx)

		k, v := cur.First()
		for k != nil {
			entry := new(models.Activity)
			if err := msgpack.Unmarshal(v, entry); err != nil {
				return err
			}
			if err := idx.Set(unsafeStringToBytes(entry.ID), entry.UserID); err != nil {
				return err
			}

			k, v = cur.Next()
		}
		return nil
	})
}
// newActivityRepository builds an activity repository on top of db.
//
// It creates the activity bucket if it does not exist yet, then creates the
// ("Activity", "UserID") index through newModelIndex. An error from either
// step is returned with a nil repository.
func newActivityRepository(db *bbolt.DB) (repositories.ActivityRepository, error) {
	ensureBucket := func(tx *bbolt.Tx) error {
		_, err := tx.CreateBucketIfNotExists(bnActivities)
		return err
	}
	if err := db.Update(ensureBucket); err != nil {
		return nil, err
	}

	idx, err := newModelIndex(db, "Activity", "UserID")
	if err != nil {
		return nil, err
	}

	repo := &activityRepository{
		db:        db,
		userIdIdx: idx,
	}
	return repo, nil
}