diff --git a/cmd/rpdata-graphiql/main.go b/cmd/rpdata-graphiql/main.go index 5fc28b3..71ee86b 100644 --- a/cmd/rpdata-graphiql/main.go +++ b/cmd/rpdata-graphiql/main.go @@ -1,6 +1,7 @@ package main import ( + "context" "encoding/json" "log" "net/http" @@ -8,6 +9,7 @@ import ( "git.aiterp.net/rpdata/api/internal/session" "git.aiterp.net/rpdata/api/internal/store" "git.aiterp.net/rpdata/api/loader" + "git.aiterp.net/rpdata/api/model/change" "git.aiterp.net/rpdata/api/model/file" logModel "git.aiterp.net/rpdata/api/model/log" "git.aiterp.net/rpdata/api/resolver" @@ -89,6 +91,24 @@ func main() { w.Write(json) }) + change.InitializeTimeline() + go func() { + index := int64(0) + + for { + index++ + ch, err := change.Timeline.Get(context.Background(), index) + if err == change.ErrExpired { + continue + } else if err != nil { + log.Println("Timeline:", err) + break + } + + log.Printf("Change: time=%s model=%s op=%s id=%s author=%s\n", ch.Time.Local().Format("[Mon Jan 02 15:04 MST]"), ch.Model, ch.Op, ch.ObjectID, ch.Author) + } + }() + log.Fatal(http.ListenAndServe(":17000", nil)) } diff --git a/model/change/change.go b/model/change/change.go index 5469d90..a0d80e9 100644 --- a/model/change/change.go +++ b/model/change/change.go @@ -2,6 +2,7 @@ package change import ( "log" + "sync" "time" "git.aiterp.net/rpdata/api/model/counter" @@ -30,8 +31,13 @@ var PublicModels = []string{ "Post", } +var submitMutex sync.Mutex + // Submit submits a change to the history. func Submit(model, op, author, objectID string, data interface{}) (Change, error) { + submitMutex.Lock() + defer submitMutex.Unlock() + index, err := counter.Next("auto_increment", "Change") if err != nil { return Change{}, err @@ -47,6 +53,8 @@ func Submit(model, op, author, objectID string, data interface{}) (Change, error Data: data, } + Timeline.push(change) + err = collection.Insert(&change) if err != nil { return Change{}, err @@ -55,6 +63,28 @@ func Submit(model, op, author, objectID string, data interface{}) (Change, error return change, err } +// FindID gets a change by ID +func FindID(id int) (Change, error) { + change := Change{} + err := collection.FindId(id).One(&change) + if err != nil { + return Change{}, err + } + + return change, nil +} + +// List gets all changes in ascending order +func List() ([]Change, error) { + changes := make([]Change, 0, 64) + err := collection.Find(nil).Sort("_id").All(&changes) + if err != nil { + return nil, err + } + + return changes, nil +} + func init() { store.HandleInit(func(db *mgo.Database) { collection = db.C("common.history") diff --git a/model/change/timeline.go b/model/change/timeline.go new file mode 100644 index 0000000..e09936b --- /dev/null +++ b/model/change/timeline.go @@ -0,0 +1,118 @@ +package change + +import ( + "context" + "errors" + "sync" + + "github.com/globalsign/mgo" +) + +// ErrInvalidIndex is returned when a timeline subscriber tries to get a value +// with a negative id +var ErrInvalidIndex = errors.New("change.timeline: Invalid ID") + +// ErrWaitRequired is returned when a timeline subscriber requests an entry that +// has not yet been pushed +var ErrWaitRequired = errors.New("change.timeline: Waiting required") + +// ErrExpired is returned when a timeline subscriber requests a change that happened +// too long ago for it to be in the database. +var ErrExpired = errors.New("change.timeline: Change has expired") + +type timeline struct { + mutex sync.Mutex + firstIndex int64 + currentID int64 + cache []Change + notifyCh chan struct{} +} + +func (timeline *timeline) Get(ctx context.Context, id int64) (Change, error) { + change, err := timeline.get(id, false) + if err == ErrWaitRequired { + if err := timeline.wait(ctx); err != nil { + return Change{}, err + } + + return timeline.get(id, true) + } else if err == mgo.ErrNotFound { + return Change{}, ErrExpired + } + + return change, nil +} + +func (timeline *timeline) push(change Change) { + timeline.mutex.Lock() + timeline.cache = append(timeline.cache, change) + if len(timeline.cache) > 128 { + copy(timeline.cache, timeline.cache[32:]) + timeline.cache = timeline.cache[:len(timeline.cache)-32] + } + + // Only update the currentID if this is more recent than the existing ones + if int64(change.ID) > timeline.currentID { + timeline.currentID = int64(change.ID) + } + + if timeline.notifyCh != nil { + close(timeline.notifyCh) + timeline.notifyCh = nil + } + timeline.mutex.Unlock() +} + +func (timeline *timeline) get(id int64, hasWaited bool) (Change, error) { + if id < 0 { + return Change{}, ErrInvalidIndex + } + + timeline.mutex.Lock() + if !hasWaited && id > timeline.currentID { + timeline.mutex.Unlock() + return Change{}, ErrWaitRequired + } + + for _, change := range timeline.cache { + if change.ID == int(id) { + timeline.mutex.Unlock() + return change, nil + } + } + timeline.mutex.Unlock() + + return FindID(int(id)) +} + +func (timeline *timeline) wait(ctx context.Context) error { + timeline.mutex.Lock() + if timeline.notifyCh == nil { + timeline.notifyCh = make(chan struct{}) + } + + ch := timeline.notifyCh + timeline.mutex.Unlock() + + select { + case <-ctx.Done(): + return ctx.Err() + case <-ch: + return nil + } +} + +// Timeline has operations for subscribing to changes in real time +var Timeline = &timeline{} + +// InitializeTimeline fills in the timeline with existing entries +func InitializeTimeline() { + changes, err := List() + if err != nil { + return + } + + for _, change := range changes { + Timeline.push(change) + } +} diff --git a/resolver/channel.go b/resolver/channel.go index 2a5755a..6b5c355 100644 --- a/resolver/channel.go +++ b/resolver/channel.go @@ -4,6 +4,7 @@ import ( "context" "git.aiterp.net/rpdata/api/internal/session" + "git.aiterp.net/rpdata/api/model/change" "git.aiterp.net/rpdata/api/model/channel" ) @@ -84,6 +85,13 @@ func (r *MutationResolver) AddChannel(ctx context.Context, args *ChannelAddArgs) return nil, err } + go change.Submit("Channel", "add", user.ID, channel.Name, map[string]interface{}{ + "logged": channel.Logged, + "hub": channel.Hub, + "location": input.LocationName, + "event": input.EventName, + }) + return &ChannelResolver{C: channel}, nil } @@ -117,6 +125,13 @@ func (r *MutationResolver) EditChannel(ctx context.Context, args *ChannelEditArg return nil, err } + go change.Submit("Channel", "edit", user.ID, channel.Name, map[string]interface{}{ + "logged": input.Logged, + "hub": input.Hub, + "location": input.LocationName, + "event": input.EventName, + }) + return &ChannelResolver{C: channel}, nil } @@ -137,6 +152,8 @@ func (r *MutationResolver) RemoveChannel(ctx context.Context, args ChannelArgs) return nil, err } + go change.Submit("Channel", "remove", user.ID, channel.Name, nil) + return &ChannelResolver{C: channel}, nil } diff --git a/resolver/chapter.go b/resolver/chapter.go index d9aa8f1..f46a476 100644 --- a/resolver/chapter.go +++ b/resolver/chapter.go @@ -74,7 +74,7 @@ func (r *MutationResolver) AddChapter(ctx context.Context, args *AddChapterArgs) return nil, err } - change.Submit("Chapter", "add", user.ID, chapter.ID, map[string]interface{}{ + go change.Submit("Chapter", "add", user.ID, chapter.ID, map[string]interface{}{ "title": input.Title, "author": author, "fictionalDate": fictionalDate, @@ -126,7 +126,7 @@ func (r *MutationResolver) EditChapter(ctx context.Context, args *EditChapterArg return nil, err } - change.Submit("Chapter", "edit", user.ID, chapter.ID, map[string]interface{}{ + go change.Submit("Chapter", "edit", user.ID, chapter.ID, map[string]interface{}{ "title": input.Title, "source": input.Source, "fictionalDate": fictionalDate, @@ -157,7 +157,7 @@ func (r *MutationResolver) RemoveChapter(ctx context.Context, args *DeleteChapte return nil, err } - change.Submit("Chapter", "remove", user.ID, chapter.ID, nil) + go change.Submit("Chapter", "remove", user.ID, chapter.ID, nil) return &ChapterResolver{C: chapter}, nil } diff --git a/resolver/log.go b/resolver/log.go index 8c915e3..76b7737 100644 --- a/resolver/log.go +++ b/resolver/log.go @@ -164,7 +164,7 @@ func (r *MutationResolver) AddLog(ctx context.Context, args *LogAddArgs) (*LogRe return nil, err } - change.Submit("Log", "add", user.ID, log.ID, map[string]interface{}{ + go change.Submit("Log", "add", user.ID, log.ID, map[string]interface{}{ "channel": log.Channel, "title": log.Title, "event": log.Event, @@ -205,7 +205,7 @@ func (r *MutationResolver) EditLog(ctx context.Context, args *LogEditArgs) (*Log return nil, err } - change.Submit("Log", "edit", user.ID, log.ID, map[string]interface{}{ + go change.Submit("Log", "edit", user.ID, log.ID, map[string]interface{}{ "channel": log.Channel, "title": input.Title, "event": input.Event, @@ -233,7 +233,7 @@ func (r *MutationResolver) RemoveLog(ctx context.Context, args *struct{ ID strin return nil, err } - change.Submit("Log", "remove", user.ID, log.ID, nil) + go change.Submit("Log", "remove", user.ID, log.ID, nil) return &LogResolver{L: log}, nil } diff --git a/resolver/post.go b/resolver/post.go index 9840ef6..cb8cc73 100644 --- a/resolver/post.go +++ b/resolver/post.go @@ -82,7 +82,7 @@ func (r *MutationResolver) AddPost(ctx context.Context, args *PostAddArgs) (*Pos return nil, err } - change.Submit("Post", "add", user.ID, post.ID, map[string]interface{}{ + go change.Submit("Post", "add", user.ID, post.ID, map[string]interface{}{ "logId": post.LogID, "time": post.Time, "kind": post.Kind, @@ -136,7 +136,7 @@ func (r *MutationResolver) EditPost(ctx context.Context, args *PostEditArgs) (*P return nil, err } - change.Submit("Post", "edit", user.ID, post.ID, map[string]interface{}{ + go change.Submit("Post", "edit", user.ID, post.ID, map[string]interface{}{ "time": postTime, "kind": input.Kind, "nick": input.Nick, @@ -175,7 +175,7 @@ func (r *MutationResolver) MovePost(ctx context.Context, args *PostMoveArgs) (*P return nil, err } - change.Submit("Post", "move", user.ID, post.ID, map[string]interface{}{ + go change.Submit("Post", "move", user.ID, post.ID, map[string]interface{}{ "logId": post.LogID, "targetIndex": input.ToPosition, }) @@ -200,7 +200,7 @@ func (r *MutationResolver) RemovePost(ctx context.Context, args PostRemoveArgs) return nil, err } - change.Submit("Post", "remove", user.ID, post.ID, map[string]interface{}{ + go change.Submit("Post", "remove", user.ID, post.ID, map[string]interface{}{ "logId": post.LogID, }) diff --git a/resolver/story.go b/resolver/story.go index bb945e0..948dff0 100644 --- a/resolver/story.go +++ b/resolver/story.go @@ -152,7 +152,7 @@ func (r *MutationResolver) AddStory(ctx context.Context, args *StoryAddArgs) (*S return nil, err } - change.Submit("Story", "add", user.ID, story.ID, map[string]interface{}{ + go change.Submit("Story", "add", user.ID, story.ID, map[string]interface{}{ "name": input.Name, "category": input.Category, "author": input.Author, @@ -193,7 +193,7 @@ func (r *MutationResolver) AddStoryTag(ctx context.Context, args *StoryTagAddArg return nil, err } - change.Submit("Story", "add.tag", user.ID, story.ID, map[string]interface{}{ + go change.Submit("Story", "add.tag", user.ID, story.ID, map[string]interface{}{ "kind": tag.Kind, "name": tag.Name, }) @@ -233,7 +233,7 @@ func (r *MutationResolver) RemoveStoryTag(ctx context.Context, args *StoryTagRem return nil, err } - change.Submit("Story", "remove.tag", user.ID, story.ID, map[string]interface{}{ + go change.Submit("Story", "remove.tag", user.ID, story.ID, map[string]interface{}{ "kind": tag.Kind, "name": tag.Name, }) @@ -287,7 +287,7 @@ func (r *MutationResolver) EditStory(ctx context.Context, args *StoryEditArgs) ( return nil, err } - change.Submit("Story", "edit", user.ID, story.ID, map[string]interface{}{ + go change.Submit("Story", "edit", user.ID, story.ID, map[string]interface{}{ "name": input.Name, "category": input.Category, "author": input.Author, @@ -325,7 +325,7 @@ func (r *MutationResolver) RemoveStory(ctx context.Context, args *StoryRemoveArg return nil, err } - change.Submit("Story", "remove", user.ID, story.ID, nil) + go change.Submit("Story", "remove", user.ID, story.ID, nil) return &StoryResolver{S: story}, nil }