You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 

83 lines
2.9 KiB

package net.aiterp.git.ykonsole2.application.routes.ws
import io.ktor.server.routing.*
import io.ktor.server.websocket.*
import io.ktor.websocket.*
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.launch
import net.aiterp.git.ykonsole2.application.logging.log
import net.aiterp.git.ykonsole2.application.plugins.ykObjectMapper
import net.aiterp.git.ykonsole2.application.routes.ValueDTO
import net.aiterp.git.ykonsole2.application.routes.WorkoutDTO
import net.aiterp.git.ykonsole2.domain.models.*
import net.aiterp.git.ykonsole2.domain.runtime.*
import java.lang.Exception
/**
 * Marker object used only as the receiver of the imported `log` extension,
 * giving the WebSocket routes in this file their own logger.
 * NOTE(review): presumably the logger is named after this object; confirm against
 * `net.aiterp.git.ykonsole2.application.logging.log`.
 */
private object WebSocketHandler
/**
 * Registers the `/workouts/active` WebSocket endpoint.
 *
 * On connect, the handler looks up the active workout. If there is none, a single
 * error message is sent and the handler returns. Otherwise it:
 *
 * 1. sends the workout metadata (workout, device and optional program),
 * 2. sends all workout states stored so far for that workout,
 * 3. launches a child coroutine that forwards events from [eventBus] to the client, and
 * 4. reads incoming text frames, decodes each as a `SocketInput` and emits the
 *    resulting commands on [commandBus].
 *
 * Any exception raised while the session is running is logged and ends the session;
 * this includes a frame that cannot be decoded. Cancellation is logged and then
 * rethrown so the surrounding coroutine machinery still observes it.
 *
 * @param deviceRepo used to resolve the device the active workout runs on.
 * @param programRepo used to resolve the workout's program, when it has one.
 * @param workoutRepo used to find the currently active workout.
 * @param workoutStateRepo used to fetch the states already recorded for the workout.
 * @param commandBus receives the commands built from incoming client frames.
 * @param eventBus source of runtime events that are forwarded to the client.
 */
fun Route.sockets(
    deviceRepo: DeviceRepository,
    programRepo: ProgramRepository,
    workoutRepo: WorkoutRepository,
    workoutStateRepo: WorkoutStateRepository,
    commandBus: CommandBus,
    eventBus: EventBus,
) {
    val log = WebSocketHandler.log

    webSocket("/workouts/active") {
        val clientId = "ws-client-${randomId()}"

        val active = workoutRepo.findActive()
        if (active == null) {
            log.info("$clientId: Rejected due to missing workout")
            sendSerialized(SocketOutput(error = SocketOutput.Error("No active workout found")))
            return@webSocket
        }

        // Assigned once the event forwarder has been launched; stays null if setup fails first.
        var job: Job? = null
        try {
            log.info("$clientId: Sending workout metadata...")
            val device = deviceRepo.findById(active.deviceId)
            if (device == null) {
                // Tell the client why the session ends instead of failing with a bare NPE.
                log.info("$clientId: Rejected due to missing device")
                sendSerialized(SocketOutput(error = SocketOutput.Error("Device for active workout not found")))
                return@webSocket
            }
            val program = active.programId?.let { programRepo.findById(it) }
            sendSerialized(SocketOutput(workout = WorkoutDTO(active, device, program)))

            log.info("$clientId: Sending workout states...")
            val workoutStates = workoutStateRepo.fetchByWorkoutId(active.id)
            sendSerialized(SocketOutput(workoutStates = workoutStates.map { ValueDTO.from(it) }))

            // Forward runtime events concurrently with the incoming-frame loop below.
            job = launch {
                log.info("$clientId: Starting event listener...")
                eventBus.collect { event ->
                    when (event) {
                        Connected, Started, Stopped, Disconnected -> {
                            sendSerialized(SocketOutput(event = SocketOutput.EventDTO(name = event.name)))
                            // A device disconnect closes the session after the client has been told.
                            if (event is Disconnected) close()
                        }
                        is ErrorOccurred -> sendSerialized(SocketOutput(error = SocketOutput.Error(event.message)))
                        is ValuesReceived -> sendSerialized(SocketOutput(workoutStates = listOf(ValueDTO.from(event.values))))
                    }
                }
            }

            log.info("$clientId: Ready for incoming frames")
            for (frame in incoming) {
                // Only text frames carry commands; every other frame type is skipped.
                frame as? Frame.Text ?: continue
                val input: SocketInput = ykObjectMapper.readValue(frame.readText(), SocketInput::class.java)
                input.makeCommands(device).forEach { commandBus.emit(it) }
            }
        } catch (e: Exception) {
            log.info("$clientId: Interrupted by exception: ${e.message}", e)
            // Cancellation must not be swallowed: rethrow it after logging.
            if (e is kotlinx.coroutines.CancellationException) throw e
        } finally {
            log.info("$clientId: Terminating event listener...")
            job?.cancelAndJoin()
            log.info("$clientId: Disconnected")
        }
    }
}