You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 

89 lines
3.3 KiB

package net.aiterp.git.ykonsole2.application.routes.ws
import io.ktor.server.routing.*
import io.ktor.server.websocket.*
import io.ktor.websocket.*
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.launch
import net.aiterp.git.ykonsole2.application.logging.log
import net.aiterp.git.ykonsole2.application.plugins.ykObjectMapper
import net.aiterp.git.ykonsole2.application.routes.MilestoneDTO
import net.aiterp.git.ykonsole2.application.routes.ValueDTO
import net.aiterp.git.ykonsole2.application.routes.WorkoutDTO
import net.aiterp.git.ykonsole2.domain.models.*
import net.aiterp.git.ykonsole2.domain.runtime.*
import java.lang.Exception
private object WebSocketHandler
fun Route.sockets(
deviceRepo: DeviceRepository,
programRepo: ProgramRepository,
workoutRepo: WorkoutRepository,
workoutStateRepo: WorkoutStateRepository,
commandBus: CommandBus,
eventBus: EventBus,
) {
val log = WebSocketHandler.log
webSocket("/workouts/active") {
val clientId = "ws-client-${randomId()}"
val active = workoutRepo.findActive()
if (active != null) {
var job: Job? = null
try {
log.info("$clientId: Sending workout metadata...")
val device = deviceRepo.findById(active.deviceId)!!
val program = active.programId?.let { programRepo.findById(it) }
sendSerialized(SocketOutput(workout = WorkoutDTO(active, device, program)))
log.info("$clientId: Sending workout states and milestones...")
val workoutStates = workoutStateRepo.fetchByWorkoutId(active.id)
val mrEvents = workoutStates.makeMilestoneReachedEvents()
sendSerialized(SocketOutput(
workoutStates = workoutStates.map { ValueDTO.from(it) },
oldMilestones = mrEvents.map { MilestoneDTO.from(it) },
))
job = launch {
log.info("$clientId: Starting event listener...")
eventBus.collect(forceAll = true) { event ->
when (event) {
Connected, Started, Stopped, Disconnected, Skipped -> {
sendSerialized(SocketOutput(event = SocketOutput.EventDTO(name = event.name)))
if (event is Disconnected) close()
}
is ErrorOccurred -> sendSerialized(SocketOutput(error = SocketOutput.Error(event.message)))
is ValuesReceived -> sendSerialized(SocketOutput(workoutStates = listOf(ValueDTO.from(event.values))))
is MilestoneReached -> sendSerialized(SocketOutput(milestone = MilestoneDTO.from(event)))
}
}
}
log.info("$clientId: Ready for incoming frames")
for (frame in incoming) {
frame as? Frame.Text ?: continue
val input: SocketInput = ykObjectMapper.readValue(frame.readText(), SocketInput::class.java)
input.makeCommands(device).forEach { commandBus.emit(it) }
}
} catch (e: Exception) {
log.info("$clientId: Interrupted by exception: ${e.message}", e)
} finally {
log.info("$clientId: Terminating event listener...")
job?.cancelAndJoin()
log.info("$clientId: Disconnected")
}
} else {
log.info("$clientId: Rejected due to missing workout")
sendSerialized(SocketOutput(error = SocketOutput.Error("No active workout found")))
}
}
}