Browse Source

Added change.Timeline, added change submissions to channel mutations, made all change submissions spin off a goroutine

1.0 0.2.0
Gisle Aune 6 years ago
parent
commit
7adcdfe1e0
  1. 20
      cmd/rpdata-graphiql/main.go
  2. 30
      model/change/change.go
  3. 118
      model/change/timeline.go
  4. 17
      resolver/channel.go
  5. 6
      resolver/chapter.go
  6. 6
      resolver/log.go
  7. 8
      resolver/post.go
  8. 10
      resolver/story.go

20
cmd/rpdata-graphiql/main.go

@ -1,6 +1,7 @@
package main package main
import ( import (
"context"
"encoding/json" "encoding/json"
"log" "log"
"net/http" "net/http"
@ -8,6 +9,7 @@ import (
"git.aiterp.net/rpdata/api/internal/session" "git.aiterp.net/rpdata/api/internal/session"
"git.aiterp.net/rpdata/api/internal/store" "git.aiterp.net/rpdata/api/internal/store"
"git.aiterp.net/rpdata/api/loader" "git.aiterp.net/rpdata/api/loader"
"git.aiterp.net/rpdata/api/model/change"
"git.aiterp.net/rpdata/api/model/file" "git.aiterp.net/rpdata/api/model/file"
logModel "git.aiterp.net/rpdata/api/model/log" logModel "git.aiterp.net/rpdata/api/model/log"
"git.aiterp.net/rpdata/api/resolver" "git.aiterp.net/rpdata/api/resolver"
@ -89,6 +91,24 @@ func main() {
w.Write(json) w.Write(json)
}) })
change.InitializeTimeline()
go func() {
index := int64(0)
for {
index++
ch, err := change.Timeline.Get(context.Background(), index)
if err == change.ErrExpired {
continue
} else if err != nil {
log.Println("Timeline:", err)
break
}
log.Printf("Change: time=%s model=%s op=%s id=%s author=%s\n", ch.Time.Local().Format("[Mon Jan 02 15:04 MST]"), ch.Model, ch.Op, ch.ObjectID, ch.Author)
}
}()
log.Fatal(http.ListenAndServe(":17000", nil)) log.Fatal(http.ListenAndServe(":17000", nil))
} }

30
model/change/change.go

