diff --git a/archive/src/main/kotlin/org/openrs2/archive/cache/Js5ChannelHandler.kt b/archive/src/main/kotlin/org/openrs2/archive/cache/Js5ChannelHandler.kt index 94b95650..39b2a225 100644 --- a/archive/src/main/kotlin/org/openrs2/archive/cache/Js5ChannelHandler.kt +++ b/archive/src/main/kotlin/org/openrs2/archive/cache/Js5ChannelHandler.kt @@ -7,6 +7,7 @@ import io.netty.channel.ChannelHandler import io.netty.channel.ChannelHandlerContext import io.netty.channel.ChannelPipeline import io.netty.channel.SimpleChannelInboundHandler +import io.netty.handler.timeout.ReadTimeoutException import kotlinx.coroutines.runBlocking import org.openrs2.buffer.crc32 import org.openrs2.buffer.use @@ -15,6 +16,7 @@ import org.openrs2.cache.Js5Compression import org.openrs2.cache.Js5Index import org.openrs2.cache.Js5MasterIndex import org.openrs2.cache.MasterIndexFormat +import java.nio.channels.ClosedChannelException import java.time.Instant import kotlin.coroutines.Continuation import kotlin.coroutines.resume @@ -46,12 +48,13 @@ public abstract class Js5ChannelHandler( ) private enum class State { - ACTIVE, + CONNECTING, CLIENT_OUT_OF_DATE, + CONNECTED, RESUMING_CONTINUATION } - private var state = State.ACTIVE + private var state = State.CONNECTING private var buildAttempts = 0 private var reconnectionAttempts = 0 private val inFlightRequests = mutableSetOf() @@ -69,11 +72,22 @@ public abstract class Js5ChannelHandler( protected abstract fun incrementVersion() override fun channelActive(ctx: ChannelHandlerContext) { + assert(state == State.CONNECTING) + ctx.writeAndFlush(createInitMessage(), ctx.voidPromise()) ctx.read() } override fun channelReadComplete(ctx: ChannelHandlerContext) { + /* + * Wait for us to receive the OK message before we send JS5 requests, + * as the RS3 JS5 server ignores any JS5 requests sent before the OK + * message is received. + */ + if (state != State.CONNECTED) { + return + } + var flush = false while (inFlightRequests.size < maxInFlightRequests) { @@ -97,7 +111,7 @@ public abstract class Js5ChannelHandler( override fun channelInactive(ctx: ChannelHandlerContext) { if (state == State.CLIENT_OUT_OF_DATE) { - state = State.ACTIVE + state = State.CONNECTING bootstrap.connect(hostname, port) } else if (state != State.RESUMING_CONTINUATION) { if (isComplete()) { @@ -132,7 +146,7 @@ public abstract class Js5ChannelHandler( inFlightRequests.clear() // re-connect - state = State.ACTIVE + state = State.CONNECTING bootstrap.connect(hostname, port) } } @@ -140,12 +154,26 @@ public abstract class Js5ChannelHandler( override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) { releaseGroups() - state = State.RESUMING_CONTINUATION - ctx.close() - continuation.resumeWithException(cause) + if (state == State.RESUMING_CONTINUATION) { + logger.warn(cause) { "Swallowing exception as continuation has already resumed" } + } else if (cause != ReadTimeoutException.INSTANCE) { + /* + * We skip continuation resumption if there's a read timeout - this + * allows channelInactive() to attempt to reconnect if we haven't + * used too many reconnection attempts. + */ + state = State.RESUMING_CONTINUATION + continuation.resumeWithException(cause) + } + + if (cause !is ClosedChannelException) { + ctx.close() + } } protected fun handleOk(ctx: ChannelHandlerContext) { + assert(state == State.CONNECTING) + configurePipeline(ctx.pipeline()) val msg = createConnectedMessage() @@ -153,12 +181,16 @@ public abstract class Js5ChannelHandler( ctx.write(msg, ctx.voidPromise()) } + state = State.CONNECTED + if (masterIndex == null && pendingRequests.isEmpty()) { request(ctx, Js5Archive.ARCHIVESET, Js5Archive.ARCHIVESET, 0, 0) } } protected fun handleClientOutOfDate(ctx: ChannelHandlerContext) { + assert(state == State.CONNECTING) + if (++buildAttempts > maxBuildAttempts) { throw Exception("Failed to identify current version") } @@ -216,8 +248,9 @@ public abstract class Js5ChannelHandler( } state = State.RESUMING_CONTINUATION - ctx.close() continuation.resume(Unit) + + ctx.close() } }