From 6f02ab2f657d55f3977a341d14ec2c72efeba619 Mon Sep 17 00:00:00 2001 From: Graham Date: Wed, 7 Jul 2021 21:03:21 +0100 Subject: [PATCH] Add NXT cache downloader I'm still not particularly happy with this: if the JS5 download finishes before HTTP, it'll time out and kill the whole process. Similarly, because it takes so long to import the indexes and as we can't fetch groups in parallel with that, it can often time out early during the process. In the long term, I think I am going to try and move most of the logic outside of the Netty threads and communicate between threads with queues or channels. This would also allow us to run multiple JS5 clients in parallel. The code also needs some tidying up, particularly constants in the Js5ChannelHandler constructors. Signed-off-by: Graham --- .../openrs2/archive/cache/CacheDownloader.kt | 74 ++++++-- .../openrs2/archive/cache/CacheExporter.kt | 112 +++++++++---- .../openrs2/archive/cache/CacheImporter.kt | 69 +++++--- .../openrs2/archive/cache/ImportCommand.kt | 5 +- .../archive/cache/ImportMasterIndexCommand.kt | 5 +- .../archive/cache/Js5ChannelHandler.kt | 142 +++++++++------- .../archive/cache/NxtJs5ChannelHandler.kt | 158 ++++++++++++++++++ .../archive/cache/NxtJs5ChannelInitializer.kt | 22 +++ .../archive/cache/OsrsJs5ChannelHandler.kt | 77 +++++++++ ...alizer.kt => OsrsJs5ChannelInitializer.kt} | 6 +- .../archive/cache/nxt/ClientOutOfDateCodec.kt | 8 + .../cache/nxt/InitJs5RemoteConnection.kt | 10 ++ .../cache/nxt/InitJs5RemoteConnectionCodec.kt | 29 ++++ .../openrs2/archive/cache/nxt/Js5OkCodec.kt | 25 +++ .../openrs2/archive/cache/nxt/Js5Request.kt | 14 ++ .../archive/cache/nxt/Js5RequestEncoder.kt | 36 ++++ .../openrs2/archive/cache/nxt/Js5Response.kt | 11 ++ .../archive/cache/nxt/Js5ResponseDecoder.kt | 121 ++++++++++++++ .../archive/cache/nxt/LoginResponse.kt | 13 ++ .../archive/cache/nxt/MusicStreamClient.kt | 30 ++++ .../kotlin/org/openrs2/archive/game/Game.kt | 8 +- .../org/openrs2/archive/game/GameDatabase.kt | 26 ++- .../openrs2/archive/migrations/V3__nxt.sql | 49 ++++++ .../org/openrs2/cache/Js5MasterIndex.kt | 43 +++-- .../org/openrs2/cache/Js5MasterIndexTest.kt | 30 +++- 25 files changed, 958 insertions(+), 165 deletions(-) create mode 100644 archive/src/main/kotlin/org/openrs2/archive/cache/NxtJs5ChannelHandler.kt create mode 100644 archive/src/main/kotlin/org/openrs2/archive/cache/NxtJs5ChannelInitializer.kt create mode 100644 archive/src/main/kotlin/org/openrs2/archive/cache/OsrsJs5ChannelHandler.kt rename archive/src/main/kotlin/org/openrs2/archive/cache/{Js5ChannelInitializer.kt => OsrsJs5ChannelInitializer.kt} (67%) create mode 100644 archive/src/main/kotlin/org/openrs2/archive/cache/nxt/ClientOutOfDateCodec.kt create mode 100644 archive/src/main/kotlin/org/openrs2/archive/cache/nxt/InitJs5RemoteConnection.kt create mode 100644 archive/src/main/kotlin/org/openrs2/archive/cache/nxt/InitJs5RemoteConnectionCodec.kt create mode 100644 archive/src/main/kotlin/org/openrs2/archive/cache/nxt/Js5OkCodec.kt create mode 100644 archive/src/main/kotlin/org/openrs2/archive/cache/nxt/Js5Request.kt create mode 100644 archive/src/main/kotlin/org/openrs2/archive/cache/nxt/Js5RequestEncoder.kt create mode 100644 archive/src/main/kotlin/org/openrs2/archive/cache/nxt/Js5Response.kt create mode 100644 archive/src/main/kotlin/org/openrs2/archive/cache/nxt/Js5ResponseDecoder.kt create mode 100644 archive/src/main/kotlin/org/openrs2/archive/cache/nxt/LoginResponse.kt create mode 100644 archive/src/main/kotlin/org/openrs2/archive/cache/nxt/MusicStreamClient.kt create mode 100644 archive/src/main/resources/org/openrs2/archive/migrations/V3__nxt.sql diff --git a/archive/src/main/kotlin/org/openrs2/archive/cache/CacheDownloader.kt b/archive/src/main/kotlin/org/openrs2/archive/cache/CacheDownloader.kt index e3ccde01..c0c5a270 100644 --- a/archive/src/main/kotlin/org/openrs2/archive/cache/CacheDownloader.kt +++ b/archive/src/main/kotlin/org/openrs2/archive/cache/CacheDownloader.kt @@ -1,7 +1,9 @@ package org.openrs2.archive.cache +import org.openrs2.archive.cache.nxt.MusicStreamClient import org.openrs2.archive.game.GameDatabase import org.openrs2.archive.jav.JavConfig +import org.openrs2.buffer.ByteBufBodyHandler import org.openrs2.net.BootstrapFactory import org.openrs2.net.awaitSuspend import java.net.URI @@ -13,6 +15,7 @@ import kotlin.coroutines.suspendCoroutine @Singleton public class CacheDownloader @Inject constructor( private val client: HttpClient, + private val byteBufBodyHandler: ByteBufBodyHandler, private val bootstrapFactory: BootstrapFactory, private val gameDatabase: GameDatabase, private val importer: CacheImporter @@ -21,28 +24,67 @@ public class CacheDownloader @Inject constructor( val game = gameDatabase.getGame(gameName) ?: throw Exception("Game not found") val url = game.url ?: throw Exception("URL not set") - val build = game.build ?: throw Exception("Current build not set") + val buildMajor = game.buildMajor ?: throw Exception("Current major build not set") val config = JavConfig.download(client, url) - val codebase = config.config[CODEBASE] ?: throw Exception("Codebase missing") - val hostname = URI(codebase).host ?: throw Exception("Hostname missing") val group = bootstrapFactory.createEventLoopGroup() try { suspendCoroutine { continuation -> val bootstrap = bootstrapFactory.createBootstrap(group) - val handler = Js5ChannelHandler( - bootstrap, - game.id, - hostname, - PORT, - build, - game.lastMasterIndexId, - continuation, - importer - ) - - bootstrap.handler(Js5ChannelInitializer(handler)) + + val hostname: String + + val initializer = when (gameName) { + "oldschool" -> { + val codebase = config.config[CODEBASE] ?: throw Exception("Codebase missing") + hostname = URI(codebase).host ?: throw Exception("Hostname missing") + + OsrsJs5ChannelInitializer( + OsrsJs5ChannelHandler( + bootstrap, + game.id, + hostname, + PORT, + buildMajor, + game.lastMasterIndexId, + continuation, + importer, + game.key + ) + ) + } + "runescape" -> { + val buildMinor = game.buildMinor ?: throw Exception("Current minor build not set") + + val tokens = config.params.values.filter { TOKEN_REGEX.matches(it) } + val token = tokens.singleOrNull() ?: throw Exception("Multiple candidate tokens: $tokens") + + hostname = NXT_HOSTNAME + + val musicStreamClient = MusicStreamClient(client, byteBufBodyHandler, "http://$hostname") + + NxtJs5ChannelInitializer( + NxtJs5ChannelHandler( + bootstrap, + game.id, + hostname, + PORT, + buildMajor, + buildMinor, + game.lastMasterIndexId, + continuation, + importer, + game.key, + token, + musicStreamClient + ) + ) + } + else -> throw UnsupportedOperationException() + } + + bootstrap.handler(initializer) .connect(hostname, PORT) } } finally { @@ -52,6 +94,8 @@ public class CacheDownloader @Inject constructor( private companion object { private const val CODEBASE = "codebase" + private const val NXT_HOSTNAME = "content.runescape.com" private const val PORT = 443 + private val TOKEN_REGEX = Regex("[A-Za-z0-9*-]{32}") } } diff --git a/archive/src/main/kotlin/org/openrs2/archive/cache/CacheExporter.kt b/archive/src/main/kotlin/org/openrs2/archive/cache/CacheExporter.kt index 4ce091f6..d05be3cb 100644 --- a/archive/src/main/kotlin/org/openrs2/archive/cache/CacheExporter.kt +++ b/archive/src/main/kotlin/org/openrs2/archive/cache/CacheExporter.kt @@ -11,6 +11,7 @@ import org.openrs2.cache.MasterIndexFormat import org.openrs2.cache.Store import org.openrs2.crypto.XteaKey import org.openrs2.db.Database +import org.postgresql.util.PGobject import java.time.Instant import java.util.SortedSet import javax.inject.Inject @@ -52,10 +53,43 @@ public class CacheExporter @Inject constructor( } } + public data class Build(val major: Int, val minor: Int?) : Comparable { + override fun compareTo(other: Build): Int { + return compareValuesBy(this, other, Build::major, Build::minor) + } + + override fun toString(): String { + return if (minor != null) { + "$major.$minor" + } else { + major.toString() + } + } + + internal companion object { + internal fun fromPgObject(o: PGobject): Build? { + val value = o.value!! + require(value.length >= 2) + + val parts = value.substring(1, value.length - 1).split(",") + require(parts.size == 2) + + val major = parts[0] + val minor = parts[1] + + if (major.isEmpty()) { + return null + } + + return Build(major.toInt(), if (minor.isEmpty()) null else minor.toInt()) + } + } + } + public data class CacheSummary( val id: Int, val game: String, - val builds: SortedSet, + val builds: SortedSet, val timestamp: Instant?, val names: SortedSet, val stats: Stats? @@ -70,7 +104,7 @@ public class CacheExporter @Inject constructor( public data class Source( val game: String, - val build: Int?, + val build: Build?, val timestamp: Instant?, val name: String?, val description: String?, @@ -90,26 +124,29 @@ public class CacheExporter @Inject constructor( return database.execute { connection -> connection.prepareStatement( """ - SELECT - m.id, - g.name, - array_remove(array_agg(DISTINCT s.build ORDER BY s.build ASC), NULL), - MIN(s.timestamp), - array_remove(array_agg(DISTINCT s.name ORDER BY s.name ASC), NULL), - ms.valid_indexes, - ms.indexes, - ms.valid_groups, - ms.groups, - ms.valid_keys, - ms.keys, - ms.size - FROM master_indexes m - JOIN sources s ON s.master_index_id = m.id - JOIN games g ON g.id = s.game_id - LEFT JOIN master_index_stats ms ON ms.master_index_id = m.id - GROUP BY m.id, g.name, ms.valid_indexes, ms.indexes, ms.valid_groups, ms.groups, ms.valid_keys, ms.keys, - ms.size - ORDER BY g.name ASC, MIN(s.build) ASC, MIN(s.timestamp) ASC + SELECT * + FROM ( + SELECT + m.id, + g.name, + array_remove(array_agg(DISTINCT ROW(s.build_major, s.build_minor)::build ORDER BY ROW(s.build_major, s.build_minor)::build ASC), NULL) builds, + MIN(s.timestamp) AS timestamp, + array_remove(array_agg(DISTINCT s.name ORDER BY s.name ASC), NULL) sources, + ms.valid_indexes, + ms.indexes, + ms.valid_groups, + ms.groups, + ms.valid_keys, + ms.keys, + ms.size + FROM master_indexes m + JOIN sources s ON s.master_index_id = m.id + JOIN games g ON g.id = s.game_id + LEFT JOIN master_index_stats ms ON ms.master_index_id = m.id + GROUP BY m.id, g.name, ms.valid_indexes, ms.indexes, ms.valid_groups, ms.groups, ms.valid_keys, ms.keys, + ms.size + ) t + ORDER BY t.name ASC, t.builds[1] ASC, t.timestamp ASC """.trimIndent() ).use { stmt -> stmt.executeQuery().use { rows -> @@ -118,7 +155,7 @@ public class CacheExporter @Inject constructor( while (rows.next()) { val id = rows.getInt(1) val game = rows.getString(2) - val builds = rows.getArray(3).array as Array + val builds = rows.getArray(3).array as Array val timestamp = rows.getTimestamp(4)?.toInstant() val names = rows.getArray(5).array as Array @@ -138,7 +175,7 @@ public class CacheExporter @Inject constructor( caches += CacheSummary( id, game, - builds.toSortedSet(), + builds.mapNotNull { o -> Build.fromPgObject(o as PGobject) }.toSortedSet(), timestamp, names.toSortedSet(), stats @@ -185,7 +222,7 @@ public class CacheExporter @Inject constructor( masterIndex = Unpooled.wrappedBuffer(rows.getBytes(2)).use { compressed -> Js5Compression.uncompress(compressed).use { uncompressed -> - Js5MasterIndex.read(uncompressed, format) + Js5MasterIndex.readUnverified(uncompressed, format) } } @@ -208,7 +245,7 @@ public class CacheExporter @Inject constructor( connection.prepareStatement( """ - SELECT g.name, s.build, s.timestamp, s.name, s.description, s.url + SELECT g.name, s.build_major, s.build_minor, s.timestamp, s.name, s.description, s.url FROM sources s JOIN games g ON g.id = s.game_id WHERE s.master_index_id = ? @@ -221,15 +258,26 @@ public class CacheExporter @Inject constructor( while (rows.next()) { val game = rows.getString(1) - var build: Int? = rows.getInt(2) + var buildMajor: Int? = rows.getInt(2) if (rows.wasNull()) { - build = null + buildMajor = null } - val timestamp = rows.getTimestamp(3)?.toInstant() - val name = rows.getString(4) - val description = rows.getString(5) - val url = rows.getString(6) + var buildMinor: Int? = rows.getInt(3) + if (rows.wasNull()) { + buildMinor = null + } + + val build = if (buildMajor != null) { + Build(buildMajor, buildMinor) + } else { + null + } + + val timestamp = rows.getTimestamp(4)?.toInstant() + val name = rows.getString(5) + val description = rows.getString(6) + val url = rows.getString(7) sources += Source(game, build, timestamp, name, description, url) } diff --git a/archive/src/main/kotlin/org/openrs2/archive/cache/CacheImporter.kt b/archive/src/main/kotlin/org/openrs2/archive/cache/CacheImporter.kt index f4a6f335..45bbe20e 100644 --- a/archive/src/main/kotlin/org/openrs2/archive/cache/CacheImporter.kt +++ b/archive/src/main/kotlin/org/openrs2/archive/cache/CacheImporter.kt @@ -88,7 +88,8 @@ public class CacheImporter @Inject constructor( public suspend fun import( store: Store, game: String, - build: Int?, + buildMajor: Int?, + buildMinor: Int?, timestamp: Instant?, name: String?, description: String?, @@ -117,7 +118,8 @@ public class CacheImporter @Inject constructor( SourceType.DISK, masterIndexId, gameId, - build, + buildMajor, + buildMinor, timestamp, name, description, @@ -182,21 +184,38 @@ public class CacheImporter @Inject constructor( buf: ByteBuf, format: MasterIndexFormat, game: String, - build: Int?, + buildMajor: Int?, + buildMinor: Int?, timestamp: Instant?, name: String?, description: String?, url: String? ) { Js5Compression.uncompress(buf.slice()).use { uncompressed -> - val masterIndex = MasterIndex(Js5MasterIndex.read(uncompressed.slice(), format), buf, uncompressed) + val masterIndex = MasterIndex( + Js5MasterIndex.readUnverified(uncompressed.slice(), format), + buf, + uncompressed + ) database.execute { connection -> prepare(connection) val gameId = getGameId(connection, game) val masterIndexId = addMasterIndex(connection, masterIndex) - addSource(connection, SourceType.DISK, masterIndexId, gameId, build, timestamp, name, description, url) + + addSource( + connection, + SourceType.DISK, + masterIndexId, + gameId, + buildMajor, + buildMinor, + timestamp, + name, + description, + url + ) } } } @@ -206,7 +225,8 @@ public class CacheImporter @Inject constructor( buf: ByteBuf, uncompressed: ByteBuf, gameId: Int, - build: Int, + buildMajor: Int, + buildMinor: Int?, lastId: Int?, timestamp: Instant ): MasterIndexResult { @@ -216,12 +236,13 @@ public class CacheImporter @Inject constructor( connection.prepareStatement( """ UPDATE games - SET build = ? + SET build_major = ?, build_minor = ? WHERE id = ? """.trimIndent() ).use { stmt -> - stmt.setInt(1, build) - stmt.setInt(2, gameId) + stmt.setInt(1, buildMajor) + stmt.setObject(2, buildMinor, Types.INTEGER) + stmt.setInt(3, gameId) stmt.execute() } @@ -236,7 +257,8 @@ public class CacheImporter @Inject constructor( SourceType.JS5REMOTE, masterIndexId, gameId, - build, + buildMajor, + buildMinor, timestamp, name = "Jagex", description = null, @@ -451,23 +473,25 @@ public class CacheImporter @Inject constructor( type: SourceType, masterIndexId: Int, gameId: Int, - build: Int?, + buildMajor: Int?, + buildMinor: Int?, timestamp: Instant?, name: String?, description: String?, url: String? ): Int { - if (type == SourceType.JS5REMOTE && build != null) { + if (type == SourceType.JS5REMOTE && buildMajor != null) { connection.prepareStatement( """ SELECT id FROM sources - WHERE type = 'js5remote' AND master_index_id = ? AND game_id = ? AND build = ? + WHERE type = 'js5remote' AND master_index_id = ? AND game_id = ? AND build_major = ? AND build_minor IS NOT DISTINCT FROM ? """.trimIndent() ).use { stmt -> stmt.setInt(1, masterIndexId) stmt.setInt(2, gameId) - stmt.setInt(3, build) + stmt.setInt(3, buildMajor) + stmt.setObject(4, buildMinor, Types.INTEGER) stmt.executeQuery().use { rows -> if (rows.next()) { @@ -479,25 +503,26 @@ public class CacheImporter @Inject constructor( connection.prepareStatement( """ - INSERT INTO sources (type, master_index_id, game_id, build, timestamp, name, description, url) - VALUES (?::source_type, ?, ?, ?, ?, ?, ?, ?) + INSERT INTO sources (type, master_index_id, game_id, build_major, build_minor, timestamp, name, description, url) + VALUES (?::source_type, ?, ?, ?, ?, ?, ?, ?, ?) RETURNING id """.trimIndent() ).use { stmt -> stmt.setString(1, type.toString().lowercase()) stmt.setInt(2, masterIndexId) stmt.setInt(3, gameId) - stmt.setObject(4, build, Types.INTEGER) + stmt.setObject(4, buildMajor, Types.INTEGER) + stmt.setObject(5, buildMinor, Types.INTEGER) if (timestamp != null) { - stmt.setObject(5, timestamp.atOffset(ZoneOffset.UTC), Types.TIMESTAMP_WITH_TIMEZONE) + stmt.setObject(6, timestamp.atOffset(ZoneOffset.UTC), Types.TIMESTAMP_WITH_TIMEZONE) } else { - stmt.setNull(5, Types.TIMESTAMP_WITH_TIMEZONE) + stmt.setNull(6, Types.TIMESTAMP_WITH_TIMEZONE) } - stmt.setString(6, name) - stmt.setString(7, description) - stmt.setString(8, url) + stmt.setString(7, name) + stmt.setString(8, description) + stmt.setString(9, url) stmt.executeQuery().use { rows -> check(rows.next()) diff --git a/archive/src/main/kotlin/org/openrs2/archive/cache/ImportCommand.kt b/archive/src/main/kotlin/org/openrs2/archive/cache/ImportCommand.kt index 2c0aadb4..df9acd30 100644 --- a/archive/src/main/kotlin/org/openrs2/archive/cache/ImportCommand.kt +++ b/archive/src/main/kotlin/org/openrs2/archive/cache/ImportCommand.kt @@ -13,7 +13,8 @@ import org.openrs2.cli.instant import org.openrs2.inject.CloseableInjector public class ImportCommand : CliktCommand(name = "import") { - private val build by option().int() + private val buildMajor by option().int() + private val buildMinor by option().int() private val timestamp by option().instant() private val name by option() private val description by option() @@ -31,7 +32,7 @@ public class ImportCommand : CliktCommand(name = "import") { val importer = injector.getInstance(CacheImporter::class.java) Store.open(input).use { store -> - importer.import(store, game, build, timestamp, name, description, url) + importer.import(store, game, buildMajor, buildMinor, timestamp, name, description, url) } } } diff --git a/archive/src/main/kotlin/org/openrs2/archive/cache/ImportMasterIndexCommand.kt b/archive/src/main/kotlin/org/openrs2/archive/cache/ImportMasterIndexCommand.kt index e3ca8efe..1f4249ab 100644 --- a/archive/src/main/kotlin/org/openrs2/archive/cache/ImportMasterIndexCommand.kt +++ b/archive/src/main/kotlin/org/openrs2/archive/cache/ImportMasterIndexCommand.kt @@ -17,7 +17,8 @@ import org.openrs2.inject.CloseableInjector import java.nio.file.Files public class ImportMasterIndexCommand : CliktCommand(name = "import-master-index") { - private val build by option().int() + private val buildMajor by option().int() + private val buildMinor by option().int() private val timestamp by option().instant() private val name by option() private val description by option() @@ -36,7 +37,7 @@ public class ImportMasterIndexCommand : CliktCommand(name = "import-master-index val importer = injector.getInstance(CacheImporter::class.java) Unpooled.wrappedBuffer(Files.readAllBytes(input)).use { buf -> - importer.importMasterIndex(buf, format, game, build, timestamp, name, description, url) + importer.importMasterIndex(buf, format, game, buildMajor, buildMinor, timestamp, name, description, url) } } } diff --git a/archive/src/main/kotlin/org/openrs2/archive/cache/Js5ChannelHandler.kt b/archive/src/main/kotlin/org/openrs2/archive/cache/Js5ChannelHandler.kt index d0ca5bf6..70d315c2 100644 --- a/archive/src/main/kotlin/org/openrs2/archive/cache/Js5ChannelHandler.kt +++ b/archive/src/main/kotlin/org/openrs2/archive/cache/Js5ChannelHandler.kt @@ -5,8 +5,10 @@ import io.netty.bootstrap.Bootstrap import io.netty.buffer.ByteBuf import io.netty.channel.ChannelHandler import io.netty.channel.ChannelHandlerContext +import io.netty.channel.ChannelPipeline import io.netty.channel.SimpleChannelInboundHandler import kotlinx.coroutines.runBlocking +import org.bouncycastle.crypto.params.RSAKeyParameters import org.openrs2.buffer.crc32 import org.openrs2.buffer.use import org.openrs2.cache.Js5Archive @@ -14,34 +16,36 @@ import org.openrs2.cache.Js5Compression import org.openrs2.cache.Js5Index import org.openrs2.cache.Js5MasterIndex import org.openrs2.cache.MasterIndexFormat -import org.openrs2.protocol.Rs2Decoder -import org.openrs2.protocol.Rs2Encoder -import org.openrs2.protocol.js5.Js5Request -import org.openrs2.protocol.js5.Js5RequestEncoder -import org.openrs2.protocol.js5.Js5Response -import org.openrs2.protocol.js5.Js5ResponseDecoder -import org.openrs2.protocol.js5.XorDecoder -import org.openrs2.protocol.login.LoginRequest -import org.openrs2.protocol.login.LoginResponse import java.time.Instant import kotlin.coroutines.Continuation import kotlin.coroutines.resume import kotlin.coroutines.resumeWithException @ChannelHandler.Sharable -public class Js5ChannelHandler( +public abstract class Js5ChannelHandler( private val bootstrap: Bootstrap, private val gameId: Int, private val hostname: String, private val port: Int, - private var build: Int, + protected var buildMajor: Int, + protected var buildMinor: Int?, private val lastMasterIndexId: Int?, private val continuation: Continuation, private val importer: CacheImporter, - private val masterIndexFormat: MasterIndexFormat = MasterIndexFormat.VERSIONED, - private val maxInFlightRequests: Int = 200, - maxBuildAttempts: Int = 10 + private val key: RSAKeyParameters?, + private val masterIndexFormat: MasterIndexFormat, + private val maxInFlightRequests: Int, + private val maxBuildAttempts: Int = 10 ) : SimpleChannelInboundHandler(Object::class.java) { + protected data class InFlightRequest(val prefetch: Boolean, val archive: Int, val group: Int) + protected data class PendingRequest( + val prefetch: Boolean, + val archive: Int, + val group: Int, + val version: Int, + val checksum: Int + ) + private enum class State { ACTIVE, CLIENT_OUT_OF_DATE, @@ -49,39 +53,35 @@ public class Js5ChannelHandler( } private var state = State.ACTIVE - private val maxBuild = build + maxBuildAttempts - private val inFlightRequests = mutableSetOf() - private val pendingRequests = ArrayDeque() + private var buildAttempts = 0 + private val inFlightRequests = mutableSetOf() + private val pendingRequests = ArrayDeque() private var masterIndexId: Int = 0 private var sourceId: Int = 0 private var masterIndex: Js5MasterIndex? = null private lateinit var indexes: Array private val groups = mutableListOf() + protected abstract fun createInitMessage(): Any + protected abstract fun createRequestMessage(prefetch: Boolean, archive: Int, group: Int): Any + protected abstract fun createConnectedMessage(): Any? + protected abstract fun configurePipeline(pipeline: ChannelPipeline) + protected abstract fun incrementVersion() + override fun channelActive(ctx: ChannelHandlerContext) { - ctx.writeAndFlush(LoginRequest.InitJs5RemoteConnection(build), ctx.voidPromise()) + ctx.writeAndFlush(createInitMessage(), ctx.voidPromise()) ctx.read() } - override fun channelRead0(ctx: ChannelHandlerContext, msg: Any) { - when (msg) { - is LoginResponse.Js5Ok -> handleOk(ctx) - is LoginResponse.ClientOutOfDate -> handleClientOutOfDate(ctx) - is LoginResponse -> throw Exception("Invalid response: $msg") - is Js5Response -> handleResponse(ctx, msg) - else -> throw Exception("Unknown message type: ${msg.javaClass.name}") - } - } - override fun channelReadComplete(ctx: ChannelHandlerContext) { var flush = false while (inFlightRequests.size < maxInFlightRequests) { val request = pendingRequests.removeFirstOrNull() ?: break - inFlightRequests += request + inFlightRequests += InFlightRequest(request.prefetch, request.archive, request.group) logger.info { "Requesting archive ${request.archive} group ${request.group}" } - ctx.write(request, ctx.voidPromise()) + ctx.write(createRequestMessage(request.prefetch, request.archive, request.group), ctx.voidPromise()) flush = true } @@ -112,53 +112,60 @@ public class Js5ChannelHandler( continuation.resumeWithException(cause) } - private fun handleOk(ctx: ChannelHandlerContext) { - val pipeline = ctx.pipeline() + protected fun handleOk(ctx: ChannelHandlerContext) { + configurePipeline(ctx.pipeline()) - pipeline.remove(Rs2Encoder::class.java) - pipeline.remove(Rs2Decoder::class.java) - pipeline.addFirst( - Js5RequestEncoder, - XorDecoder(), - Js5ResponseDecoder() - ) + val msg = createConnectedMessage() + if (msg != null) { + ctx.write(msg, ctx.voidPromise()) + } - request(Js5Archive.ARCHIVESET, Js5Archive.ARCHIVESET) + request(ctx, Js5Archive.ARCHIVESET, Js5Archive.ARCHIVESET, 0, 0) } - private fun handleClientOutOfDate(ctx: ChannelHandlerContext) { - if (++build > maxBuild) { + protected fun handleClientOutOfDate(ctx: ChannelHandlerContext) { + if (++buildAttempts > maxBuildAttempts) { throw Exception("Failed to identify current version") } state = State.CLIENT_OUT_OF_DATE + incrementVersion() + ctx.close() } - private fun handleResponse(ctx: ChannelHandlerContext, response: Js5Response) { - val request = Js5Request.Group(response.prefetch, response.archive, response.group) + protected fun handleResponse( + ctx: ChannelHandlerContext, + prefetch: Boolean, + archive: Int, + group: Int, + data: ByteBuf + ) { + val request = InFlightRequest(prefetch, archive, group) val removed = inFlightRequests.remove(request) if (!removed) { - val type = if (response.prefetch) { + val type = if (prefetch) { "prefetch" } else { "urgent" } - val archive = response.archive - val group = response.group throw Exception("Received response for $type request (archive $archive group $group) not in-flight") } - if (response.archive == Js5Archive.ARCHIVESET && response.group == Js5Archive.ARCHIVESET) { - processMasterIndex(response.data) - } else if (response.archive == Js5Archive.ARCHIVESET) { - processIndex(response.group, response.data) + processResponse(ctx, archive, group, data) + } + + protected fun processResponse(ctx: ChannelHandlerContext, archive: Int, group: Int, data: ByteBuf) { + if (archive == Js5Archive.ARCHIVESET && group == Js5Archive.ARCHIVESET) { + processMasterIndex(ctx, data) + } else if (archive == Js5Archive.ARCHIVESET) { + processIndex(ctx, group, data) } else { - processGroup(response.archive, response.group, response.data) + processGroup(archive, group, data) } - val complete = pendingRequests.isEmpty() && inFlightRequests.isEmpty() + val complete = isComplete() if (groups.size >= CacheImporter.BATCH_SIZE || complete) { runBlocking { @@ -179,9 +186,13 @@ public class Js5ChannelHandler( } } - private fun processMasterIndex(buf: ByteBuf) { + protected open fun isComplete(): Boolean { + return pendingRequests.isEmpty() && inFlightRequests.isEmpty() + } + + private fun processMasterIndex(ctx: ChannelHandlerContext, buf: ByteBuf) { Js5Compression.uncompress(buf.slice()).use { uncompressed -> - masterIndex = Js5MasterIndex.read(uncompressed.slice(), masterIndexFormat) + masterIndex = Js5MasterIndex.read(uncompressed.slice(), masterIndexFormat, key) val (masterIndexId, sourceId, rawIndexes) = runBlocking { importer.importMasterIndexAndGetIndexes( @@ -189,7 +200,8 @@ public class Js5ChannelHandler( buf, uncompressed, gameId, - build, + buildMajor, + buildMinor, lastMasterIndexId, timestamp = Instant.now() ) @@ -202,10 +214,15 @@ public class Js5ChannelHandler( indexes = arrayOfNulls(rawIndexes.size) for ((archive, index) in rawIndexes.withIndex()) { + val entry = masterIndex!!.entries[archive] + if (entry.version == 0 && entry.checksum == 0) { + continue + } + if (index != null) { - processIndex(archive, index) + processIndex(ctx, archive, index) } else { - request(Js5Archive.ARCHIVESET, archive) + request(ctx, Js5Archive.ARCHIVESET, archive, entry.version, entry.checksum) } } } finally { @@ -214,7 +231,7 @@ public class Js5ChannelHandler( } } - private fun processIndex(archive: Int, buf: ByteBuf) { + private fun processIndex(ctx: ChannelHandlerContext, archive: Int, buf: ByteBuf) { val checksum = buf.crc32() val entry = masterIndex!!.entries[archive] if (checksum != entry.checksum) { @@ -233,7 +250,8 @@ public class Js5ChannelHandler( importer.importIndexAndGetMissingGroups(sourceId, archive, index, buf, uncompressed, lastMasterIndexId) } for (group in groups) { - request(archive, group) + val groupEntry = index[group]!! + request(ctx, archive, group, groupEntry.version, groupEntry.checksum) } } } @@ -257,8 +275,8 @@ public class Js5ChannelHandler( ) } - private fun request(archive: Int, group: Int) { - pendingRequests += Js5Request.Group(false, archive, group) + protected open fun request(ctx: ChannelHandlerContext, archive: Int, group: Int, version: Int, checksum: Int) { + pendingRequests += PendingRequest(false, archive, group, version, checksum) } private fun releaseGroups() { diff --git a/archive/src/main/kotlin/org/openrs2/archive/cache/NxtJs5ChannelHandler.kt b/archive/src/main/kotlin/org/openrs2/archive/cache/NxtJs5ChannelHandler.kt new file mode 100644 index 00000000..04b082e0 --- /dev/null +++ b/archive/src/main/kotlin/org/openrs2/archive/cache/NxtJs5ChannelHandler.kt @@ -0,0 +1,158 @@ +package org.openrs2.archive.cache + +import com.github.michaelbull.logging.InlineLogger +import io.netty.bootstrap.Bootstrap +import io.netty.channel.ChannelHandlerContext +import io.netty.channel.ChannelPipeline +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.asCoroutineDispatcher +import kotlinx.coroutines.cancel +import kotlinx.coroutines.launch +import org.bouncycastle.crypto.params.RSAKeyParameters +import org.openrs2.archive.cache.nxt.InitJs5RemoteConnection +import org.openrs2.archive.cache.nxt.Js5Request +import org.openrs2.archive.cache.nxt.Js5RequestEncoder +import org.openrs2.archive.cache.nxt.Js5Response +import org.openrs2.archive.cache.nxt.Js5ResponseDecoder +import org.openrs2.archive.cache.nxt.LoginResponse +import org.openrs2.archive.cache.nxt.MusicStreamClient +import org.openrs2.buffer.use +import org.openrs2.cache.MasterIndexFormat +import org.openrs2.protocol.Rs2Decoder +import org.openrs2.protocol.Rs2Encoder +import org.openrs2.protocol.js5.XorDecoder +import kotlin.coroutines.Continuation + +public class NxtJs5ChannelHandler( + bootstrap: Bootstrap, + gameId: Int, + hostname: String, + port: Int, + buildMajor: Int, + buildMinor: Int, + lastMasterIndexId: Int?, + continuation: Continuation, + importer: CacheImporter, + key: RSAKeyParameters?, + private val token: String, + private val musicStreamClient: MusicStreamClient, + private val maxMinorBuildAttempts: Int = 5 +) : Js5ChannelHandler( + bootstrap, + gameId, + hostname, + port, + buildMajor, + buildMinor, + lastMasterIndexId, + continuation, + importer, + key, + MasterIndexFormat.LENGTHS, + maxInFlightRequests = 500 +) { + private data class MusicRequest(val archive: Int, val group: Int, val version: Int, val checksum: Int) + + private var inFlightRequests = 0 + private val pendingRequests = ArrayDeque() + private var scope: CoroutineScope? = null + private var minorBuildAttempts = 0 + + override fun createInitMessage(): Any { + return InitJs5RemoteConnection(buildMajor, buildMinor!!, token, 0) + } + + override fun createRequestMessage(prefetch: Boolean, archive: Int, group: Int): Any { + return Js5Request.Group(prefetch, archive, group, buildMajor) + } + + override fun createConnectedMessage(): Any? { + return Js5Request.Connected(buildMajor) + } + + override fun configurePipeline(pipeline: ChannelPipeline) { + pipeline.addBefore("handler", null, Js5RequestEncoder) + pipeline.addBefore("handler", null, XorDecoder()) + pipeline.addBefore("handler", null, Js5ResponseDecoder()) + + pipeline.remove(Rs2Encoder::class.java) + pipeline.remove(Rs2Decoder::class.java) + } + + override fun incrementVersion() { + buildMinor = buildMinor!! + 1 + + if (++minorBuildAttempts >= maxMinorBuildAttempts) { + buildMajor++ + buildMinor = 1 + } + } + + override fun channelActive(ctx: ChannelHandlerContext) { + super.channelActive(ctx) + scope = CoroutineScope(ctx.channel().eventLoop().asCoroutineDispatcher()) + } + + override fun channelInactive(ctx: ChannelHandlerContext) { + super.channelInactive(ctx) + scope!!.cancel() + } + + override fun channelRead0(ctx: ChannelHandlerContext, msg: Any) { + when (msg) { + is LoginResponse.Js5Ok -> handleOk(ctx) + is LoginResponse.ClientOutOfDate -> handleClientOutOfDate(ctx) + is LoginResponse -> throw Exception("Invalid response: $msg") + is Js5Response -> handleResponse(ctx, msg.prefetch, msg.archive, msg.group, msg.data) + else -> throw Exception("Unknown message type: ${msg.javaClass.name}") + } + } + + override fun channelReadComplete(ctx: ChannelHandlerContext) { + super.channelReadComplete(ctx) + + while (inFlightRequests < 6) { + val request = pendingRequests.removeFirstOrNull() ?: break + inFlightRequests++ + + logger.info { "Requesting archive ${request.archive} group ${request.group}" } + + scope!!.launch { + val archive = request.archive + val group = request.group + val version = request.version + val checksum = request.checksum + + musicStreamClient.request(archive, group, version, checksum, buildMajor).use { buf -> + inFlightRequests-- + + processResponse(ctx, archive, group, buf) + + /* + * Inject a fake channelReadComplete event to ensure we + * don't time out and to send any new music requests. + */ + ctx.channel().pipeline().fireChannelReadComplete() + } + } + } + } + + override fun isComplete(): Boolean { + return super.isComplete() && pendingRequests.isEmpty() && inFlightRequests == 0 + } + + override fun request(ctx: ChannelHandlerContext, archive: Int, group: Int, version: Int, checksum: Int) { + if (archive == MUSIC_ARCHIVE) { + pendingRequests += MusicRequest(archive, group, version, checksum) + } else { + super.request(ctx, archive, group, version, checksum) + } + } + + private companion object { + private val logger = InlineLogger() + + private const val MUSIC_ARCHIVE = 40 + } +} diff --git a/archive/src/main/kotlin/org/openrs2/archive/cache/NxtJs5ChannelInitializer.kt b/archive/src/main/kotlin/org/openrs2/archive/cache/NxtJs5ChannelInitializer.kt new file mode 100644 index 00000000..8f7c18ed --- /dev/null +++ b/archive/src/main/kotlin/org/openrs2/archive/cache/NxtJs5ChannelInitializer.kt @@ -0,0 +1,22 @@ +package org.openrs2.archive.cache + +import io.netty.channel.Channel +import io.netty.channel.ChannelInitializer +import io.netty.handler.timeout.ReadTimeoutHandler +import org.openrs2.archive.cache.nxt.ClientOutOfDateCodec +import org.openrs2.archive.cache.nxt.InitJs5RemoteConnectionCodec +import org.openrs2.archive.cache.nxt.Js5OkCodec +import org.openrs2.protocol.Protocol +import org.openrs2.protocol.Rs2Decoder +import org.openrs2.protocol.Rs2Encoder + +public class NxtJs5ChannelInitializer(private val handler: NxtJs5ChannelHandler) : ChannelInitializer() { + override fun initChannel(ch: Channel) { + ch.pipeline().addLast( + ReadTimeoutHandler(30), + Rs2Encoder(Protocol(InitJs5RemoteConnectionCodec)), + Rs2Decoder(Protocol(Js5OkCodec, ClientOutOfDateCodec)) + ) + ch.pipeline().addLast("handler", handler) + } +} diff --git a/archive/src/main/kotlin/org/openrs2/archive/cache/OsrsJs5ChannelHandler.kt b/archive/src/main/kotlin/org/openrs2/archive/cache/OsrsJs5ChannelHandler.kt new file mode 100644 index 00000000..da933410 --- /dev/null +++ b/archive/src/main/kotlin/org/openrs2/archive/cache/OsrsJs5ChannelHandler.kt @@ -0,0 +1,77 @@ +package org.openrs2.archive.cache + +import io.netty.bootstrap.Bootstrap +import io.netty.channel.ChannelHandlerContext +import io.netty.channel.ChannelPipeline +import org.bouncycastle.crypto.params.RSAKeyParameters +import org.openrs2.cache.MasterIndexFormat +import org.openrs2.protocol.Rs2Decoder +import org.openrs2.protocol.Rs2Encoder +import org.openrs2.protocol.js5.Js5Request +import org.openrs2.protocol.js5.Js5RequestEncoder +import org.openrs2.protocol.js5.Js5Response +import org.openrs2.protocol.js5.Js5ResponseDecoder +import org.openrs2.protocol.js5.XorDecoder +import org.openrs2.protocol.login.LoginRequest +import org.openrs2.protocol.login.LoginResponse +import kotlin.coroutines.Continuation + +public class OsrsJs5ChannelHandler( + bootstrap: Bootstrap, + gameId: Int, + hostname: String, + port: Int, + build: Int, + lastMasterIndexId: Int?, + continuation: Continuation, + importer: CacheImporter, + key: RSAKeyParameters? +) : Js5ChannelHandler( + bootstrap, + gameId, + hostname, + port, + build, + null, + lastMasterIndexId, + continuation, + importer, + key, + MasterIndexFormat.VERSIONED, + maxInFlightRequests = 200 +) { + override fun createInitMessage(): Any { + return LoginRequest.InitJs5RemoteConnection(buildMajor) + } + + override fun createRequestMessage(prefetch: Boolean, archive: Int, group: Int): Any { + return Js5Request.Group(prefetch, archive, group) + } + + override fun createConnectedMessage(): Any? { + return null + } + + override fun configurePipeline(pipeline: ChannelPipeline) { + pipeline.addBefore("handler", null, Js5RequestEncoder) + pipeline.addBefore("handler", null, XorDecoder()) + pipeline.addBefore("handler", null, Js5ResponseDecoder()) + + pipeline.remove(Rs2Encoder::class.java) + pipeline.remove(Rs2Decoder::class.java) + } + + override fun incrementVersion() { + buildMajor++ + } + + override fun channelRead0(ctx: ChannelHandlerContext, msg: Any) { + when (msg) { + is LoginResponse.Js5Ok -> handleOk(ctx) + is LoginResponse.ClientOutOfDate -> handleClientOutOfDate(ctx) + is LoginResponse -> throw Exception("Invalid response: $msg") + is Js5Response -> handleResponse(ctx, msg.prefetch, msg.archive, msg.group, msg.data) + else -> throw Exception("Unknown message type: ${msg.javaClass.name}") + } + } +} diff --git a/archive/src/main/kotlin/org/openrs2/archive/cache/Js5ChannelInitializer.kt b/archive/src/main/kotlin/org/openrs2/archive/cache/OsrsJs5ChannelInitializer.kt similarity index 67% rename from archive/src/main/kotlin/org/openrs2/archive/cache/Js5ChannelInitializer.kt rename to archive/src/main/kotlin/org/openrs2/archive/cache/OsrsJs5ChannelInitializer.kt index 01e80b0c..7c53c615 100644 --- a/archive/src/main/kotlin/org/openrs2/archive/cache/Js5ChannelInitializer.kt +++ b/archive/src/main/kotlin/org/openrs2/archive/cache/OsrsJs5ChannelInitializer.kt @@ -7,13 +7,13 @@ import org.openrs2.protocol.Protocol import org.openrs2.protocol.Rs2Decoder import org.openrs2.protocol.Rs2Encoder -public class Js5ChannelInitializer(private val handler: Js5ChannelHandler) : ChannelInitializer() { +public class OsrsJs5ChannelInitializer(private val handler: OsrsJs5ChannelHandler) : ChannelInitializer() { override fun initChannel(ch: Channel) { ch.pipeline().addLast( ReadTimeoutHandler(30), Rs2Encoder(Protocol.LOGIN_UPSTREAM), - Rs2Decoder(Protocol.LOGIN_DOWNSTREAM), - handler + Rs2Decoder(Protocol.LOGIN_DOWNSTREAM) ) + ch.pipeline().addLast("handler", handler) } } diff --git a/archive/src/main/kotlin/org/openrs2/archive/cache/nxt/ClientOutOfDateCodec.kt b/archive/src/main/kotlin/org/openrs2/archive/cache/nxt/ClientOutOfDateCodec.kt new file mode 100644 index 00000000..54e6d42d --- /dev/null +++ b/archive/src/main/kotlin/org/openrs2/archive/cache/nxt/ClientOutOfDateCodec.kt @@ -0,0 +1,8 @@ +package org.openrs2.archive.cache.nxt + +import org.openrs2.protocol.EmptyPacketCodec + +public object ClientOutOfDateCodec : EmptyPacketCodec( + opcode = 6, + packet = LoginResponse.ClientOutOfDate +) diff --git a/archive/src/main/kotlin/org/openrs2/archive/cache/nxt/InitJs5RemoteConnection.kt b/archive/src/main/kotlin/org/openrs2/archive/cache/nxt/InitJs5RemoteConnection.kt new file mode 100644 index 00000000..83af26e8 --- /dev/null +++ b/archive/src/main/kotlin/org/openrs2/archive/cache/nxt/InitJs5RemoteConnection.kt @@ -0,0 +1,10 @@ +package org.openrs2.archive.cache.nxt + +import org.openrs2.protocol.Packet + +public data class InitJs5RemoteConnection( + public val buildMajor: Int, + public val buildMinor: Int, + public val token: String, + public val language: Int +) : Packet diff --git a/archive/src/main/kotlin/org/openrs2/archive/cache/nxt/InitJs5RemoteConnectionCodec.kt b/archive/src/main/kotlin/org/openrs2/archive/cache/nxt/InitJs5RemoteConnectionCodec.kt new file mode 100644 index 00000000..f384ffc2 --- /dev/null +++ b/archive/src/main/kotlin/org/openrs2/archive/cache/nxt/InitJs5RemoteConnectionCodec.kt @@ -0,0 +1,29 @@ +package org.openrs2.archive.cache.nxt + +import io.netty.buffer.ByteBuf +import org.openrs2.buffer.readString +import org.openrs2.buffer.writeString +import org.openrs2.crypto.StreamCipher +import org.openrs2.protocol.PacketCodec +import org.openrs2.protocol.PacketLength + +public object InitJs5RemoteConnectionCodec : PacketCodec( + length = PacketLength.VARIABLE_BYTE, + opcode = 15, + type = InitJs5RemoteConnection::class.java +) { + override fun decode(input: ByteBuf, cipher: StreamCipher): InitJs5RemoteConnection { + val buildMajor = input.readInt() + val buildMinor = input.readInt() + val token = input.readString() + val language = input.readUnsignedByte().toInt() + return InitJs5RemoteConnection(buildMajor, buildMinor, token, language) + } + + override fun encode(input: InitJs5RemoteConnection, output: ByteBuf, cipher: StreamCipher) { + output.writeInt(input.buildMajor) + output.writeInt(input.buildMinor) + output.writeString(input.token) + output.writeByte(input.language) + } +} diff --git a/archive/src/main/kotlin/org/openrs2/archive/cache/nxt/Js5OkCodec.kt b/archive/src/main/kotlin/org/openrs2/archive/cache/nxt/Js5OkCodec.kt new file mode 100644 index 00000000..6648b467 --- /dev/null +++ b/archive/src/main/kotlin/org/openrs2/archive/cache/nxt/Js5OkCodec.kt @@ -0,0 +1,25 @@ +package org.openrs2.archive.cache.nxt + +import io.netty.buffer.ByteBuf +import org.openrs2.crypto.StreamCipher +import org.openrs2.protocol.PacketCodec + +public object Js5OkCodec : PacketCodec( + opcode = 0, + length = LoginResponse.Js5Ok.LOADING_REQUIREMENTS * 4, + type = LoginResponse.Js5Ok::class.java +) { + override fun decode(input: ByteBuf, cipher: StreamCipher): LoginResponse.Js5Ok { + val loadingRequirements = mutableListOf() + for (i in 0 until LoginResponse.Js5Ok.LOADING_REQUIREMENTS) { + loadingRequirements += input.readInt() + } + return LoginResponse.Js5Ok(loadingRequirements) + } + + override fun encode(input: LoginResponse.Js5Ok, output: ByteBuf, cipher: StreamCipher) { + for (requirement in input.loadingRequirements) { + output.writeInt(requirement) + } + } +} diff --git a/archive/src/main/kotlin/org/openrs2/archive/cache/nxt/Js5Request.kt b/archive/src/main/kotlin/org/openrs2/archive/cache/nxt/Js5Request.kt new file mode 100644 index 00000000..5a653a3b --- /dev/null +++ b/archive/src/main/kotlin/org/openrs2/archive/cache/nxt/Js5Request.kt @@ -0,0 +1,14 @@ +package org.openrs2.archive.cache.nxt + +public sealed class Js5Request { + public data class Group( + public val prefetch: Boolean, + public val archive: Int, + public val group: Int, + public val build: Int + ) : Js5Request() + + public data class Connected( + public val build: Int + ) : Js5Request() +} diff --git a/archive/src/main/kotlin/org/openrs2/archive/cache/nxt/Js5RequestEncoder.kt b/archive/src/main/kotlin/org/openrs2/archive/cache/nxt/Js5RequestEncoder.kt new file mode 100644 index 00000000..993d2268 --- /dev/null +++ b/archive/src/main/kotlin/org/openrs2/archive/cache/nxt/Js5RequestEncoder.kt @@ -0,0 +1,36 @@ +package org.openrs2.archive.cache.nxt + +import io.netty.buffer.ByteBuf +import io.netty.channel.ChannelHandler +import io.netty.channel.ChannelHandlerContext +import io.netty.handler.codec.MessageToByteEncoder + +@ChannelHandler.Sharable +public object Js5RequestEncoder : MessageToByteEncoder(Js5Request::class.java) { + override fun encode(ctx: ChannelHandlerContext, msg: Js5Request, out: ByteBuf) { + when (msg) { + is Js5Request.Group -> { + out.writeByte(if (msg.prefetch) 32 else 33) + out.writeByte(msg.archive) + out.writeInt(msg.group) + out.writeShort(msg.build) + out.writeShort(0) + } + is Js5Request.Connected -> { + out.writeByte(6) + out.writeMedium(5) + out.writeShort(0) + out.writeShort(msg.build) + out.writeShort(0) + } + } + } + + override fun allocateBuffer(ctx: ChannelHandlerContext, msg: Js5Request, preferDirect: Boolean): ByteBuf { + return if (preferDirect) { + ctx.alloc().ioBuffer(10, 10) + } else { + ctx.alloc().heapBuffer(10, 10) + } + } +} diff --git a/archive/src/main/kotlin/org/openrs2/archive/cache/nxt/Js5Response.kt b/archive/src/main/kotlin/org/openrs2/archive/cache/nxt/Js5Response.kt new file mode 100644 index 00000000..792e46fd --- /dev/null +++ b/archive/src/main/kotlin/org/openrs2/archive/cache/nxt/Js5Response.kt @@ -0,0 +1,11 @@ +package org.openrs2.archive.cache.nxt + +import io.netty.buffer.ByteBuf +import io.netty.buffer.DefaultByteBufHolder + +public data class Js5Response( + public val prefetch: Boolean, + public val archive: Int, + public val group: Int, + public val data: ByteBuf +) : DefaultByteBufHolder(data) diff --git a/archive/src/main/kotlin/org/openrs2/archive/cache/nxt/Js5ResponseDecoder.kt b/archive/src/main/kotlin/org/openrs2/archive/cache/nxt/Js5ResponseDecoder.kt new file mode 100644 index 00000000..3a882cb4 --- /dev/null +++ b/archive/src/main/kotlin/org/openrs2/archive/cache/nxt/Js5ResponseDecoder.kt @@ -0,0 +1,121 @@ +package org.openrs2.archive.cache.nxt + +import io.netty.buffer.ByteBuf +import io.netty.channel.ChannelHandlerContext +import io.netty.handler.codec.ByteToMessageDecoder +import io.netty.handler.codec.DecoderException +import kotlin.math.min + +public class Js5ResponseDecoder : ByteToMessageDecoder() { + private data class Request(val prefetch: Boolean, val archive: Int, val group: Int) + + private enum class State { + READ_HEADER, + READ_LEN, + READ_DATA + } + + private var state = State.READ_HEADER + private val buffers = mutableMapOf() + private var request: Request? = null + + override fun decode(ctx: ChannelHandlerContext, input: ByteBuf, out: MutableList) { + if (state == State.READ_HEADER) { + if (input.readableBytes() < 5) { + return + } + + val prefetch: Boolean + val archive = input.readUnsignedByte().toInt() + var group = input.readInt() + + if (group and 0x80000000.toInt() != 0) { + prefetch = true + group = group and 0x7FFFFFFF + } else { + prefetch = false + } + + request = Request(prefetch, archive, group) + + if (buffers.containsKey(request)) { + state = State.READ_DATA + } else { + state = State.READ_LEN + } + } + + if (state == State.READ_LEN) { + if (input.readableBytes() < 5) { + return + } + + val type = input.readUnsignedByte().toInt() + + val len = input.readInt() + if (len < 0) { + throw DecoderException("Length is negative: $len") + } + + val totalLen = if (type == 0) { + len + 5 + } else { + len + 9 + } + + if (totalLen < 0) { + throw DecoderException("Total length exceeds maximum ByteBuf size") + } + + val data = ctx.alloc().buffer(totalLen, totalLen) + data.writeByte(type) + data.writeInt(len) + + buffers[request!!] = data + + state = State.READ_DATA + } + + if (state == State.READ_DATA) { + val data = buffers[request!!]!! + + var blockLen = if (data.writerIndex() == 5) { + 102400 - 10 + } else { + 102400 - 5 + } + + blockLen = min(blockLen, data.writableBytes()) + + if (input.readableBytes() < blockLen) { + return + } + + data.writeBytes(input, blockLen) + + if (!data.isWritable) { + out += Js5Response(request!!.prefetch, request!!.archive, request!!.group, data) + buffers.remove(request!!) + request = null + } + + state = State.READ_HEADER + } + } + + override fun channelInactive(ctx: ChannelHandlerContext) { + super.channelInactive(ctx) + reset() + } + + override fun handlerRemoved0(ctx: ChannelHandlerContext?) { + reset() + } + + private fun reset() { + buffers.values.forEach(ByteBuf::release) + buffers.clear() + + state = State.READ_HEADER + } +} diff --git a/archive/src/main/kotlin/org/openrs2/archive/cache/nxt/LoginResponse.kt b/archive/src/main/kotlin/org/openrs2/archive/cache/nxt/LoginResponse.kt new file mode 100644 index 00000000..56afe0d7 --- /dev/null +++ b/archive/src/main/kotlin/org/openrs2/archive/cache/nxt/LoginResponse.kt @@ -0,0 +1,13 @@ +package org.openrs2.archive.cache.nxt + +import org.openrs2.protocol.Packet + +public sealed class LoginResponse : Packet { + public data class Js5Ok(val loadingRequirements: List) : LoginResponse() { + public companion object { + public const val LOADING_REQUIREMENTS: Int = 31 + } + } + + public object ClientOutOfDate : LoginResponse() +} diff --git a/archive/src/main/kotlin/org/openrs2/archive/cache/nxt/MusicStreamClient.kt b/archive/src/main/kotlin/org/openrs2/archive/cache/nxt/MusicStreamClient.kt new file mode 100644 index 00000000..234d9fb0 --- /dev/null +++ b/archive/src/main/kotlin/org/openrs2/archive/cache/nxt/MusicStreamClient.kt @@ -0,0 +1,30 @@ +package org.openrs2.archive.cache.nxt + +import io.netty.buffer.ByteBuf +import kotlinx.coroutines.future.await +import org.openrs2.buffer.ByteBufBodyHandler +import org.openrs2.buffer.use +import org.openrs2.http.checkStatusCode +import java.net.URI +import java.net.http.HttpClient +import java.net.http.HttpRequest + +public class MusicStreamClient( + private val client: HttpClient, + private val byteBufBodyHandler: ByteBufBodyHandler, + private val origin: String +) { + public suspend fun request(archive: Int, group: Int, version: Int, checksum: Int, build: Int): ByteBuf { + val uri = URI("$origin/ms?m=0&a=$archive&k=$build&g=$group&c=$checksum&v=$version") + + val request = HttpRequest.newBuilder(uri) + .GET() + .build() + + val response = client.sendAsync(request, byteBufBodyHandler).await() + response.body().use { buf -> + response.checkStatusCode() + return buf.retain() + } + } +} diff --git a/archive/src/main/kotlin/org/openrs2/archive/game/Game.kt b/archive/src/main/kotlin/org/openrs2/archive/game/Game.kt index 38ffc8fc..4e1f0cdb 100644 --- a/archive/src/main/kotlin/org/openrs2/archive/game/Game.kt +++ b/archive/src/main/kotlin/org/openrs2/archive/game/Game.kt @@ -1,8 +1,12 @@ package org.openrs2.archive.game +import org.bouncycastle.crypto.params.RSAKeyParameters + public data class Game( public val id: Int, public val url: String?, - public val build: Int?, - public val lastMasterIndexId: Int? + public val buildMajor: Int?, + public val buildMinor: Int?, + public val lastMasterIndexId: Int?, + public val key: RSAKeyParameters? ) diff --git a/archive/src/main/kotlin/org/openrs2/archive/game/GameDatabase.kt b/archive/src/main/kotlin/org/openrs2/archive/game/GameDatabase.kt index 80c2686b..85feaf48 100644 --- a/archive/src/main/kotlin/org/openrs2/archive/game/GameDatabase.kt +++ b/archive/src/main/kotlin/org/openrs2/archive/game/GameDatabase.kt @@ -1,6 +1,8 @@ package org.openrs2.archive.game +import org.openrs2.crypto.Rsa import org.openrs2.db.Database +import java.io.StringReader import javax.inject.Inject import javax.inject.Singleton @@ -12,7 +14,7 @@ public class GameDatabase @Inject constructor( return database.execute { connection -> connection.prepareStatement( """ - SELECT id, url, build, last_master_index_id + SELECT id, url, build_major, build_minor, last_master_index_id, key FROM games WHERE name = ? """.trimIndent() @@ -27,17 +29,31 @@ public class GameDatabase @Inject constructor( val id = rows.getInt(1) val url: String? = rows.getString(2) - var build: Int? = rows.getInt(3) + var buildMajor: Int? = rows.getInt(3) if (rows.wasNull()) { - build = null + buildMajor = null } - var lastMasterIndexId: Int? = rows.getInt(4) + var buildMinor: Int? = rows.getInt(4) + if (rows.wasNull()) { + buildMinor = null + } + + var lastMasterIndexId: Int? = rows.getInt(5) if (rows.wasNull()) { lastMasterIndexId = null } - return@execute Game(id, url, build, lastMasterIndexId) + val pem = rows.getString(6) + val key = if (rows.wasNull()) { + null + } else { + StringReader(pem).use { reader -> + Rsa.readPublicKey(reader) + } + } + + return@execute Game(id, url, buildMajor, buildMinor, lastMasterIndexId, key) } } } diff --git a/archive/src/main/resources/org/openrs2/archive/migrations/V3__nxt.sql b/archive/src/main/resources/org/openrs2/archive/migrations/V3__nxt.sql new file mode 100644 index 00000000..f58a2d26 --- /dev/null +++ b/archive/src/main/resources/org/openrs2/archive/migrations/V3__nxt.sql @@ -0,0 +1,49 @@ +-- @formatter:off +CREATE TYPE build AS ( + major INTEGER, + minor INTEGER +); + +ALTER TABLE games + ADD COLUMN key TEXT NULL, + ADD COLUMN build_minor INTEGER NULL; + +ALTER TABLE games + RENAME COLUMN build TO build_major; + +ALTER TABLE sources + ADD COLUMN build_minor INTEGER NULL; + +ALTER TABLE sources + RENAME COLUMN build TO build_major; + +DROP INDEX sources_master_index_id_game_id_build_idx; + +CREATE UNIQUE INDEX ON sources (master_index_id, game_id, build_major) + WHERE type = 'js5remote' AND build_minor IS NULL; + +CREATE UNIQUE INDEX ON sources (master_index_id, game_id, build_major, build_minor) + WHERE type = 'js5remote' AND build_minor IS NOT NULL; + +UPDATE games +SET + url = 'https://www.runescape.com/k=5/l=0/jav_config.ws?binaryType=2', + build_major = 919, + build_minor = 1, + key = $$-----BEGIN PUBLIC KEY----- +MIICIjANBgkqhkiG9w0BAQEFAAOCAg8AMIICCgKCAgEAnnP2Sqv7uMM3rjmsLTQ3 +z4yt8/4j9MDS/2+/9KEkfnH2K/toJbyBUCMHvfS7SBvPiLXWaArNvIArEz/e5Cr3 +dk2mcSzmoVcsE1dJq/2eDIqRzhH9WB6zDz+5DO6ysRYap1VdMa4bKXMkM+e7V0c3 +9xiMEpjpeSs0cHGxTGlxLBGTFHYG1IZPLkDRJzhKD58Lu8bn2e3KCTuzzvZFf2AF +FZauENC6OswdfAZutlWdWkVOZsD9IB/ALNaY4W35PABZbsfT/ar85S/foXFwHJ+B +OHuF6BR5dYETUQ5Oasl0GUEaVUM9POv7KRv6cW7HWUQHYfQdApjdH+dORHtk4kMG +QAmk/VpTwWBkZWqDbglZBIkd5G7gs8JpluiUh11eRMC/xj99iZp4nt/FOoSNw2NO +GMTUPkHIySC4FQHNSxzbfCW5rQdSRw5+eyuo8MA6mg0LZH3jQuNnnYBg1hJTsdBp +0IrjOQWsfTiX+xZ6lUfRhFtGISuKchpGDZfmOtrZPJDvUgNy0z8w41V6NyiU/h7X +2TKYFQG1/c4Kr4BxT4tPl85nVbMulonfk/AD5l6BflEuHlChpkAhv14j6xRzGHWx +4pdpbHSzDkg/HBR5ka0D7Ua7W6uL3VFVCPAygPERZK1lpYE+m+k92H+i/K7gIV1M +1E07p8x5X9i0oDbZ0lxv8I8CAwEAAQ== +-----END PUBLIC KEY----- +$$ +WHERE name = 'runescape'; +-- @formatter:on diff --git a/cache/src/main/kotlin/org/openrs2/cache/Js5MasterIndex.kt b/cache/src/main/kotlin/org/openrs2/cache/Js5MasterIndex.kt index da5838f6..885021a7 100644 --- a/cache/src/main/kotlin/org/openrs2/cache/Js5MasterIndex.kt +++ b/cache/src/main/kotlin/org/openrs2/cache/Js5MasterIndex.kt @@ -180,6 +180,19 @@ public data class Js5MasterIndex( } public fun read(buf: ByteBuf, format: MasterIndexFormat, key: RSAKeyParameters? = null): Js5MasterIndex { + return read(buf, format, key, true) + } + + public fun readUnverified(buf: ByteBuf, format: MasterIndexFormat): Js5MasterIndex { + return read(buf, format, null, false) + } + + private fun read( + buf: ByteBuf, + format: MasterIndexFormat, + key: RSAKeyParameters?, + verify: Boolean + ): Js5MasterIndex { val index = Js5MasterIndex(format) val start = buf.readerIndex() @@ -233,24 +246,26 @@ public data class Js5MasterIndex( index.entries += Entry(version, checksum, groups, totalUncompressedLength, digest) } - val end = buf.readerIndex() + if (verify) { + val end = buf.readerIndex() - if (format >= MasterIndexFormat.DIGESTS) { - val ciphertext = buf.readSlice(buf.readableBytes()) - decrypt(ciphertext, key).use { plaintext -> - require(plaintext.readableBytes() == SIGNATURE_LENGTH) { - "Invalid signature length" - } + if (format >= MasterIndexFormat.DIGESTS) { + val ciphertext = buf.readSlice(buf.readableBytes()) + decrypt(ciphertext, key).use { plaintext -> + require(plaintext.readableBytes() == SIGNATURE_LENGTH) { + "Invalid signature length" + } - // the client doesn't verify what I presume is the RSA magic byte - plaintext.skipBytes(1) + // the client doesn't verify what I presume is the RSA magic byte + plaintext.skipBytes(1) - val expected = ByteArray(Whirlpool.DIGESTBYTES) - plaintext.readBytes(expected) + val expected = ByteArray(Whirlpool.DIGESTBYTES) + plaintext.readBytes(expected) - val actual = buf.whirlpool(start, end - start) - require(expected.contentEquals(actual)) { - "Invalid signature" + val actual = buf.whirlpool(start, end - start) + require(expected.contentEquals(actual)) { + "Invalid signature" + } } } } diff --git a/cache/src/test/kotlin/org/openrs2/cache/Js5MasterIndexTest.kt b/cache/src/test/kotlin/org/openrs2/cache/Js5MasterIndexTest.kt index 706f1553..08a1c8c1 100644 --- a/cache/src/test/kotlin/org/openrs2/cache/Js5MasterIndexTest.kt +++ b/cache/src/test/kotlin/org/openrs2/cache/Js5MasterIndexTest.kt @@ -206,8 +206,11 @@ class Js5MasterIndexTest { @Test fun testReadWhirlpool() { Unpooled.wrappedBuffer(encodedWhirlpool).use { buf -> - val index = Js5MasterIndex.read(buf, MasterIndexFormat.DIGESTS) + val index = Js5MasterIndex.read(buf.slice(), MasterIndexFormat.DIGESTS) assertEquals(decodedWhirlpool, index) + + val indexUnverified = Js5MasterIndex.readUnverified(buf, MasterIndexFormat.DIGESTS) + assertEquals(decodedWhirlpool, indexUnverified) } } @@ -218,8 +221,11 @@ class Js5MasterIndexTest { buf.setByte(lastIndex, buf.getByte(lastIndex).toInt().inv()) assertFailsWith { - Js5MasterIndex.read(buf, MasterIndexFormat.DIGESTS) + Js5MasterIndex.read(buf.slice(), MasterIndexFormat.DIGESTS) } + + val index = Js5MasterIndex.readUnverified(buf, MasterIndexFormat.DIGESTS) + assertEquals(decodedWhirlpool, index) } } @@ -227,8 +233,11 @@ class Js5MasterIndexTest { fun testReadWhirlpoolInvalidSignatureLength() { Unpooled.wrappedBuffer(encodedWhirlpool, 0, encodedWhirlpool.size - 1).use { buf -> assertFailsWith { - Js5MasterIndex.read(buf, MasterIndexFormat.DIGESTS) + Js5MasterIndex.read(buf.slice(), MasterIndexFormat.DIGESTS) } + + val index = Js5MasterIndex.readUnverified(buf, MasterIndexFormat.DIGESTS) + assertEquals(decodedWhirlpool, index) } } @@ -257,8 +266,11 @@ class Js5MasterIndexTest { @Test fun testReadSigned() { Unpooled.wrappedBuffer(encodedSigned).use { buf -> - val index = Js5MasterIndex.read(buf, MasterIndexFormat.DIGESTS, PUBLIC_KEY) + val index = Js5MasterIndex.read(buf.slice(), MasterIndexFormat.DIGESTS, PUBLIC_KEY) assertEquals(decodedWhirlpool, index) + + val indexUnverified = Js5MasterIndex.readUnverified(buf, MasterIndexFormat.DIGESTS) + assertEquals(decodedWhirlpool, indexUnverified) } } @@ -269,8 +281,11 @@ class Js5MasterIndexTest { buf.setByte(lastIndex, buf.getByte(lastIndex).toInt().inv()) assertFailsWith { - Js5MasterIndex.read(buf, MasterIndexFormat.DIGESTS, PUBLIC_KEY) + Js5MasterIndex.read(buf.slice(), MasterIndexFormat.DIGESTS, PUBLIC_KEY) } + + val index = Js5MasterIndex.readUnverified(buf, MasterIndexFormat.DIGESTS) + assertEquals(decodedWhirlpool, index) } } @@ -278,8 +293,11 @@ class Js5MasterIndexTest { fun testReadSignedInvalidSignatureLength() { Unpooled.wrappedBuffer(encodedSigned, 0, encodedSigned.size - 1).use { buf -> assertFailsWith { - Js5MasterIndex.read(buf, MasterIndexFormat.DIGESTS, PUBLIC_KEY) + Js5MasterIndex.read(buf.slice(), MasterIndexFormat.DIGESTS, PUBLIC_KEY) } + + val index = Js5MasterIndex.readUnverified(buf, MasterIndexFormat.DIGESTS) + assertEquals(decodedWhirlpool, index) } }