Fix reconnection on client-side timeouts in the JS5 client

Signed-off-by: Graham <gpe@openrs2.org>
bzip2
Graham 3 years ago
parent 3ce76abde7
commit df55c3ece3
  1. 49
      archive/src/main/kotlin/org/openrs2/archive/cache/Js5ChannelHandler.kt

@ -7,6 +7,7 @@ import io.netty.channel.ChannelHandler
import io.netty.channel.ChannelHandlerContext import io.netty.channel.ChannelHandlerContext
import io.netty.channel.ChannelPipeline import io.netty.channel.ChannelPipeline
import io.netty.channel.SimpleChannelInboundHandler import io.netty.channel.SimpleChannelInboundHandler
import io.netty.handler.timeout.ReadTimeoutException
import kotlinx.coroutines.runBlocking import kotlinx.coroutines.runBlocking
import org.openrs2.buffer.crc32 import org.openrs2.buffer.crc32
import org.openrs2.buffer.use import org.openrs2.buffer.use
@ -15,6 +16,7 @@ import org.openrs2.cache.Js5Compression
import org.openrs2.cache.Js5Index import org.openrs2.cache.Js5Index
import org.openrs2.cache.Js5MasterIndex import org.openrs2.cache.Js5MasterIndex
import org.openrs2.cache.MasterIndexFormat import org.openrs2.cache.MasterIndexFormat
import java.nio.channels.ClosedChannelException
import java.time.Instant import java.time.Instant
import kotlin.coroutines.Continuation import kotlin.coroutines.Continuation
import kotlin.coroutines.resume import kotlin.coroutines.resume
@ -46,12 +48,13 @@ public abstract class Js5ChannelHandler(
) )
private enum class State { private enum class State {
ACTIVE, CONNECTING,
CLIENT_OUT_OF_DATE, CLIENT_OUT_OF_DATE,
CONNECTED,
RESUMING_CONTINUATION RESUMING_CONTINUATION
} }
private var state = State.ACTIVE private var state = State.CONNECTING
private var buildAttempts = 0 private var buildAttempts = 0
private var reconnectionAttempts = 0 private var reconnectionAttempts = 0
private val inFlightRequests = mutableSetOf<InFlightRequest>() private val inFlightRequests = mutableSetOf<InFlightRequest>()
@ -69,11 +72,22 @@ public abstract class Js5ChannelHandler(
protected abstract fun incrementVersion() protected abstract fun incrementVersion()
override fun channelActive(ctx: ChannelHandlerContext) { override fun channelActive(ctx: ChannelHandlerContext) {
assert(state == State.CONNECTING)
ctx.writeAndFlush(createInitMessage(), ctx.voidPromise()) ctx.writeAndFlush(createInitMessage(), ctx.voidPromise())
ctx.read() ctx.read()
} }
override fun channelReadComplete(ctx: ChannelHandlerContext) { override fun channelReadComplete(ctx: ChannelHandlerContext) {
/*
* Wait for us to receive the OK message before we send JS5 requests,
* as the RS3 JS5 server ignores any JS5 requests sent before the OK
* message is received.
*/
if (state != State.CONNECTED) {
return
}
var flush = false var flush = false
while (inFlightRequests.size < maxInFlightRequests) { while (inFlightRequests.size < maxInFlightRequests) {
@ -97,7 +111,7 @@ public abstract class Js5ChannelHandler(
override fun channelInactive(ctx: ChannelHandlerContext) { override fun channelInactive(ctx: ChannelHandlerContext) {
if (state == State.CLIENT_OUT_OF_DATE) { if (state == State.CLIENT_OUT_OF_DATE) {
state = State.ACTIVE state = State.CONNECTING
bootstrap.connect(hostname, port) bootstrap.connect(hostname, port)
} else if (state != State.RESUMING_CONTINUATION) { } else if (state != State.RESUMING_CONTINUATION) {
if (isComplete()) { if (isComplete()) {
@ -132,7 +146,7 @@ public abstract class Js5ChannelHandler(
inFlightRequests.clear() inFlightRequests.clear()
// re-connect // re-connect
state = State.ACTIVE state = State.CONNECTING
bootstrap.connect(hostname, port) bootstrap.connect(hostname, port)
} }
} }
@ -140,12 +154,26 @@ public abstract class Js5ChannelHandler(
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) { override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
releaseGroups() releaseGroups()
state = State.RESUMING_CONTINUATION if (state == State.RESUMING_CONTINUATION) {
ctx.close() logger.warn(cause) { "Swallowing exception as continuation has already resumed" }
continuation.resumeWithException(cause) } else if (cause != ReadTimeoutException.INSTANCE) {
/*
* We skip continuation resumption if there's a read timeout - this
* allows channelInactive() to attempt to reconnect if we haven't
* used too many reconnection attempts.
*/
state = State.RESUMING_CONTINUATION
continuation.resumeWithException(cause)
}
if (cause !is ClosedChannelException) {
ctx.close()
}
} }
protected fun handleOk(ctx: ChannelHandlerContext) { protected fun handleOk(ctx: ChannelHandlerContext) {
assert(state == State.CONNECTING)
configurePipeline(ctx.pipeline()) configurePipeline(ctx.pipeline())
val msg = createConnectedMessage() val msg = createConnectedMessage()
@ -153,12 +181,16 @@ public abstract class Js5ChannelHandler(
ctx.write(msg, ctx.voidPromise()) ctx.write(msg, ctx.voidPromise())
} }
state = State.CONNECTED
if (masterIndex == null && pendingRequests.isEmpty()) { if (masterIndex == null && pendingRequests.isEmpty()) {
request(ctx, Js5Archive.ARCHIVESET, Js5Archive.ARCHIVESET, 0, 0) request(ctx, Js5Archive.ARCHIVESET, Js5Archive.ARCHIVESET, 0, 0)
} }
} }
protected fun handleClientOutOfDate(ctx: ChannelHandlerContext) { protected fun handleClientOutOfDate(ctx: ChannelHandlerContext) {
assert(state == State.CONNECTING)
if (++buildAttempts > maxBuildAttempts) { if (++buildAttempts > maxBuildAttempts) {
throw Exception("Failed to identify current version") throw Exception("Failed to identify current version")
} }
@ -216,8 +248,9 @@ public abstract class Js5ChannelHandler(
} }
state = State.RESUMING_CONTINUATION state = State.RESUMING_CONTINUATION
ctx.close()
continuation.resume(Unit) continuation.resume(Unit)
ctx.close()
} }
} }

Loading…
Cancel
Save