diff --git a/cmd/rpdata-server/main.go b/cmd/rpdata-server/main.go index e1f59ba..23a8d2a 100644 --- a/cmd/rpdata-server/main.go +++ b/cmd/rpdata-server/main.go @@ -8,6 +8,9 @@ import ( "runtime/debug" "strings" + "git.aiterp.net/rpdata/api/models" + "git.aiterp.net/rpdata/api/models/changes" + "git.aiterp.net/rpdata/api/models/logs" "git.aiterp.net/rpdata/api/graph2" @@ -35,9 +38,39 @@ func main() { log.Println("Characters updated") }() + go logListedChanges() + log.Fatal(http.ListenAndServe(":8081", nil)) } +func logListedChanges() { + sub := changes.Subscribe(context.Background(), nil, true) + + for change := range sub { + log.Printf("Change: Author=%#+v Model=%#+v Op=%#+v", change.Author, change.Model, change.Op) + for _, object := range change.Objects() { + switch object := object.(type) { + case models.Log: + log.Printf("\tLog: %s (%s)", object.ID, object.ShortID) + case models.Character: + log.Printf("\tCharacter: %s", object.ID) + case models.Channel: + log.Printf("\tChannel: %s", object.Name) + case models.Post: + log.Printf("\tPost: %s", object.ID) + case models.Story: + log.Printf("\tStory: %s", object.ID) + case models.Tag: + log.Printf("\tTag: (%s,%s)", object.Kind, object.Name) + case models.Chapter: + log.Printf("\tChapter: %s", object.ID) + } + } + } + + log.Println("Change subscription closed.") +} + func queryHandler() http.HandlerFunc { handler := handler.GraphQL( graph2.New(), diff --git a/graph2/graph.go b/graph2/graph.go index f4ced67..28d0a1c 100644 --- a/graph2/graph.go +++ b/graph2/graph.go @@ -26,6 +26,10 @@ func (r *rootResolver) Mutation() MutationResolver { return queries.MutationResolver } +func (r *rootResolver) Subscription() SubscriptionResolver { + return queries.SubscriptionResolver +} + func (r *rootResolver) Log() LogResolver { return &types.LogResolver } diff --git a/graph2/queries/changes.go b/graph2/queries/changes.go index 51b089c..0d53269 100644 --- a/graph2/queries/changes.go +++ b/graph2/queries/changes.go @@ -2,6 +2,7 @@ package queries import ( "context" + "errors" "git.aiterp.net/rpdata/api/models" "git.aiterp.net/rpdata/api/models/changes" @@ -12,3 +13,13 @@ import ( func (r *resolver) Changes(ctx context.Context, filter *changes.Filter) ([]models.Change, error) { return changes.List(filter) } + +/// Subscriptions + +func (r *subscriptionResolver) Changes(ctx context.Context, keys []models.ChangeKey) (<-chan models.Change, error) { + if len(keys) == 0 { + return nil, errors.New("At least one key is required for a subscription") + } + + return changes.Subscribe(ctx, keys, false), nil +} diff --git a/graph2/queries/resolver.go b/graph2/queries/resolver.go index 8403a82..613ba80 100644 --- a/graph2/queries/resolver.go +++ b/graph2/queries/resolver.go @@ -2,9 +2,13 @@ package queries type resolver struct{} type mutationResolver struct{} +type subscriptionResolver struct{} // Resolver has all the queries var Resolver resolver // MutationResolver brings the mutagens. var MutationResolver *mutationResolver + +// SubscriptionResolver has the real-time magic. +var SubscriptionResolver *subscriptionResolver diff --git a/graph2/schema/root.gql b/graph2/schema/root.gql index 250dc48..937f97e 100644 --- a/graph2/schema/root.gql +++ b/graph2/schema/root.gql @@ -1,6 +1,7 @@ schema { query: Query mutation: Mutation + subscription: Subscription } type Query { @@ -141,6 +142,13 @@ type Mutation { editChannel(input: ChannelEditInput!): Channel! } +type Subscription { + """ + Changes subscribes to the changes matching the following keys. + """ + changes(keys: [ChangeKeyInput!]!): Change! +} + # A Date represents a RFC3339 encoded date with up to millisecond precision. scalar Date diff --git a/models/changes/submit.go b/models/changes/submit.go index cf628b2..e004dbe 100644 --- a/models/changes/submit.go +++ b/models/changes/submit.go @@ -1,6 +1,7 @@ package changes import ( + "log" "strconv" "time" @@ -80,10 +81,12 @@ func Submit(model, op, author string, listed bool, keys []models.ChangeKey, obje change.Chapters = append(change.Chapters, object) case []models.Chapter: change.Chapters = append(change.Chapters, object...) + default: + log.Printf("Warning: unrecognized object in change: %#+v", object) } } - // TODO: Push to subscribers + pushToSubscribers(change) err = collection.Insert(&change) if err != nil { diff --git a/models/changes/subscribe.go b/models/changes/subscribe.go new file mode 100644 index 0000000..489d693 --- /dev/null +++ b/models/changes/subscribe.go @@ -0,0 +1,87 @@ +package changes + +import ( + "context" + "sync" + + "git.aiterp.net/rpdata/api/models" +) + +var subMutex sync.Mutex +var subList []*subscription + +type subscription struct { + Keys map[string]bool + Channel chan<- models.Change + WildCard bool +} + +// Subscribe subscribes to all changes. +func Subscribe(ctx context.Context, keys []models.ChangeKey, wildcard bool) <-chan models.Change { + channel := make(chan models.Change, 64) + sub := &subscription{ + Keys: make(map[string]bool, len(keys)), + Channel: channel, + WildCard: wildcard, + } + + for _, key := range keys { + sub.Keys[mapKey(key)] = true + } + + subMutex.Lock() + subList = append(subList, sub) + subMutex.Unlock() + + go func() { + <-ctx.Done() + + subMutex.Lock() + for i := range subList { + if subList[i] == sub { + subList = append(subList[:i], subList[i+1:]...) + } + } + subMutex.Unlock() + }() + + return channel +} + +func pushToSubscribers(change models.Change) { + keys := make([]string, len(change.Keys)) + for i := range change.Keys { + keys[i] = mapKey(change.Keys[i]) + } + + subMutex.Lock() +SubLoop: + for _, sub := range subList { + if sub.WildCard && change.Listed { + select { + case sub.Channel <- change: + default: + } + } else { + for _, key := range keys { + if sub.Keys[key] { + select { + case sub.Channel <- change: + default: + } + + continue SubLoop + } + } + } + } + subMutex.Unlock() +} + +func mapKey(ck models.ChangeKey) string { + return ck.Model.String() + "." + ck.ID +} + +func init() { + subList = make([]*subscription, 0, 16) +}