|
|
package net.aiterp.git.ykonsole2.application.routes.ws
import io.ktor.server.routing.*
import io.ktor.server.websocket.*
import io.ktor.websocket.*
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.launch
import net.aiterp.git.ykonsole2.application.logging.log
import net.aiterp.git.ykonsole2.application.plugins.ykObjectMapper
import net.aiterp.git.ykonsole2.application.routes.MilestoneDTO
import net.aiterp.git.ykonsole2.application.routes.ValueDTO
import net.aiterp.git.ykonsole2.application.routes.WorkoutDTO
import net.aiterp.git.ykonsole2.domain.models.*
import net.aiterp.git.ykonsole2.domain.runtime.*
import java.lang.Exception
/**
 * Marker object with no members of its own.
 *
 * Its only use visible in this file is as the receiver of the imported `log` extension
 * (`WebSocketHandler.log`), which supplies the logger used by the WebSocket route below.
 * NOTE(review): presumably this exists so the logger is named after the handler — confirm
 * against the `log` extension's implementation.
 */
private object WebSocketHandler
/**
 * Registers the `/workouts/active` WebSocket endpoint on this route.
 *
 * When a client connects, the handler looks up the active workout. If there is none, it sends a
 * single error message (`"No active workout found"`) and returns. Otherwise it:
 *
 * 1. sends the workout metadata (workout, its device and, if set, its program),
 * 2. sends the workout states fetched for the workout plus the milestones derived from them,
 * 3. launches a child coroutine that forwards events collected from [eventBus] to the client
 *    (and closes the socket when a `Disconnected` event arrives), and
 * 4. reads incoming frames; every text frame is deserialized into a [SocketInput] and the
 *    commands it produces are emitted on [commandBus]. Non-text frames are ignored.
 *
 * Any non-cancellation exception raised while serving the client is logged and ends the session;
 * the event listener is always cancelled and joined before the handler returns.
 *
 * @param deviceRepo used to resolve the device of the active workout
 * @param programRepo used to resolve the program of the active workout, when it has one
 * @param workoutRepo used to find the active workout
 * @param workoutStateRepo used to fetch the states already recorded for the active workout
 * @param commandBus receives the commands produced from incoming client messages
 * @param eventBus source of the runtime events forwarded to the client
 */
fun Route.sockets(
    deviceRepo: DeviceRepository,
    programRepo: ProgramRepository,
    workoutRepo: WorkoutRepository,
    workoutStateRepo: WorkoutStateRepository,
    commandBus: CommandBus,
    eventBus: EventBus,
) {
    val log = WebSocketHandler.log

    webSocket("/workouts/active") {
        val clientId = "ws-client-${randomId()}"

        // Guard clause: without an active workout there is nothing to stream.
        val active = workoutRepo.findActive()
        if (active == null) {
            log.info("$clientId: Rejected due to missing workout")
            sendSerialized(SocketOutput(error = SocketOutput.Error("No active workout found")))
            return@webSocket
        }

        // Assigned once the listener is launched; stays null if setup fails before that point.
        var job: Job? = null

        try {
            log.info("$clientId: Sending workout metadata...")
            // Fail with a descriptive message instead of a bare NullPointerException so the log
            // entry written by the catch block below says what was actually missing.
            val device = checkNotNull(deviceRepo.findById(active.deviceId)) {
                "Device ${active.deviceId} of active workout ${active.id} not found"
            }
            val program = active.programId?.let { programRepo.findById(it) }
            sendSerialized(SocketOutput(workout = WorkoutDTO(active, device, program)))

            log.info("$clientId: Sending workout states and milestones...")
            val workoutStates = workoutStateRepo.fetchByWorkoutId(active.id)
            val mrEvents = workoutStates.makeMilestoneReachedEvents()
            sendSerialized(
                SocketOutput(
                    workoutStates = workoutStates.map { ValueDTO.from(it) },
                    oldMilestones = mrEvents.map { MilestoneDTO.from(it) },
                ),
            )

            job = launch {
                log.info("$clientId: Starting event listener...")

                eventBus.collect(forceAll = true) { event ->
                    when (event) {
                        is Connected, Started, Stopped, Disconnected, Skipped -> {
                            sendSerialized(SocketOutput(event = SocketOutput.EventDTO(name = event.name)))

                            // The device is gone, so end the client session as well.
                            if (event is Disconnected) close()
                        }

                        is ErrorOccurred ->
                            sendSerialized(SocketOutput(error = SocketOutput.Error(event.message)))

                        is ValuesReceived ->
                            sendSerialized(SocketOutput(workoutStates = listOf(ValueDTO.from(event.values))))

                        is MilestoneReached ->
                            sendSerialized(SocketOutput(milestone = MilestoneDTO.from(event)))
                    }
                }
            }

            log.info("$clientId: Ready for incoming frames")

            for (frame in incoming) {
                // Only text frames carry client input; skip everything else.
                frame as? Frame.Text ?: continue

                val input: SocketInput = ykObjectMapper.readValue(frame.readText(), SocketInput::class.java)
                input.makeCommands(device).forEach { commandBus.emit(it) }
            }
        } catch (e: CancellationException) {
            // Cancellation is how coroutines are told to stop; it must propagate rather than be
            // logged and swallowed like an ordinary failure. The finally block still runs.
            throw e
        } catch (e: Exception) {
            log.info("$clientId: Interrupted by exception: ${e.message}", e)
        } finally {
            log.info("$clientId: Terminating event listener...")
            job?.cancelAndJoin()
            log.info("$clientId: Disconnected")
        }
    }
}
|