@ -2,6 +2,7 @@ package change
import ( import (
"log" "log"
"sync"
"time" "time"
"git.aiterp.net/rpdata/api/model/counter" "git.aiterp.net/rpdata/api/model/counter"
@ -30,8 +31,13 @@ var PublicModels = []string{
"Post", "Post",
} }
var submitMutex sync.Mutex
// Submit submits a change to the history. // Submit submits a change to the history.
func Submit(model, op, author, objectID string, data interface{}) (Change, error) { func Submit(model, op, author, objectID string, data interface{}) (Change, error) {
submitMutex.Lock()
defer submitMutex.Unlock()
index, err := counter.Next("auto_increment", "Change") index, err := counter.Next("auto_increment", "Change")
if err != nil { if err != nil {
return Change{}, err return Change{}, err
@ -47,6 +53,8 @@ func Submit(model, op, author, objectID string, data interface{}) (Change, error
Data: data, Data: data,
} }
Timeline.push(change)
err = collection.Insert(&change) err = collection.Insert(&change)
if err != nil { if err != nil {
return Change{}, err return Change{}, err
@ -55,6 +63,28 @@ func Submit(model, op, author, objectID string, data interface{}) (Change, error
return change, err return change, err
} }
// FindID gets a change by ID
func FindID(id int) (Change, error) {
change := Change{}
err := collection.FindId(id).One(&change)
if err != nil {
return Change{}, err
}
return change, nil
}
// List gets all changes in ascending order
func List() ([]Change, error) {
changes := make([]Change, 0, 64)
err := collection.Find(nil).Sort("_id").All(&changes)
if err != nil {
return nil, err
}
return changes, nil
}
func init() { func init() {
store.HandleInit(func(db *mgo.Database) { store.HandleInit(func(db *mgo.Database) {
collection = db.C("common.history") collection = db.C("common.history")

118
model/change/timeline.go

@ -0,0 +1,118 @@
package change
import (
"context"
"errors"
"sync"
"github.com/globalsign/mgo"
)
// ErrInvalidIndex is returned when a timeline subscriber tries to get a value
// with a negative id
var ErrInvalidIndex = errors.New("change.timeline: Invalid ID")
// ErrWaitRequired is returned when a timeline subscriber requests an entry that
// has not yet been pushed
var ErrWaitRequired = errors.New("change.timeline: Waiting required")
// ErrExpired is returned when a timeline subscriber requests a change that happened
// too long ago for it to be in the database.
var ErrExpired = errors.New("change.timeline: Change has expired")
type timeline struct {
mutex sync.Mutex
firstIndex int64
currentID int64
cache []Change
notifyCh chan struct{}
}
func (timeline *timeline) Get(ctx context.Context, id int64) (Change, error) {
change, err := timeline.get(id, false)
if err == ErrWaitRequired {
if err := timeline.wait(ctx); err != nil {
return Change{}, err
}
return timeline.get(id, true)
} else if err == mgo.ErrNotFound {
return Change{}, ErrExpired
}
return change, nil
}
func (timeline *timeline) push(change Change) {
timeline.mutex.Lock()
timeline.cache = append(timeline.cache, change)
if len(timeline.cache) > 128 {
copy(timeline.cache, timeline.cache[32:])
timeline.cache = timeline.cache[:len(timeline.cache)-32]
}
// Only update the currentID if this is more recent than the existing ones
if int64(change.ID) > timeline.currentID {
timeline.currentID = int64(change.ID)
}
if timeline.notifyCh != nil {
close(timeline.notifyCh)
timeline.notifyCh = nil
}
timeline.mutex.Unlock()
}
func (timeline *timeline) get(id int64, hasWaited bool) (Change, error) {
if id < 0 {
return Change{}, ErrInvalidIndex
}
timeline.mutex.Lock()
if !hasWaited && id > timeline.currentID {
timeline.mutex.Unlock()
return Change{}, ErrWaitRequired
}
for _, change := range timeline.cache {
if change.ID == int(id) {
timeline.mutex.Unlock()
return change, nil
}
}
timeline.mutex.Unlock()
return FindID(int(id))
}
func (timeline *timeline) wait(ctx context.Context) error {
timeline.mutex.Lock()
if timeline.notifyCh == nil {
timeline.notifyCh = make(chan struct{})
}
ch := timeline.notifyCh
timeline.mutex.Unlock()
select {
case <-ctx.Done():
return ctx.Err()
case <-ch:
return nil
}
}
// Timeline has operations for subscribing to changes in real time
var Timeline = &timeline{}
// InitializeTimeline fills in the timeline with existing entries
func InitializeTimeline() {
changes, err := List()
if err != nil {
return
}
for _, change := range changes {
Timeline.push(change)
}
}

17
resolver/channel.go

@ -4,6 +4,7 @@ import (
"context" "context"
"git.aiterp.net/rpdata/api/internal/session" "git.aiterp.net/rpdata/api/internal/session"
"git.aiterp.net/rpdata/api/model/change"
"git.aiterp.net/rpdata/api/model/channel" "git.aiterp.net/rpdata/api/model/channel"
) )
@ -84,6 +85,13 @@ func (r *MutationResolver) AddChannel(ctx context.Context, args *ChannelAddArgs)
return nil, err return nil, err
} }
go change.Submit("Channel", "add", user.ID, channel.Name, map[string]interface{}{
"logged": channel.Logged,
"hub": channel.Hub,
"location": input.LocationName,
"event": input.EventName,
})
return &ChannelResolver{C: channel}, nil return &ChannelResolver{C: channel}, nil
} }
@ -117,6 +125,13 @@ func (r *MutationResolver) EditChannel(ctx context.Context, args *ChannelEditArg
return nil, err return nil, err
} }
go change.Submit("Channel", "edit", user.ID, channel.Name, map[string]interface{}{
"logged": input.Logged,
"hub": input.Hub,
"location": input.LocationName,
"event": input.EventName,
})
return &ChannelResolver{C: channel}, nil return &ChannelResolver{C: channel}, nil
} }
@ -137,6 +152,8 @@ func (r *MutationResolver) RemoveChannel(ctx context.Context, args ChannelArgs)
return nil, err return nil, err
} }
go change.Submit("Channel", "remove", user.ID, channel.Name, nil)
return &ChannelResolver{C: channel}, nil return &ChannelResolver{C: channel}, nil
} }

