Browse Source

add event handler API and logic.

pull/1/head
Gisle Aune 3 years ago
parent
commit
56a0525b9d
  1. 71
      app/api/handlers.go
  2. 3
      app/config/db.go
  3. 1
      app/server.go
  4. 119
      app/services/events.go
  5. 2
      internal/mysql/eventhandlerrepo.go
  6. 53
      models/eventhandler.go

71
app/api/handlers.go

@ -0,0 +1,71 @@
package api
import (
"git.aiterp.net/lucifer/new-server/app/config"
"git.aiterp.net/lucifer/new-server/models"
"github.com/gin-gonic/gin"
)
func EventHandlers(r gin.IRoutes) {
r.GET("", handler(func(c *gin.Context) (interface{}, error) {
return config.EventHandlerRepository().FetchAll(ctxOf(c))
}))
r.GET("/:id", handler(func(c *gin.Context) (interface{}, error) {
return config.EventHandlerRepository().FindByID(ctxOf(c), intParam(c, "id"))
}))
r.POST("", handler(func(c *gin.Context) (interface{}, error) {
var body models.EventHandler
err := parseBody(c, &body)
if err != nil {
return nil, err
}
if body.Conditions == nil {
body.Conditions = make(map[string]models.EventCondition)
}
err = config.EventHandlerRepository().Save(ctxOf(c), &body)
if err != nil {
return nil, err
}
return body, nil
}))
r.PUT("/:id", handler(func(c *gin.Context) (interface{}, error) {
var body models.EventHandlerUpdate
err := parseBody(c, &body)
if err != nil {
return nil, err
}
handler, err := config.EventHandlerRepository().FindByID(ctxOf(c), intParam(c, "id"))
if err != nil {
return nil, err
}
handler.ApplyUpdate(body)
err = config.EventHandlerRepository().Save(ctxOf(c), handler)
if err != nil {
return nil, err
}
return handler, nil
}))
r.DELETE("/:id", handler(func(c *gin.Context) (interface{}, error) {
handler, err := config.EventHandlerRepository().FindByID(ctxOf(c), intParam(c, "id"))
if err != nil {
return nil, err
}
err = config.EventHandlerRepository().Delete(ctxOf(c), handler)
if err != nil {
return nil, err
}
return handler, nil
}))
}

3
app/config/db.go

@ -23,6 +23,9 @@ func DBX() *sqlx.DB {
MySqlPort(),
MySqlSchema(),
))
dbx.SetMaxIdleConns(50)
dbx.SetMaxOpenConns(100)
}
return dbx

1
app/server.go

@ -24,6 +24,7 @@ func StartServer() {
api.ColorPresets(apiGin.Group("/color-presets"))
api.DriverKinds(apiGin.Group("/driver-kinds"))
api.Events(apiGin.Group("/events"))
api.EventHandlers(apiGin.Group("/event-handlers"))
log.Fatal(ginny.Run(fmt.Sprintf("0.0.0.0:%d", config.ServerPort())))
}

119
app/services/events.go

