Gisle Aune
6 years ago
7 changed files with 239 additions and 4 deletions
-
11cmd/rpdata-server/main.go
-
7graph2/queries/character.go
-
29graph2/queries/post.go
-
84internal/task/task.go
-
6models/characters/list.go
-
106models/logs/update-characters.go
-
BINtest.prof
@ -0,0 +1,84 @@ |
|||
package task |
|||
|
|||
import ( |
|||
"context" |
|||
"sync" |
|||
"time" |
|||
) |
|||
|
|||
var globalCtx, globalCancel = context.WithCancel(context.Background()) |
|||
|
|||
// A Task is a wrapper around a function that offers scheduling
|
|||
// and syncronization.
|
|||
type Task struct { |
|||
mutex sync.Mutex |
|||
ctx context.Context |
|||
ctxCancel context.CancelFunc |
|||
waitDuration time.Duration |
|||
scheduled bool |
|||
callback func() error |
|||
} |
|||
|
|||
// Context gets the task's context.
|
|||
func (task *Task) Context() context.Context { |
|||
return task.ctx |
|||
} |
|||
|
|||
// Stop stops a task. This should not be done to stop a single run,
|
|||
// but as a way to cancel it forever.
|
|||
func (task *Task) Stop() { |
|||
task.ctxCancel() |
|||
} |
|||
|
|||
// Schedule schedules a task for later execution.
|
|||
func (task *Task) Schedule() { |
|||
// Don't if the context is closed.
|
|||
select { |
|||
case <-task.Context().Done(): |
|||
return |
|||
default: |
|||
} |
|||
|
|||
task.mutex.Lock() |
|||
if task.scheduled { |
|||
task.mutex.Unlock() |
|||
return |
|||
} |
|||
task.scheduled = true |
|||
task.mutex.Unlock() |
|||
|
|||
go func() { |
|||
<-time.After(task.waitDuration) |
|||
|
|||
// Mark as no longer scheduled
|
|||
task.mutex.Lock() |
|||
task.scheduled = false |
|||
task.mutex.Unlock() |
|||
|
|||
// Don't if the task is cancelled
|
|||
select { |
|||
case <-task.Context().Done(): |
|||
return |
|||
default: |
|||
} |
|||
|
|||
task.callback() |
|||
}() |
|||
} |
|||
|
|||
// New makes a new task with the callback.
|
|||
func New(waitDuration time.Duration, callback func() error) *Task { |
|||
ctx, ctxCancel := context.WithCancel(globalCtx) |
|||
|
|||
return &Task{ |
|||
callback: callback, |
|||
ctx: ctx, |
|||
ctxCancel: ctxCancel, |
|||
waitDuration: waitDuration, |
|||
} |
|||
} |
|||
|
|||
// StopAll stops all tasks.
|
|||
func StopAll() { |
|||
globalCancel() |
|||
} |
@ -0,0 +1,106 @@ |
|||
package logs |
|||
|
|||
import ( |
|||
"strings" |
|||
"time" |
|||
|
|||
"git.aiterp.net/rpdata/api/internal/task" |
|||
"git.aiterp.net/rpdata/api/models" |
|||
"git.aiterp.net/rpdata/api/models/characters" |
|||
"git.aiterp.net/rpdata/api/models/posts" |
|||
"github.com/globalsign/mgo/bson" |
|||
) |
|||
|
|||
var updateTask = task.New(time.Second*60, RunFullUpdate) |
|||
|
|||
// UpdateCharacters updates the characters for the given log.
|
|||
func UpdateCharacters(log models.Log) (models.Log, error) { |
|||
posts, err := posts.List(&posts.Filter{LogID: &log.ShortID, Kind: []string{"action", "text", "chars"}, Limit: 0}) |
|||
if err != nil { |
|||
return models.Log{}, err |
|||
} |
|||
|
|||
added := make(map[string]bool) |
|||
removed := make(map[string]bool) |
|||
for _, post := range posts { |
|||
if post.Kind == "text" || post.Kind == "action" { |
|||
if strings.HasPrefix(post.Text, "(") || strings.Contains(post.Nick, "(") || strings.Contains(post.Nick, "[E]") { |
|||
continue |
|||
} |
|||
|
|||
// Clean up the nick (remove possessive suffix, comma, formatting stuff)
|
|||
if strings.HasSuffix(post.Nick, "'s") || strings.HasSuffix(post.Nick, "`s") { |
|||
post.Nick = post.Nick[:len(post.Nick)-2] |
|||
} else if strings.HasSuffix(post.Nick, "'") || strings.HasSuffix(post.Nick, "`") || strings.HasSuffix(post.Nick, ",") || strings.HasSuffix(post.Nick, "\x0f") { |
|||
post.Nick = post.Nick[:len(post.Nick)-1] |
|||
} |
|||
|
|||
added[post.Nick] = true |
|||
} |
|||
if post.Kind == "chars" { |
|||
tokens := strings.Fields(post.Text) |
|||
for _, token := range tokens { |
|||
if strings.HasPrefix(token, "-") { |
|||
removed[token[1:]] = true |
|||
} else { |
|||
added[strings.Replace(token, "+", "", 1)] = true |
|||
} |
|||
} |
|||
} |
|||
} |
|||
|
|||
nicks := make([]string, 0, len(added)) |
|||
for nick := range added { |
|||
if added[nick] && !removed[nick] { |
|||
nicks = append(nicks, nick) |
|||
} |
|||
} |
|||
|
|||
characters, err := characters.List(&characters.Filter{Nicks: nicks}) |
|||
if err != nil { |
|||
return models.Log{}, err |
|||
} |
|||
|
|||
characterIDs := make([]string, len(characters)) |
|||
for i, char := range characters { |
|||
characterIDs[i] = char.ID |
|||
} |
|||
|
|||
err = collection.UpdateId(log.ID, bson.M{"$set": bson.M{"characterIds": characterIDs}}) |
|||
if err != nil { |
|||
return models.Log{}, err |
|||
} |
|||
|
|||
log.CharacterIDs = characterIDs |
|||
|
|||
return log, nil |
|||
} |
|||
|
|||
// RunFullUpdate runs a full update on all logs.
|
|||
func RunFullUpdate() error { |
|||
iter := iter(bson.M{}, 0) |
|||
err := iter.Err() |
|||
if err != nil { |
|||
return err |
|||
} |
|||
|
|||
log := models.Log{} |
|||
for iter.Next(&log) { |
|||
_, err = UpdateCharacters(log) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
} |
|||
|
|||
err = iter.Err() |
|||
if err != nil { |
|||
return err |
|||
} |
|||
|
|||
return nil |
|||
} |
|||
|
|||
// ScheduleFullUpdate runs a full character update within the next 60 seconds.
|
|||
func ScheduleFullUpdate() { |
|||
updateTask.Schedule() |
|||
} |
Write
Preview
Loading…
Cancel
Save
Reference in new issue