Browse Source

graph2, changes, rpdata-server: Added changes subscription.

module-madness-pointers
Gisle Aune 5 years ago
parent
commit
31239330df
  1. 33
      cmd/rpdata-server/main.go
  2. 4
      graph2/graph.go
  3. 11
      graph2/queries/changes.go
  4. 4
      graph2/queries/resolver.go
  5. 8
      graph2/schema/root.gql
  6. 5
      models/changes/submit.go
  7. 87
      models/changes/subscribe.go

33
cmd/rpdata-server/main.go

@ -8,6 +8,9 @@ import (
"runtime/debug"
"strings"
"git.aiterp.net/rpdata/api/models"
"git.aiterp.net/rpdata/api/models/changes"
"git.aiterp.net/rpdata/api/models/logs"
"git.aiterp.net/rpdata/api/graph2"
@ -35,9 +38,39 @@ func main() {
log.Println("Characters updated")
}()
go logListedChanges()
log.Fatal(http.ListenAndServe(":8081", nil))
}
func logListedChanges() {
sub := changes.Subscribe(context.Background(), nil, true)
for change := range sub {
log.Printf("Change: Author=%#+v Model=%#+v Op=%#+v", change.Author, change.Model, change.Op)
for _, object := range change.Objects() {
switch object := object.(type) {
case models.Log:
log.Printf("\tLog: %s (%s)", object.ID, object.ShortID)
case models.Character:
log.Printf("\tCharacter: %s", object.ID)
case models.Channel:
log.Printf("\tChannel: %s", object.Name)
case models.Post:
log.Printf("\tPost: %s", object.ID)
case models.Story:
log.Printf("\tStory: %s", object.ID)
case models.Tag:
log.Printf("\tTag: (%s,%s)", object.Kind, object.Name)
case models.Chapter:
log.Printf("\tChapter: %s", object.ID)
}
}
}
log.Println("Change subscription closed.")
}
func queryHandler() http.HandlerFunc {
handler := handler.GraphQL(
graph2.New(),

4
graph2/graph.go

@ -26,6 +26,10 @@ func (r *rootResolver) Mutation() MutationResolver {
return queries.MutationResolver
}
func (r *rootResolver) Subscription() SubscriptionResolver {
return queries.SubscriptionResolver
}
func (r *rootResolver) Log() LogResolver {
return &types.LogResolver
}

11
graph2/queries/changes.go

@ -2,6 +2,7 @@ package queries
import (
"context"
"errors"
"git.aiterp.net/rpdata/api/models"
"git.aiterp.net/rpdata/api/models/changes"
@ -12,3 +13,13 @@ import (
func (r *resolver) Changes(ctx context.Context, filter *changes.Filter) ([]models.Change, error) {
return changes.List(filter)
}
/// Subscriptions
func (r *subscriptionResolver) Changes(ctx context.Context, keys []models.ChangeKey) (<-chan models.Change, error) {
if len(keys) == 0 {
return nil, errors.New("At least one key is required for a subscription")
}
return changes.Subscribe(ctx, keys, false), nil
}

4
graph2/queries/resolver.go

@ -2,9 +2,13 @@ package queries
type resolver struct{}
type mutationResolver struct{}
type subscriptionResolver struct{}
// Resolver has all the queries
var Resolver resolver
// MutationResolver brings the mutagens.
var MutationResolver *mutationResolver
// SubscriptionResolver has the real-time magic.
var SubscriptionResolver *subscriptionResolver

8
graph2/schema/root.gql

@ -1,6 +1,7 @@
schema {
query: Query
mutation: Mutation
subscription: Subscription
}
type Query {
@ -141,6 +142,13 @@ type Mutation {
editChannel(input: ChannelEditInput!): Channel!
}
type Subscription {
"""
Changes subscribes to the changes matching the following keys.
"""
changes(keys: [ChangeKeyInput!]!): Change!
}
# A Date represents a RFC3339 encoded date with up to millisecond precision.
scalar Date

5
models/changes/submit.go

@ -1,6 +1,7 @@
package changes
import (
"log"
"strconv"
"time"
@ -80,10 +81,12 @@ func Submit(model, op, author string, listed bool, keys []models.ChangeKey, obje
change.Chapters = append(change.Chapters, object)
case []models.Chapter:
change.Chapters = append(change.Chapters, object...)
default:
log.Printf("Warning: unrecognized object in change: %#+v", object)
}
}
// TODO: Push to subscribers
pushToSubscribers(change)
err = collection.Insert(&change)
if err != nil {

87
models/changes/subscribe.go

@ -0,0 +1,87 @@
package changes
import (
"context"
"sync"
"git.aiterp.net/rpdata/api/models"
)
var subMutex sync.Mutex
var subList []*subscription
type subscription struct {
Keys map[string]bool
Channel chan<- models.Change
WildCard bool
}
// Subscribe subscribes to all changes.
func Subscribe(ctx context.Context, keys []models.ChangeKey, wildcard bool) <-chan models.Change {
channel := make(chan models.Change, 64)
sub := &subscription{
Keys: make(map[string]bool, len(keys)),
Channel: channel,
WildCard: wildcard,
}
for _, key := range keys {
sub.Keys[mapKey(key)] = true
}
subMutex.Lock()
subList = append(subList, sub)
subMutex.Unlock()
go func() {
<-ctx.Done()
subMutex.Lock()
for i := range subList {
if subList[i] == sub {
subList = append(subList[:i], subList[i+1:]...)
}
}
subMutex.Unlock()
}()
return channel
}
func pushToSubscribers(change models.Change) {
keys := make([]string, len(change.Keys))
for i := range change.Keys {
keys[i] = mapKey(change.Keys[i])
}
subMutex.Lock()
SubLoop:
for _, sub := range subList {
if sub.WildCard && change.Listed {
select {
case sub.Channel <- change:
default:
}
} else {
for _, key := range keys {
if sub.Keys[key] {
select {
case sub.Channel <- change:
default:
}
continue SubLoop
}
}
}
}
subMutex.Unlock()
}
func mapKey(ck models.ChangeKey) string {
return ck.Model.String() + "." + ck.ID
}
func init() {
subList = make([]*subscription, 0, 16)
}
Loading…
Cancel
Save