6
resolver/chapter.go

@ -74,7 +74,7 @@ func (r *MutationResolver) AddChapter(ctx context.Context, args *AddChapterArgs)
return nil, err return nil, err
} }
change.Submit("Chapter", "add", user.ID, chapter.ID, map[string]interface{}{
go change.Submit("Chapter", "add", user.ID, chapter.ID, map[string]interface{}{
"title": input.Title, "title": input.Title,
"author": author, "author": author,
"fictionalDate": fictionalDate, "fictionalDate": fictionalDate,
@ -126,7 +126,7 @@ func (r *MutationResolver) EditChapter(ctx context.Context, args *EditChapterArg
return nil, err return nil, err
} }
change.Submit("Chapter", "edit", user.ID, chapter.ID, map[string]interface{}{
go change.Submit("Chapter", "edit", user.ID, chapter.ID, map[string]interface{}{
"title": input.Title, "title": input.Title,
"source": input.Source, "source": input.Source,
"fictionalDate": fictionalDate, "fictionalDate": fictionalDate,
@ -157,7 +157,7 @@ func (r *MutationResolver) RemoveChapter(ctx context.Context, args *DeleteChapte
return nil, err return nil, err
} }
change.Submit("Chapter", "remove", user.ID, chapter.ID, nil)
go change.Submit("Chapter", "remove", user.ID, chapter.ID, nil)
return &ChapterResolver{C: chapter}, nil return &ChapterResolver{C: chapter}, nil
} }

6
resolver/log.go

@ -164,7 +164,7 @@ func (r *MutationResolver) AddLog(ctx context.Context, args *LogAddArgs) (*LogRe
return nil, err return nil, err
} }
change.Submit("Log", "add", user.ID, log.ID, map[string]interface{}{
go change.Submit("Log", "add", user.ID, log.ID, map[string]interface{}{
"channel": log.Channel, "channel": log.Channel,
"title": log.Title, "title": log.Title,
"event": log.Event, "event": log.Event,
@ -205,7 +205,7 @@ func (r *MutationResolver) EditLog(ctx context.Context, args *LogEditArgs) (*Log
return nil, err return nil, err
} }
change.Submit("Log", "edit", user.ID, log.ID, map[string]interface{}{
go change.Submit("Log", "edit", user.ID, log.ID, map[string]interface{}{
"channel": log.Channel, "channel": log.Channel,
"title": input.Title, "title": input.Title,
"event": input.Event, "event": input.Event,
@ -233,7 +233,7 @@ func (r *MutationResolver) RemoveLog(ctx context.Context, args *struct{ ID strin
return nil, err return nil, err
} }
change.Submit("Log", "remove", user.ID, log.ID, nil)
go change.Submit("Log", "remove", user.ID, log.ID, nil)
return &LogResolver{L: log}, nil return &LogResolver{L: log}, nil
} }

8
resolver/post.go

@ -82,7 +82,7 @@ func (r *MutationResolver) AddPost(ctx context.Context, args *PostAddArgs) (*Pos
return nil, err return nil, err
} }
change.Submit("Post", "add", user.ID, post.ID, map[string]interface{}{
go change.Submit("Post", "add", user.ID, post.ID, map[string]interface{}{
"logId": post.LogID, "logId": post.LogID,
"time": post.Time, "time": post.Time,
"kind": post.Kind, "kind": post.Kind,
@ -136,7 +136,7 @@ func (r *MutationResolver) EditPost(ctx context.Context, args *PostEditArgs) (*P
return nil, err return nil, err
} }
change.Submit("Post", "edit", user.ID, post.ID, map[string]interface{}{
go change.Submit("Post", "edit", user.ID, post.ID, map[string]interface{}{
"time": postTime, "time": postTime,
"kind": input.Kind, "kind": input.Kind,
"nick": input.Nick, "nick": input.Nick,
@ -175,7 +175,7 @@ func (r *MutationResolver) MovePost(ctx context.Context, args *PostMoveArgs) (*P
return nil, err return nil, err
} }
change.Submit("Post", "move", user.ID, post.ID, map[string]interface{}{
go change.Submit("Post", "move", user.ID, post.ID, map[string]interface{}{
"logId": post.LogID, "logId": post.LogID,
"targetIndex": input.ToPosition, "targetIndex": input.ToPosition,
}) })
@ -200,7 +200,7 @@ func (r *MutationResolver) RemovePost(ctx context.Context, args PostRemoveArgs)
return nil, err return nil, err
} }
change.Submit("Post", "remove", user.ID, post.ID, map[string]interface{}{
go change.Submit("Post", "remove", user.ID, post.ID, map[string]interface{}{
"logId": post.LogID, "logId": post.LogID,
}) })

10
resolver/story.go

@ -152,7 +152,7 @@ func (r *MutationResolver) AddStory(ctx context.Context, args *StoryAddArgs) (*S
return nil, err return nil, err
} }
change.Submit("Story", "add", user.ID, story.ID, map[string]interface{}{
go change.Submit("Story", "add", user.ID, story.ID, map[string]interface{}{
"name": input.Name, "name": input.Name,
"category": input.Category, "category": input.Category,
"author": input.Author, "author": input.Author,
@ -193,7 +193,7 @@ func (r *MutationResolver) AddStoryTag(ctx context.Context, args *StoryTagAddArg
return nil, err return nil, err
} }
change.Submit("Story", "add.tag", user.ID, story.ID, map[string]interface{}{
go change.Submit("Story", "add.tag", user.ID, story.ID, map[string]interface{}{
"kind": tag.Kind, "kind": tag.Kind,
"name": tag.Name, "name": tag.Name,
}) })
@ -233,7 +233,7 @@ func (r *MutationResolver) RemoveStoryTag(ctx context.Context, args *StoryTagRem
return nil, err return nil, err
} }
change.Submit("Story", "remove.tag", user.ID, story.ID, map[string]interface{}{
go change.Submit("Story", "remove.tag", user.ID, story.ID, map[string]interface{}{
"kind": tag.Kind, "kind": tag.Kind,
"name": tag.Name, "name": tag.Name,
}) })
@ -287,7 +287,7 @@ func (r *MutationResolver) EditStory(ctx context.Context, args *StoryEditArgs) (
return nil, err return nil, err
} }
change.Submit("Story", "edit", user.ID, story.ID, map[string]interface{}{
go change.Submit("Story", "edit", user.ID, story.ID, map[string]interface{}{
"name": input.Name, "name": input.Name,
"category": input.Category, "category": input.Category,
"author": input.Author, "author": input.Author,
@ -325,7 +325,7 @@ func (r *MutationResolver) RemoveStory(ctx context.Context, args *StoryRemoveArg
return nil, err return nil, err
} }
change.Submit("Story", "remove", user.ID, story.ID, nil)
go change.Submit("Story", "remove", user.ID, story.ID, nil)
return &StoryResolver{S: story}, nil return &StoryResolver{S: story}, nil
} }

Loading…
Cancel
Save