@ -6,36 +6,54 @@ import (
"git.aiterp.net/lucifer/new-server/app/config"
"git.aiterp.net/lucifer/new-server/models"
"log"
"math"
"sort"
"strconv"
"strings"
"sync"
"time"
)
func StartEventHandler() {
config.EventChannel<-models.Event{Name: "LuciferStarted"}
config.EventChannel <- models.Event{Name: "LuciferStarted"}
go func() {
for event := range config.EventChannel {
handleEvent(event)
responses := handleEvent(event)
for _, response := range responses {
handleEvent(response)
}
}
}()
// Generate TimeChanged event
go func() {
drift := time.Now().Add(time.Minute).Truncate(time.Minute).Sub(time.Now())
time.Sleep(drift + time.Millisecond * 5)
time.Sleep(drift + time.Millisecond*5)
for range time.NewTicker(time.Minute).C {
config.EventChannel <- models.Event{Name: "TimeChanged"}
config.EventChannel <- models.Event{Name: "TimeChanged", Payload: map[string]string{
"weekdayName": time.Now().In(loc).Format("Monday"),
"month": time.Now().In(loc).Format("01"),
"year": time.Now().In(loc).Format("2006"),
}}
}
}()
}
var loc, _ = time.LoadLocation("Europe/Oslo")
var ctx = context.Background()
var X = false
func handleEvent(event models.Event) {
func handleEvent(event models.Event) (responses []models.Event) {
startTime := time.Now()
defer func() {
duration := time.Since(startTime)
if duration > time.Millisecond * 250 {
log.Println(event.Name, "took long to handle:", duration)
}
}()
if !event.HasPayload("hour") {
event.AddPayload("hour", time.Now().In(loc).Format("15"))
}
@ -49,33 +67,102 @@ func handleEvent(event models.Event) {
return
}
if !X {
paramStrings := make([]string, 0, 8)
for key, value := range event.Payload {
paramStrings = append(paramStrings, fmt.Sprintf("%s=%s", key, value))
}
log.Printf("Unhandled event %s(%s)", event.Name, strings.Join(paramStrings, ", "))
return
paramStrings := make([]string, 0, 8)
for key, value := range event.Payload {
paramStrings = append(paramStrings, fmt.Sprintf("%s=%s", key, value))
}
sort.Strings(paramStrings)
log.Printf("Event %s(%s)", event.Name, strings.Join(paramStrings, ", "))
handlers, err := config.EventHandlerRepository().FetchAll(ctx)
if err != nil {
log.Printf("Error fetchin event halders: %d", err)
log.Printf("Error fetchin' event halders: %s", err)
return
}
allDevices := make([]models.Device, 0, 16)
actions := make(map[int]models.EventAction, 16)
priorities := make(map[int]int, 16)
highestPriority := 0
var prioritizedEvent *models.Event
for _, handler := range handlers {
if handler.EventName != event.Name {
continue
}
devices, err := config.DeviceRepository().FetchByReference(ctx, handler.TargetKind, handler.TargetValue)
if err != nil {
log.Printf("Error fetchin event halders: %d", err)
log.Printf("Error fetchin' event devices: %s", err)
return
}
if !handler.MatchesEvent(event, devices) {
continue
}
if handler.Priority > highestPriority {
highestPriority = handler.Priority
prioritizedEvent = handler.Actions.FireEvent
}
for _, device := range devices {
_, ok := actions[device.ID]
if !ok {
allDevices = append(allDevices, device)
actions[device.ID] = handler.Actions
priorities[device.ID] = handler.Priority
} else if handler.Priority > priorities[device.ID] {
actions[device.ID] = handler.Actions
priorities[device.ID] = handler.Priority
}
}
}
if prioritizedEvent != nil {
responses = append(responses, *prioritizedEvent)
}
for i, device := range allDevices {
action := actions[device.ID]
newState := models.NewDeviceState{}
newState.Color = action.SetColor
newState.Power = action.SetPower
if action.SetIntensity != nil {
newState.Intensity = action.SetIntensity
} else if action.AddIntensity != nil {
newIntensity := math.Max(0, math.Min(1, device.State.Intensity + *action.AddIntensity))
newState.Intensity = &newIntensity
}
err = allDevices[i].SetState(newState)
if err != nil {
log.Println("Error updating state for device", device.ID, "err:", err)
}
}
config.PublishChannel <- allDevices
wg := sync.WaitGroup{}
for _, device := range allDevices {
wg.Add(1)
go func(device models.Device) {
err := config.DeviceRepository().Save(context.Background(), &device)
if err != nil {
log.Println("Failed to save device for state:", err)
}
wg.Done()
}(device)
}
wg.Wait()
return
}
func handleSpecial(event models.Event) error {

2
internal/mysql/eventhandlerrepo.go

@ -34,7 +34,7 @@ func (r *EventHandlerRepo) FindByID(ctx context.Context, id int) (*models.EventH
func (r *EventHandlerRepo) FetchAll(ctx context.Context) ([]models.EventHandler, error) {
var records []eventHandlerRecord
err := r.DBX.SelectContext(ctx, &records, "SELECT * FROM event_handler")
err := r.DBX.SelectContext(ctx, &records, "SELECT * FROM event_handler ORDER BY priority DESC, id")
if err != nil {
return nil, dbErr(err)
}

53
models/eventhandler.go

@ -13,7 +13,7 @@ type EventHandler struct {
Conditions map[string]EventCondition `json:"conditions"`
OneShot bool `json:"oneShot"`
Priority int `json:"priority"`
TargetKind ReferenceKind `json:"targetType"`
TargetKind ReferenceKind `json:"targetKind"`
TargetValue string `json:"targetValue"`
Actions EventAction `json:"actions"`
}
@ -33,13 +33,58 @@ type EventCondition struct {
LTE string `json:"lte,omitempty"`
}
type EventHandlerUpdate struct {
SetEventName *string `json:"setEventName"`
SetPriority *int `json:"setPriority"`
SetTargetKind *ReferenceKind `json:"setTargetKind"`
SetTargetValue *string `json:"setTargetValue"`
SetConditions map[string]EventCondition `json:"setConditions"`
PatchConditions map[string]*EventCondition `json:"patchConditions"`
SetActions *EventAction `json:"setActions"`
}
func (h *EventHandler) ApplyUpdate(update EventHandlerUpdate) {
if update.SetEventName != nil {
h.EventName = *update.SetEventName
}
if update.SetPriority != nil {
h.Priority = *update.SetPriority
}
if update.SetTargetKind != nil {
h.TargetKind = *update.SetTargetKind
}
if update.SetTargetValue != nil {
h.TargetValue = *update.SetTargetValue
}
if update.SetActions != nil {
h.Actions = *update.SetActions
}
if update.SetConditions != nil {
h.Conditions = update.SetConditions
}
if update.PatchConditions != nil {
if h.Conditions == nil {
h.Conditions = make(map[string]EventCondition)
}
for key, value := range update.PatchConditions {
if value == nil {
delete(h.Conditions, key)
} else {
h.Conditions[key] = *value
}
}
}
}
func (h *EventHandler) MatchesEvent(event Event, targets []Device) bool {
if event.Name != h.EventName {
return false
}
for key, condition := range h.Conditions {
if !event.HasPayload(key) {
if !strings.ContainsRune(key, '.') && !event.HasPayload(key) {
return false
}
@ -128,6 +173,7 @@ type EventAction struct {
SetColor *string `json:"setColor"`
SetIntensity *float64 `json:"setIntensity"`
AddIntensity *float64 `json:"addIntensity"`
FireEvent *Event `json:"fireEvent"`
}
func (action *EventAction) Apply(other EventAction) {
@ -143,4 +189,7 @@ func (action *EventAction) Apply(other EventAction) {
if action.AddIntensity == nil && action.SetIntensity == nil {
action.AddIntensity = other.AddIntensity
}
if action.FireEvent == nil {
action.FireEvent = other.FireEvent
}
}
Loading…
Cancel
Save