diff --git a/archive/src/main/kotlin/org/openrs2/archive/cache/CacheDownloader.kt b/archive/src/main/kotlin/org/openrs2/archive/cache/CacheDownloader.kt index e3ccde01..c0c5a270 100644 --- a/archive/src/main/kotlin/org/openrs2/archive/cache/CacheDownloader.kt +++ b/archive/src/main/kotlin/org/openrs2/archive/cache/CacheDownloader.kt @@ -1,7 +1,9 @@ package org.openrs2.archive.cache +import org.openrs2.archive.cache.nxt.MusicStreamClient import org.openrs2.archive.game.GameDatabase import org.openrs2.archive.jav.JavConfig +import org.openrs2.buffer.ByteBufBodyHandler import org.openrs2.net.BootstrapFactory import org.openrs2.net.awaitSuspend import java.net.URI @@ -13,6 +15,7 @@ import kotlin.coroutines.suspendCoroutine @Singleton public class CacheDownloader @Inject constructor( private val client: HttpClient, + private val byteBufBodyHandler: ByteBufBodyHandler, private val bootstrapFactory: BootstrapFactory, private val gameDatabase: GameDatabase, private val importer: CacheImporter @@ -21,28 +24,67 @@ public class CacheDownloader @Inject constructor( val game = gameDatabase.getGame(gameName) ?: throw Exception("Game not found") val url = game.url ?: throw Exception("URL not set") - val build = game.build ?: throw Exception("Current build not set") + val buildMajor = game.buildMajor ?: throw Exception("Current major build not set") val config = JavConfig.download(client, url) - val codebase = config.config[CODEBASE] ?: throw Exception("Codebase missing") - val hostname = URI(codebase).host ?: throw Exception("Hostname missing") val group = bootstrapFactory.createEventLoopGroup() try { suspendCoroutine { continuation -> val bootstrap = bootstrapFactory.createBootstrap(group) - val handler = Js5ChannelHandler( - bootstrap, - game.id, - hostname, - PORT, - build, - game.lastMasterIndexId, - continuation, - importer - ) - - bootstrap.handler(Js5ChannelInitializer(handler)) + + val hostname: String + + val initializer = when (gameName) { + "oldschool" -> { + val codebase = config.config[CODEBASE] ?: throw Exception("Codebase missing") + hostname = URI(codebase).host ?: throw Exception("Hostname missing") + + OsrsJs5ChannelInitializer( + OsrsJs5ChannelHandler( + bootstrap, + game.id, + hostname, + PORT, + buildMajor, + game.lastMasterIndexId, + continuation, + importer, + game.key + ) + ) + } + "runescape" -> { + val buildMinor = game.buildMinor ?: throw Exception("Current minor build not set") + + val tokens = config.params.values.filter { TOKEN_REGEX.matches(it) } + val token = tokens.singleOrNull() ?: throw Exception("Multiple candidate tokens: $tokens") + + hostname = NXT_HOSTNAME + + val musicStreamClient = MusicStreamClient(client, byteBufBodyHandler, "http://$hostname") + + NxtJs5ChannelInitializer( + NxtJs5ChannelHandler( + bootstrap, + game.id, + hostname, + PORT, + buildMajor, + buildMinor, + game.lastMasterIndexId, + continuation, + importer, + game.key, + token, + musicStreamClient + ) + ) + } + else -> throw UnsupportedOperationException() + } + + bootstrap.handler(initializer) .connect(hostname, PORT) } } finally { @@ -52,6 +94,8 @@ public class CacheDownloader @Inject constructor( private companion object { private const val CODEBASE = "codebase" + private const val NXT_HOSTNAME = "content.runescape.com" private const val PORT = 443 + private val TOKEN_REGEX = Regex("[A-Za-z0-9*-]{32}") } } diff --git a/archive/src/main/kotlin/org/openrs2/archive/cache/CacheExporter.kt b/archive/src/main/kotlin/org/openrs2/archive/cache/CacheExporter.kt index 4ce091f6..d05be3cb 100644 --- a/archive/src/main/kotlin/org/openrs2/archive/cache/CacheExporter.kt +++ b/archive/src/main/kotlin/org/openrs2/archive/cache/CacheExporter.kt @@ -11,6 +11,7 @@ import org.openrs2.cache.MasterIndexFormat import org.openrs2.cache.Store import org.openrs2.crypto.XteaKey import org.openrs2.db.Database +import org.postgresql.util.PGobject import java.time.Instant import java.util.SortedSet import javax.inject.Inject @@ -52,10 +53,43 @@ public class CacheExporter @Inject constructor( } } + public data class Build(val major: Int, val minor: Int?) : Comparable { + override fun compareTo(other: Build): Int { + return compareValuesBy(this, other, Build::major, Build::minor) + } + + override fun toString(): String { + return if (minor != null) { + "$major.$minor" + } else { + major.toString() + } + } + + internal companion object { + internal fun fromPgObject(o: PGobject): Build? { + val value = o.value!! + require(value.length >= 2) + + val parts = value.substring(1, value.length - 1).split(",") + require(parts.size == 2) + + val major = parts[0] + val minor = parts[1] + + if (major.isEmpty()) { + return null + } + + return Build(major.toInt(), if (minor.isEmpty()) null else minor.toInt()) + } + } + } + public data class CacheSummary( val id: Int, val game: String, - val builds: SortedSet, + val builds: SortedSet, val timestamp: Instant?, val names: SortedSet, val stats: Stats? @@ -70,7 +104,7 @@ public class CacheExporter @Inject constructor( public data class Source( val game: String, - val build: Int?, + val build: Build?, val timestamp: Instant?, val name: String?, val description: String?, @@ -90,26 +124,29 @@ public class CacheExporter @Inject constructor( return database.execute { connection -> connection.prepareStatement( """ - SELECT - m.id, - g.name, - array_remove(array_agg(DISTINCT s.build ORDER BY s.build ASC), NULL), - MIN(s.timestamp), - array_remove(array_agg(DISTINCT s.name ORDER BY s.name ASC), NULL), - ms.valid_indexes, - ms.indexes, - ms.valid_groups, - ms.groups, - ms.valid_keys, - ms.keys, - ms.size - FROM master_indexes m - JOIN sources s ON s.master_index_id = m.id - JOIN games g ON g.id = s.game_id - LEFT JOIN master_index_stats ms ON ms.master_index_id = m.id - GROUP BY m.id, g.name, ms.valid_indexes, ms.indexes, ms.valid_groups, ms.groups, ms.valid_keys, ms.keys, - ms.size - ORDER BY g.name ASC, MIN(s.build) ASC, MIN(s.timestamp) ASC + SELECT * + FROM ( + SELECT + m.id, + g.name, + array_remove(array_agg(DISTINCT ROW(s.build_major, s.build_minor)::build ORDER BY ROW(s.build_major, s.build_minor)::build ASC), NULL) builds, + MIN(s.timestamp) AS timestamp, + array_remove(array_agg(DISTINCT s.name ORDER BY s.name ASC), NULL) sources, + ms.valid_indexes, + ms.indexes, + ms.valid_groups, + ms.groups, + ms.valid_keys, + ms.keys, + ms.size + FROM master_indexes m + JOIN sources s ON s.master_index_id = m.id + JOIN games g ON g.id = s.game_id + LEFT JOIN master_index_stats ms ON ms.master_index_id = m.id + GROUP BY m.id, g.name, ms.valid_indexes, ms.indexes, ms.valid_groups, ms.groups, ms.valid_keys, ms.keys, + ms.size + ) t + ORDER BY t.name ASC, t.builds[1] ASC, t.timestamp ASC """.trimIndent() ).use { stmt -> stmt.executeQuery().use { rows -> @@ -118,7 +155,7 @@ public class CacheExporter @Inject constructor( while (rows.next()) { val id = rows.getInt(1) val game = rows.getString(2) - val builds = rows.getArray(3).array as Array + val builds = rows.getArray(3).array as Array val timestamp = rows.getTimestamp(4)?.toInstant() val names = rows.getArray(5).array as Array @@ -138,7 +175,7 @@ public class CacheExporter @Inject constructor( caches += CacheSummary( id, game, - builds.toSortedSet(), + builds.mapNotNull { o -> Build.fromPgObject(o as PGobject) }.toSortedSet(), timestamp, names.toSortedSet(), stats @@ -185,7 +222,7 @@ public class CacheExporter @Inject constructor( masterIndex = Unpooled.wrappedBuffer(rows.getBytes(2)).use { compressed -> Js5Compression.uncompress(compressed).use { uncompressed -> - Js5MasterIndex.read(uncompressed, format) + Js5MasterIndex.readUnverified(uncompressed, format) } } @@ -208,7 +245,7 @@ public class CacheExporter @Inject constructor( connection.prepareStatement( """ - SELECT g.name, s.build, s.timestamp, s.name, s.description, s.url + SELECT g.name, s.build_major, s.build_minor, s.timestamp, s.name, s.description, s.url FROM sources s JOIN games g ON g.id = s.game_id WHERE s.master_index_id = ? @@ -221,15 +258,26 @@ public class CacheExporter @Inject constructor( while (rows.next()) { val game = rows.getString(1) - var build: Int? = rows.getInt(2) + var buildMajor: Int? = rows.getInt(2) if (rows.wasNull()) { - build = null + buildMajor = null } - val timestamp = rows.getTimestamp(3)?.toInstant() - val name = rows.getString(4) - val description = rows.getString(5) - val url = rows.getString(6) + var buildMinor: Int? = rows.getInt(3) + if (rows.wasNull()) { + buildMinor = null + } + + val build = if (buildMajor != null) { + Build(buildMajor, buildMinor) + } else { + null + } + + val timestamp = rows.getTimestamp(4)?.toInstant() + val name = rows.getString(5) + val description = rows.getString(6) + val url = rows.getString(7) sources += Source(game, build, timestamp, name, description, url) } diff --git a/archive/src/main/kotlin/org/openrs2/archive/cache/CacheImporter.kt b/archive/src/main/kotlin/org/openrs2/archive/cache/CacheImporter.kt index f4a6f335..45bbe20e 100644 --- a/archive/src/main/kotlin/org/openrs2/archive/cache/CacheImporter.kt +++ b/archive/src/main/kotlin/org/openrs2/archive/cache/CacheImporter.kt @@ -88,7 +88,8 @@ public class CacheImporter @Inject constructor( public suspend fun import( store: Store, game: String, - build: Int?, + buildMajor: Int?, + buildMinor: Int?, timestamp: Instant?, name: String?, description: String?, @@ -117,7 +118,8 @@ public class CacheImporter @Inject constructor( SourceType.DISK, masterIndexId, gameId, - build, + buildMajor, + buildMinor, timestamp, name, description, @@ -182,21 +184,38 @@ public class CacheImporter @Inject constructor( buf: ByteBuf, format: MasterIndexFormat, game: String, - build: Int?, + buildMajor: Int?, + buildMinor: Int?, timestamp: Instant?, name: String?, description: String?, url: String? ) { Js5Compression.uncompress(buf.slice()).use { uncompressed -> - val masterIndex = MasterIndex(Js5MasterIndex.read(uncompressed.slice(), format), buf, uncompressed) + val masterIndex = MasterIndex( + Js5MasterIndex.readUnverified(uncompressed.slice(), format), + buf, + uncompressed + ) database.execute { connection -> prepare(connection) val gameId = getGameId(connection, game) val masterIndexId = addMasterIndex(connection, masterIndex) - addSource(connection, SourceType.DISK, masterIndexId, gameId, build, timestamp, name, description, url) + + addSource( + connection, + SourceType.DISK, + masterIndexId, + gameId, + buildMajor, + buildMinor, + timestamp, + name, + description, + url + ) } } } @@ -206,7 +225,8 @@ public class CacheImporter @Inject constructor( buf: ByteBuf, uncompressed: ByteBuf, gameId: Int, - build: Int, + buildMajor: Int, + buildMinor: Int?, lastId: Int?, timestamp: Instant ): MasterIndexResult { @@ -216,12 +236,13 @@ public class CacheImporter @Inject constructor( connection.prepareStatement( """ UPDATE games - SET build = ? + SET build_major = ?, build_minor = ? WHERE id = ? """.trimIndent() ).use { stmt -> - stmt.setInt(1, build) - stmt.setInt(2, gameId) + stmt.setInt(1, buildMajor) + stmt.setObject(2, buildMinor, Types.INTEGER) + stmt.setInt(3, gameId) stmt.execute() } @@ -236,7 +257,8 @@ public class CacheImporter @Inject constructor( SourceType.JS5REMOTE, masterIndexId, gameId, - build, + buildMajor, + buildMinor, timestamp, name = "Jagex", description = null, @@ -451,23 +473,25 @@ public class CacheImporter @Inject constructor( type: SourceType, masterIndexId: Int, gameId: Int, - build: Int?, + buildMajor: Int?, + buildMinor: Int?, timestamp: Instant?, name: String?, description: String?, url: String? ): Int { - if (type == SourceType.JS5REMOTE && build != null) { + if (type == SourceType.JS5REMOTE && buildMajor != null) { connection.prepareStatement( """ SELECT id FROM sources - WHERE type = 'js5remote' AND master_index_id = ? AND game_id = ? AND build = ? + WHERE type = 'js5remote' AND master_index_id = ? AND game_id = ? AND build_major = ? AND build_minor IS NOT DISTINCT FROM ? """.trimIndent() ).use { stmt -> stmt.setInt(1, masterIndexId) stmt.setInt(2, gameId) - stmt.setInt(3, build) + stmt.setInt(3, buildMajor) + stmt.setObject(4, buildMinor, Types.INTEGER) stmt.executeQuery().use { rows -> if (rows.next()) { @@ -479,25 +503,26 @@ public class CacheImporter @Inject constructor( connection.prepareStatement( """ - INSERT INTO sources (type, master_index_id, game_id, build, timestamp, name, description, url) - VALUES (?::source_type, ?, ?, ?, ?, ?, ?, ?) + INSERT INTO sources (type, master_index_id, game_id, build_major, build_minor, timestamp, name, description, url) + VALUES (?::source_type, ?, ?, ?, ?, ?, ?, ?, ?) RETURNING id """.trimIndent() ).use { stmt -> stmt.setString(1, type.toString().lowercase()) stmt.setInt(2, masterIndexId) stmt.setInt(3, gameId) - stmt.setObject(4, build, Types.INTEGER) + stmt.setObject(4, buildMajor, Types.INTEGER) + stmt.setObject(5, buildMinor, Types.INTEGER) if (timestamp != null) { - stmt.setObject(5, timestamp.atOffset(ZoneOffset.UTC), Types.TIMESTAMP_WITH_TIMEZONE) + stmt.setObject(6, timestamp.atOffset(ZoneOffset.UTC), Types.TIMESTAMP_WITH_TIMEZONE) } else { - stmt.setNull(5, Types.TIMESTAMP_WITH_TIMEZONE) + stmt.setNull(6, Types.TIMESTAMP_WITH_TIMEZONE) } - stmt.setString(6, name) - stmt.setString(7, description) - stmt.setString(8, url) + stmt.setString(7, name) + stmt.setString(8, description) + stmt.setString(9, url) stmt.executeQuery().use { rows -> check(rows.next()) diff --git a/archive/src/main/kotlin/org/openrs2/archive/cache/ImportCommand.kt b/archive/src/main/kotlin/org/openrs2/archive/cache/ImportCommand.kt index 2c0aadb4..df9acd30 100644 --- a/archive/src/main/kotlin/org/openrs2/archive/cache/ImportCommand.kt +++ b/archive/src/main/kotlin/org/openrs2/archive/cache/ImportCommand.kt @@ -13,7 +13,8 @@ import org.openrs2.cli.instant import org.openrs2.inject.CloseableInjector public class ImportCommand : CliktCommand(name = "import") { - private val build by option().int() + private val buildMajor by option().int() + private val buildMinor by option().int() private val timestamp by option().instant() private val name by option() private val description by option() @@ -31,7 +32,7 @@ public class ImportCommand : CliktCommand(name = "import") { val importer = injector.getInstance(CacheImporter::class.java) Store.open(input).use { store -> - importer.import(store, game, build, timestamp, name, description, url) + importer.import(store, game, buildMajor, buildMinor, timestamp, name, description, url) } } } diff --git a/archive/src/main/kotlin/org/openrs2/archive/cache/ImportMasterIndexCommand.kt b/archive/src/main/kotlin/org/openrs2/archive/cache/ImportMasterIndexCommand.kt index e3ca8efe..1f4249ab 100644 --- a/archive/src/main/kotlin/org/openrs2/archive/cache/ImportMasterIndexCommand.kt +++ b/archive/src/main/kotlin/org/openrs2/archive/cache/ImportMasterIndexCommand.kt @@ -17,7 +17,8 @@ import org.openrs2.inject.CloseableInjector import java.nio.file.Files public class ImportMasterIndexCommand : CliktCommand(name = "import-master-index") { - private val build by option().int() + private val buildMajor by option().int() + private val buildMinor by option().int() private val timestamp by option().instant() private val name by option() private val description by option() @@ -36,7 +37,7 @@ public class ImportMasterIndexCommand : CliktCommand(name = "import-master-index val importer = injector.getInstance(CacheImporter::class.java) Unpooled.wrappedBuffer(Files.readAllBytes(input)).use { buf -> - importer.importMasterIndex(buf, format, game, build, timestamp, name, description, url) + importer.importMasterIndex(buf, format, game, buildMajor, buildMinor, timestamp, name, description, url) } } } diff --git a/archive/src/main/kotlin/org/openrs2/archive/cache/Js5ChannelHandler.kt b/archive/src/main/kotlin/org/openrs2/archive/cache/Js5ChannelHandler.kt index d0ca5bf6..70d315c2 100644 --- a/archive/src/main/kotlin/org/openrs2/archive/cache/Js5ChannelHandler.kt +++ b/archive/src/main/kotlin/org/openrs2/archive/cache/Js5ChannelHandler.kt @@ -5,8 +5,10 @@ import io.netty.bootstrap.Bootstrap import io.netty.buffer.ByteBuf import io.netty.channel.ChannelHandler import io.netty.channel.ChannelHandlerContext +import io.netty.channel.ChannelPipeline import io.netty.channel.SimpleChannelInboundHandler import kotlinx.coroutines.runBlocking +import org.bouncycastle.crypto.params.RSAKeyParameters import org.openrs2.buffer.crc32 import org.openrs2.buffer.use import org.openrs2.cache.Js5Archive @@ -14,34 +16,36 @@ import org.openrs2.cache.Js5Compression import org.openrs2.cache.Js5Index import org.openrs2.cache.Js5MasterIndex import org.openrs2.cache.MasterIndexFormat -import org.openrs2.protocol.Rs2Decoder -import org.openrs2.protocol.Rs2Encoder -import org.openrs2.protocol.js5.Js5Request -import org.openrs2.protocol.js5.Js5RequestEncoder -import org.openrs2.protocol.js5.Js5Response -import org.openrs2.protocol.js5.Js5ResponseDecoder -import org.openrs2.protocol.js5.XorDecoder -import org.openrs2.protocol.login.LoginRequest -import org.openrs2.protocol.login.LoginResponse import java.time.Instant import kotlin.coroutines.Continuation import kotlin.coroutines.resume import kotlin.coroutines.resumeWithException @ChannelHandler.Sharable -public class Js5ChannelHandler( +public abstract class Js5ChannelHandler( private val bootstrap: Bootstrap, private val gameId: Int, private val hostname: String, private val port: Int, - private var build: Int, + protected var buildMajor: Int, + protected var buildMinor: Int?, private val lastMasterIndexId: Int?, private val continuation: Continuation, private val importer: CacheImporter, - private val masterIndexFormat: MasterIndexFormat = MasterIndexFormat.VERSIONED, - private val maxInFlightRequests: Int = 200, - maxBuildAttempts: Int = 10 + private val key: RSAKeyParameters?, + private val masterIndexFormat: MasterIndexFormat, + private val maxInFlightRequests: Int, + private val maxBuildAttempts: Int = 10 ) : SimpleChannelInboundHandler(Object::class.java) { + protected data class InFlightRequest(val prefetch: Boolean, val archive: Int, val group: Int) + protected data class PendingRequest( + val prefetch: Boolean, + val archive: Int, + val group: Int, + val version: Int, + val checksum: Int + ) + private enum class State { ACTIVE, CLIENT_OUT_OF_DATE, @@ -49,39 +53,35 @@ public class Js5ChannelHandler( } private var state = State.ACTIVE - private val maxBuild = build + maxBuildAttempts - private val inFlightRequests = mutableSetOf() - private val pendingRequests = ArrayDeque() + private var buildAttempts = 0 + private val inFlightRequests = mutableSetOf() + private val pendingRequests = ArrayDeque() private var masterIndexId: Int = 0 private var sourceId: Int = 0 private var masterIndex: Js5MasterIndex? = null private lateinit var indexes: Array private val groups = mutableListOf() + protected abstract fun createInitMessage(): Any + protected abstract fun createRequestMessage(prefetch: Boolean, archive: Int, group: Int): Any + protected abstract fun createConnectedMessage(): Any? + protected abstract fun configurePipeline(pipeline: ChannelPipeline) + protected abstract fun incrementVersion() + override fun channelActive(ctx: ChannelHandlerContext) { - ctx.writeAndFlush(LoginRequest.InitJs5RemoteConnection(build), ctx.voidPromise()) + ctx.writeAndFlush(createInitMessage(), ctx.voidPromise()) ctx.read() } - override fun channelRead0(ctx: ChannelHandlerContext, msg: Any) { - when (msg) { - is LoginResponse.Js5Ok -> handleOk(ctx) - is LoginResponse.ClientOutOfDate -> handleClientOutOfDate(ctx) - is LoginResponse -> throw Exception("Invalid response: $msg") - is Js5Response -> handleResponse(ctx, msg) - else -> throw Exception("Unknown message type: ${msg.javaClass.name}") - } - } - override fun channelReadComplete(ctx: ChannelHandlerContext) { var flush = false while (inFlightRequests.size < maxInFlightRequests) { val request = pendingRequests.removeFirstOrNull() ?: break - inFlightRequests += request + inFlightRequests += InFlightRequest(request.prefetch, request.archive, request.group) logger.info { "Requesting archive ${request.archive} group ${request.group}" } - ctx.write(request, ctx.voidPromise()) + ctx.write(createRequestMessage(request.prefetch, request.archive, request.group), ctx.voidPromise()) flush = true } @@ -112,53 +112,60 @@ public class Js5ChannelHandler( continuation.resumeWithException(cause) } - private fun handleOk(ctx: ChannelHandlerContext) { - val pipeline = ctx.pipeline() + protected fun handleOk(ctx: ChannelHandlerContext) { + configurePipeline(ctx.pipeline()) - pipeline.remove(Rs2Encoder::class.java) - pipeline.remove(Rs2Decoder::class.java) - pipeline.addFirst( - Js5RequestEncoder, - XorDecoder(), - Js5ResponseDecoder() - ) + val msg = createConnectedMessage() + if (msg != null) { + ctx.write(msg, ctx.voidPromise()) + } - request(Js5Archive.ARCHIVESET, Js5Archive.ARCHIVESET) + request(ctx, Js5Archive.ARCHIVESET, Js5Archive.ARCHIVESET, 0, 0) } - private fun handleClientOutOfDate(ctx: ChannelHandlerContext) { - if (++build > maxBuild) { + protected fun handleClientOutOfDate(ctx: ChannelHandlerContext) { + if (++buildAttempts > maxBuildAttempts) { throw Exception("Failed to identify current version") } state = State.CLIENT_OUT_OF_DATE + incrementVersion() + ctx.close() } - private fun handleResponse(ctx: ChannelHandlerContext, response: Js5Response) { - val request = Js5Request.Group(response.prefetch, response.archive, response.group) + protected fun handleResponse( + ctx: ChannelHandlerContext, + prefetch: Boolean, + archive: Int, + group: Int, + data: ByteBuf + ) { + val request = InFlightRequest(prefetch, archive, group) val removed = inFlightRequests.remove(request) if (!removed) { - val type = if (response.prefetch) { + val type = if (prefetch) { "prefetch" } else { "urgent" } - val archive = response.archive - val group = response.group throw Exception("Received response for $type request (archive $archive group $group) not in-flight") } - if (response.archive == Js5Archive.ARCHIVESET && response.group == Js5Archive.ARCHIVESET) { - processMasterIndex(response.data) - } else if (response.archive == Js5Archive.ARCHIVESET) { - processIndex(response.group, response.data) + processResponse(ctx, archive, group, data) + } + + protected fun processResponse(ctx: ChannelHandlerContext, archive: Int, group: Int, data: ByteBuf) { + if (archive == Js5Archive.ARCHIVESET && group == Js5Archive.ARCHIVESET) { + processMasterIndex(ctx, data) + } else if (archive == Js5Archive.ARCHIVESET) { + processIndex(ctx, group, data) } else { - processGroup(response.archive, response.group, response.data) + processGroup(archive, group, data) } - val complete = pendingRequests.isEmpty() && inFlightRequests.isEmpty() + val complete = isComplete() if (groups.size >= CacheImporter.BATCH_SIZE || complete) { runBlocking { @@ -179,9 +186,13 @@ public class Js5ChannelHandler( } } - private fun processMasterIndex(buf: ByteBuf) { + protected open fun isComplete(): Boolean { + return pendingRequests.isEmpty() && inFlightRequests.isEmpty() + } + + private fun processMasterIndex(ctx: ChannelHandlerContext, buf: ByteBuf) { Js5Compression.uncompress(buf.slice()).use { uncompressed -> - masterIndex = Js5MasterIndex.read(uncompressed.slice(), masterIndexFormat) + masterIndex = Js5MasterIndex.read(uncompressed.slice(), masterIndexFormat, key) val (masterIndexId, sourceId, rawIndexes) = runBlocking { importer.importMasterIndexAndGetIndexes( @@ -189,7 +200,8 @@ public class Js5ChannelHandler( buf, uncompressed, gameId, - build, + buildMajor, + buildMinor, lastMasterIndexId, timestamp = Instant.now() ) @@ -202,10 +214,15 @@ public class Js5ChannelHandler( indexes = arrayOfNulls(rawIndexes.size) for ((archive, index) in rawIndexes.withIndex()) { + val entry = masterIndex!!.entries[archive] + if (entry.version == 0 && entry.checksum == 0) { + continue + } + if (index != null) { - processIndex(archive, index) + processIndex(ctx, archive, index) } else { - request(Js5Archive.ARCHIVESET, archive) + request(ctx, Js5Archive.ARCHIVESET, archive, entry.version, entry.checksum) } } } finally { @@ -214,7 +231,7 @@ public class Js5ChannelHandler( } } - private fun processIndex(archive: Int, buf: ByteBuf) { + private fun processIndex(ctx: ChannelHandlerContext, archive: Int, buf: ByteBuf) { val checksum = buf.crc32() val entry = masterIndex!!.entries[archive] if (checksum != entry.checksum) { @@ -233,7 +250,8 @@ public class Js5ChannelHandler( importer.importIndexAndGetMissingGroups(sourceId, archive, index, buf, uncompressed, lastMasterIndexId) } for (group in groups) { - request(archive, group) + val groupEntry = index[group]!! + request(ctx, archive, group, groupEntry.version, groupEntry.checksum) } } } @@ -257,8 +275,8 @@ public class Js5ChannelHandler( ) } - private fun request(archive: Int, group: Int) { - pendingRequests += Js5Request.Group(false, archive, group) + protected open fun request(ctx: ChannelHandlerContext, archive: Int, group: Int, version: Int, checksum: Int) { + pendingRequests += PendingRequest(false, archive, group, version, checksum) } private fun releaseGroups() { diff --git a/archive/src/main/kotlin/org/openrs2/archive/cache/NxtJs5ChannelHandler.kt b/archive/src/main/kotlin/org/openrs2/archive/cache/NxtJs5ChannelHandler.kt new file mode 100644 index 00000000..04b082e0 --- /dev/null +++ b/archive/src/main/kotlin/org/openrs2/archive/cache/NxtJs5ChannelHandler.kt @@ -0,0 +1,158 @@ +package org.openrs2.archive.cache + +import com.github.michaelbull.logging.InlineLogger +import io.netty.bootstrap.Bootstrap +import io.netty.channel.ChannelHandlerContext +import io.netty.channel.ChannelPipeline +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.asCoroutineDispatcher +import kotlinx.coroutines.cancel +import kotlinx.coroutines.launch +import org.bouncycastle.crypto.params.RSAKeyParameters +import org.openrs2.archive.cache.nxt.InitJs5RemoteConnection +import org.openrs2.archive.cache.nxt.Js5Request +import org.openrs2.archive.cache.nxt.Js5RequestEncoder +import org.openrs2.archive.cache.nxt.Js5Response +import org.openrs2.archive.cache.nxt.Js5ResponseDecoder +import org.openrs2.archive.cache.nxt.LoginResponse +import org.openrs2.archive.cache.nxt.MusicStreamClient +import org.openrs2.buffer.use +import org.openrs2.cache.MasterIndexFormat +import org.openrs2.protocol.Rs2Decoder +import org.openrs2.protocol.Rs2Encoder +import org.openrs2.protocol.js5.XorDecoder +import kotlin.coroutines.Continuation + +public class NxtJs5ChannelHandler( + bootstrap: Bootstrap, + gameId: Int, + hostname: String, + port: Int, + buildMajor: Int, + buildMinor: Int, + lastMasterIndexId: Int?, + continuation: Continuation, + importer: CacheImporter, + key: RSAKeyParameters?, + private val token: String, + private val musicStreamClient: MusicStreamClient, + private val maxMinorBuildAttempts: Int = 5 +) : Js5ChannelHandler( + bootstrap, + gameId, + hostname, + port, + buildMajor, + buildMinor, + lastMasterIndexId, + continuation, + importer, + key, + MasterIndexFormat.LENGTHS, + maxInFlightRequests = 500 +) { + private data class MusicRequest(val archive: Int, val group: Int, val version: Int, val checksum: Int) + + private var inFlightRequests = 0 + private val pendingRequests = ArrayDeque() + private var scope: CoroutineScope? = null + private var minorBuildAttempts = 0 + + override fun createInitMessage(): Any { + return InitJs5RemoteConnection(buildMajor, buildMinor!!, token, 0) + } + + override fun createRequestMessage(prefetch: Boolean, archive: Int, group: Int): Any { + return Js5Request.Group(prefetch, archive, group, buildMajor) + } + + override fun createConnectedMessage(): Any? { + return Js5Request.Connected(buildMajor) + } + + override fun configurePipeline(pipeline: ChannelPipeline) { + pipeline.addBefore("handler", null, Js5RequestEncoder) + pipeline.addBefore("handler", null, XorDecoder()) + pipeline.addBefore("handler", null, Js5ResponseDecoder()) + + pipeline.remove(Rs2Encoder::class.java) + pipeline.remove(Rs2Decoder::class.java) + } + + override fun incrementVersion() { + buildMinor = buildMinor!! + 1 + + if (++minorBuildAttempts >= maxMinorBuildAttempts) { + buildMajor++ + buildMinor = 1 + } + } + + override fun channelActive(ctx: ChannelHandlerContext) { + super.channelActive(ctx) + scope = CoroutineScope(ctx.channel().eventLoop().asCoroutineDispatcher()) + } + + override fun channelInactive(ctx: ChannelHandlerContext) { + super.channelInactive(ctx) + scope!!.cancel() + } + + override fun channelRead0(ctx: ChannelHandlerContext, msg: Any) { + when (msg) { + is LoginResponse.Js5Ok -> handleOk(ctx) + is LoginResponse.ClientOutOfDate -> handleClientOutOfDate(ctx) + is LoginResponse -> throw Exception("Invalid response: $msg") + is Js5Response -> handleResponse(ctx, msg.prefetch, msg.archive, msg.group, msg.data) + else -> throw Exception("Unknown message type: ${msg.javaClass.name}") + } + } + + override fun channelReadComplete(ctx: ChannelHandlerContext) { + super.channelReadComplete(ctx) + + while (inFlightRequests < 6) { + val request = pendingRequests.removeFirstOrNull() ?: break + inFlightRequests++ + + logger.info { "Requesting archive ${request.archive} group ${request.group}" } + + scope!!.launch { + val archive = request.archive + val group = request.group + val version = request.version + val checksum = request.checksum + + musicStreamClient.request(archive, group, version, checksum, buildMajor).use { buf -> + inFlightRequests-- + + processResponse(ctx, archive, group, buf) + + /* + * Inject a fake channelReadComplete event to ensure we + * don't time out and to send any new music requests. + */ + ctx.channel().pipeline().fireChannelReadComplete() + } + } + } + } + + override fun isComplete(): Boolean { + return super.isComplete() && pendingRequests.isEmpty() && inFlightRequests == 0 + } + + override fun request(ctx: ChannelHandlerContext, archive: Int, group: Int, version: Int, checksum: Int) { + if (archive == MUSIC_ARCHIVE) { + pendingRequests += MusicRequest(archive, group, version, checksum) + } else { + super.request(ctx, archive, group, version, checksum) + } + } + + private companion object { + private val logger = InlineLogger() + + private const val MUSIC_ARCHIVE = 40 + } +} diff --git a/archive/src/main/kotlin/org/openrs2/archive/cache/NxtJs5ChannelInitializer.kt b/archive/src/main/kotlin/org/openrs2/archive/cache/NxtJs5ChannelInitializer.kt new file mode 100644 index 00000000..8f7c18ed --- /dev/null +++ b/archive/src/main/kotlin/org/openrs2/archive/cache/NxtJs5ChannelInitializer.kt @@ -0,0 +1,22 @@ +package org.openrs2.archive.cache + +import io.netty.channel.Channel +import io.netty.channel.ChannelInitializer +import io.netty.handler.timeout.ReadTimeoutHandler +import org.openrs2.archive.cache.nxt.ClientOutOfDateCodec +import org.openrs2.archive.cache.nxt.InitJs5RemoteConnectionCodec +import org.openrs2.archive.cache.nxt.Js5OkCodec +import org.openrs2.protocol.Protocol +import org.openrs2.protocol.Rs2Decoder +import org.openrs2.protocol.Rs2Encoder + +public class NxtJs5ChannelInitializer(private val handler: NxtJs5ChannelHandler) : ChannelInitializer() { + override fun initChannel(ch: Channel) { + ch.pipeline().addLast( + ReadTimeoutHandler(30), + Rs2Encoder(Protocol(InitJs5RemoteConnectionCodec)), + Rs2Decoder(Protocol(Js5OkCodec, ClientOutOfDateCodec)) + ) + ch.pipeline().addLast("handler", handler) + } +} diff --git a/archive/src/main/kotlin/org/openrs2/archive/cache/OsrsJs5ChannelHandler.kt b/archive/src/main/kotlin/org/openrs2/archive/cache/OsrsJs5ChannelHandler.kt new file mode 100644 index 00000000..da933410 --- /dev/null +++ b/archive/src/main/kotlin/org/openrs2/archive/cache/OsrsJs5ChannelHandler.kt @@ -0,0 +1,77 @@ +package org.openrs2.archive.cache + +import io.netty.bootstrap.Bootstrap +import io.netty.channel.ChannelHandlerContext +import io.netty.channel.ChannelPipeline +import org.bouncycastle.crypto.params.RSAKeyParameters +import org.openrs2.cache.MasterIndexFormat +import org.openrs2.protocol.Rs2Decoder +import org.openrs2.protocol.Rs2Encoder +import org.openrs2.protocol.js5.Js5Request +import org.openrs2.protocol.js5.Js5RequestEncoder +import org.openrs2.protocol.js5.Js5Response +import org.openrs2.protocol.js5.Js5ResponseDecoder +import org.openrs2.protocol.js5.XorDecoder +import org.openrs2.protocol.login.LoginRequest +import org.openrs2.protocol.login.LoginResponse +import kotlin.coroutines.Continuation + +public class OsrsJs5ChannelHandler( + bootstrap: Bootstrap, + gameId: Int, + hostname: String, + port: Int, + build: Int, + lastMasterIndexId: Int?, + continuation: Continuation, + importer: CacheImporter, + key: RSAKeyParameters? +) : Js5ChannelHandler( + bootstrap, + gameId, + hostname, + port, + build, + null, + lastMasterIndexId, + continuation, + importer, + key, + MasterIndexFormat.VERSIONED, + maxInFlightRequests = 200 +) { + override fun createInitMessage(): Any { + return LoginRequest.InitJs5RemoteConnection(buildMajor) + } + + override fun createRequestMessage(prefetch: Boolean, archive: Int, group: Int): Any { + return Js5Request.Group(prefetch, archive, group) + } + + override fun createConnectedMessage(): Any? { + return null + } + + override fun configurePipeline(pipeline: ChannelPipeline) { + pipeline.addBefore("handler", null, Js5RequestEncoder) + pipeline.addBefore("handler", null, XorDecoder()) + pipeline.addBefore("handler", null, Js5ResponseDecoder()) + + pipeline.remove(Rs2Encoder::class.java) + pipeline.remove(Rs2Decoder::class.java) + } + + override fun incrementVersion() { + buildMajor++ + } + + override fun channelRead0(ctx: ChannelHandlerContext, msg: Any) { + when (msg) { + is LoginResponse.Js5Ok -> handleOk(ctx) + is LoginResponse.ClientOutOfDate -> handleClientOutOfDate(ctx) + is LoginResponse -> throw Exception("Invalid response: $msg") + is Js5Response -> handleResponse(ctx, msg.prefetch, msg.archive, msg.group, msg.data) + else -> throw Exception("Unknown message type: ${msg.javaClass.name}") + } + } +} diff --git a/archive/src/main/kotlin/org/openrs2/archive/cache/Js5ChannelInitializer.kt b/archive/src/main/kotlin/org/openrs2/archive/cache/OsrsJs5ChannelInitializer.kt similarity index 67% rename from archive/src/main/kotlin/org/openrs2/archive/cache/Js5ChannelInitializer.kt rename to archive/src/main/kotlin/org/openrs2/archive/cache/OsrsJs5ChannelInitializer.kt index 01e80b0c..7c53c615 100644 --- a/archive/src/main/kotlin/org/openrs2/archive/cache/Js5ChannelInitializer.kt +++ b/archive/src/main/kotlin/org/openrs2/archive/cache/OsrsJs5ChannelInitializer.kt @@ -7,13 +7,13 @@ import org.openrs2.protocol.Protocol import org.openrs2.protocol.Rs2Decoder import org.openrs2.protocol.Rs2Encoder -public class Js5ChannelInitializer(private val handler: Js5ChannelHandler) : ChannelInitializer() { +public class OsrsJs5ChannelInitializer(private val handler: OsrsJs5ChannelHandler) : ChannelInitializer() { override fun initChannel(ch: Channel) { ch.pipeline().addLast( ReadTimeoutHandler(30), Rs2Encoder(Protocol.LOGIN_UPSTREAM), - Rs2Decoder(Protocol.LOGIN_DOWNSTREAM), - handler + Rs2Decoder(Protocol.LOGIN_DOWNSTREAM) ) + ch.pipeline().addLast("handler", handler) } } diff --git a/archive/src/main/kotlin/org/openrs2/archive/cache/nxt/ClientOutOfDateCodec.kt b/archive/src/main/kotlin/org/openrs2/archive/cache/nxt/ClientOutOfDateCodec.kt new file mode 100644 index 00000000..54e6d42d --- /dev/null +++ b/archive/src/main/kotlin/org/openrs2/archive/cache/nxt/ClientOutOfDateCodec.kt @@ -0,0 +1,8 @@ +package org.openrs2.archive.cache.nxt + +import org.openrs2.protocol.EmptyPacketCodec + +public object ClientOutOfDateCodec : EmptyPacketCodec( + opcode = 6, + packet = LoginResponse.ClientOutOfDate +) diff --git a/archive/src/main/kotlin/org/openrs2/archive/cache/nxt/InitJs5RemoteConnection.kt b/archive/src/main/kotlin/org/openrs2/archive/cache/nxt/InitJs5RemoteConnection.kt new file mode 100644 index 00000000..83af26e8 --- /dev/null +++ b/archive/src/main/kotlin/org/openrs2/archive/cache/nxt/InitJs5RemoteConnection.kt @@ -0,0 +1,10 @@ +package org.openrs2.archive.cache.nxt + +import org.openrs2.protocol.Packet + +public data class InitJs5RemoteConnection( + public val buildMajor: Int, + public val buildMinor: Int, + public val token: String, + public val language: Int +) : Packet diff --git a/archive/src/main/kotlin/org/openrs2/archive/cache/nxt/InitJs5RemoteConnectionCodec.kt b/archive/src/main/kotlin/org/openrs2/archive/cache/nxt/InitJs5RemoteConnectionCodec.kt new file mode 100644 index 00000000..f384ffc2 --- /dev/null +++ b/archive/src/main/kotlin/org/openrs2/archive/cache/nxt/InitJs5RemoteConnectionCodec.kt @@ -0,0 +1,29 @@ +package org.openrs2.archive.cache.nxt + +import io.netty.buffer.ByteBuf +import org.openrs2.buffer.readString +import org.openrs2.buffer.writeString +import org.openrs2.crypto.StreamCipher +import org.openrs2.protocol.PacketCodec +import org.openrs2.protocol.PacketLength + +public object InitJs5RemoteConnectionCodec : PacketCodec( + length = PacketLength.VARIABLE_BYTE, + opcode = 15, + type = InitJs5RemoteConnection::class.java +) { + override fun decode(input: ByteBuf, cipher: StreamCipher): InitJs5RemoteConnection { + val buildMajor = input.readInt() + val buildMinor = input.readInt() + val token = input.readString() + val language = input.readUnsignedByte().toInt() + return InitJs5RemoteConnection(buildMajor, buildMinor, token, language) + } + + override fun encode(input: InitJs5RemoteConnection, output: ByteBuf, cipher: StreamCipher) { + output.writeInt(input.buildMajor) + output.writeInt(input.buildMinor) + output.writeString(input.token) + output.writeByte(input.language) + } +} diff --git a/archive/src/main/kotlin/org/openrs2/archive/cache/nxt/Js5OkCodec.kt b/archive/src/main/kotlin/org/openrs2/archive/cache/nxt/Js5OkCodec.kt new file mode 100644 index 00000000..6648b467 --- /dev/null +++ b/archive/src/main/kotlin/org/openrs2/archive/cache/nxt/Js5OkCodec.kt @@ -0,0 +1,25 @@ +package org.openrs2.archive.cache.nxt + +import io.netty.buffer.ByteBuf +import org.openrs2.crypto.StreamCipher +import org.openrs2.protocol.PacketCodec + +public object Js5OkCodec : PacketCodec( + opcode = 0, + length = LoginResponse.Js5Ok.LOADING_REQUIREMENTS * 4, + type = LoginResponse.Js5Ok::class.java +) { + override fun decode(input: ByteBuf, cipher: StreamCipher): LoginResponse.Js5Ok { + val loadingRequirements = mutableListOf() + for (i in 0 until LoginResponse.Js5Ok.LOADING_REQUIREMENTS) { + loadingRequirements += input.readInt() + } + return LoginResponse.Js5Ok(loadingRequirements) + } + + override fun encode(input: LoginResponse.Js5Ok, output: ByteBuf, cipher: StreamCipher) { + for (requirement in input.loadingRequirements) { + output.writeInt(requirement) + } + } +} diff --git a/archive/src/main/kotlin/org/openrs2/archive/cache/nxt/Js5Request.kt b/archive/src/main/kotlin/org/openrs2/archive/cache/nxt/Js5Request.kt new file mode 100644 index 00000000..5a653a3b --- /dev/null +++ b/archive/src/main/kotlin/org/openrs2/archive/cache/nxt/Js5Request.kt @@ -0,0 +1,14 @@ +package org.openrs2.archive.cache.nxt + +public sealed class Js5Request { + public data class Group( + public val prefetch: Boolean, + public val archive: Int, + public val group: Int, + public val build: Int + ) : Js5Request() + + public data class Connected( + public val build: Int + ) : Js5Request() +} diff --git a/archive/src/main/kotlin/org/openrs2/archive/cache/nxt/Js5RequestEncoder.kt b/archive/src/main/kotlin/org/openrs2/archive/cache/nxt/Js5RequestEncoder.kt new file mode 100644 index 00000000..993d2268 --- /dev/null +++ b/archive/src/main/kotlin/org/openrs2/archive/cache/nxt/Js5RequestEncoder.kt @@ -0,0 +1,36 @@ +package org.openrs2.archive.cache.nxt + +import io.netty.buffer.ByteBuf +import io.netty.channel.ChannelHandler +import io.netty.channel.ChannelHandlerContext +import io.netty.handler.codec.MessageToByteEncoder + +@ChannelHandler.Sharable +public object Js5RequestEncoder : MessageToByteEncoder(Js5Request::class.java) { + override fun encode(ctx: ChannelHandlerContext, msg: Js5Request, out: ByteBuf) { + when (msg) { + is Js5Request.Group -> { + out.writeByte(if (msg.prefetch) 32 else 33) + out.writeByte(msg.archive) + out.writeInt(msg.group) + out.writeShort(msg.build) + out.writeShort(0) + } + is Js5Request.Connected -> { + out.writeByte(6) + out.writeMedium(5) + out.writeShort(0) + out.writeShort(msg.build) + out.writeShort(0) + } + } + } + + override fun allocateBuffer(ctx: ChannelHandlerContext, msg: Js5Request, preferDirect: Boolean): ByteBuf { + return if (preferDirect) { + ctx.alloc().ioBuffer(10, 10) + } else { + ctx.alloc().heapBuffer(10, 10) + } + } +} diff --git a/archive/src/main/kotlin/org/openrs2/archive/cache/nxt/Js5Response.kt b/archive/src/main/kotlin/org/openrs2/archive/cache/nxt/Js5Response.kt new file mode 100644 index 00000000..792e46fd --- /dev/null +++ b/archive/src/main/kotlin/org/openrs2/archive/cache/nxt/Js5Response.kt @@ -0,0 +1,11 @@ +package org.openrs2.archive.cache.nxt + +import io.netty.buffer.ByteBuf +import io.netty.buffer.DefaultByteBufHolder + +public data class Js5Response( + public val prefetch: Boolean, + public val archive: Int, + public val group: Int, + public val data: ByteBuf +) : DefaultByteBufHolder(data) diff --git a/archive/src/main/kotlin/org/openrs2/archive/cache/nxt/Js5ResponseDecoder.kt b/archive/src/main/kotlin/org/openrs2/archive/cache/nxt/Js5ResponseDecoder.kt new file mode 100644 index 00000000..3a882cb4 --- /dev/null +++ b/archive/src/main/kotlin/org/openrs2/archive/cache/nxt/Js5ResponseDecoder.kt @@ -0,0 +1,121 @@ +package org.openrs2.archive.cache.nxt + +import io.netty.buffer.ByteBuf +import io.netty.channel.ChannelHandlerContext +import io.netty.handler.codec.ByteToMessageDecoder +import io.netty.handler.codec.DecoderException +import kotlin.math.min + +public class Js5ResponseDecoder : ByteToMessageDecoder() { + private data class Request(val prefetch: Boolean, val archive: Int, val group: Int) + + private enum class State { + READ_HEADER, + READ_LEN, + READ_DATA + } + + private var state = State.READ_HEADER + private val buffers = mutableMapOf() + private var request: Request? = null + + override fun decode(ctx: ChannelHandlerContext, input: ByteBuf, out: MutableList) { + if (state == State.READ_HEADER) { + if (input.readableBytes() < 5) { + return + } + + val prefetch: Boolean + val archive = input.readUnsignedByte().toInt() + var group = input.readInt() + + if (group and 0x80000000.toInt() != 0) { + prefetch = true + group = group and 0x7FFFFFFF + } else { + prefetch = false + } + + request = Request(prefetch, archive, group) + + if (buffers.containsKey(request)) { + state = State.READ_DATA + } else { + state = State.READ_LEN + } + } + + if (state == State.READ_LEN) { + if (input.readableBytes() < 5) { + return + } + + val type = input.readUnsignedByte().toInt() + + val len = input.readInt() + if (len < 0) { + throw DecoderException("Length is negative: $len") + } + + val totalLen = if (type == 0) { + len + 5 + } else { + len + 9 + } + + if (totalLen < 0) { + throw DecoderException("Total length exceeds maximum ByteBuf size") + } + + val data = ctx.alloc().buffer(totalLen, totalLen) + data.writeByte(type) + data.writeInt(len) + + buffers[request!!] = data + + state = State.READ_DATA + } + + if (state == State.READ_DATA) { + val data = buffers[request!!]!! + + var blockLen = if (data.writerIndex() == 5) { + 102400 - 10 + } else { + 102400 - 5 + } + + blockLen = min(blockLen, data.writableBytes()) + + if (input.readableBytes() < blockLen) { + return + } + + data.writeBytes(input, blockLen) + + if (!data.isWritable) { + out += Js5Response(request!!.prefetch, request!!.archive, request!!.group, data) + buffers.remove(request!!) + request = null + } + + state = State.READ_HEADER + } + } + + override fun channelInactive(ctx: ChannelHandlerContext) { + super.channelInactive(ctx) + reset() + } + + override fun handlerRemoved0(ctx: ChannelHandlerContext?) { + reset() + } + + private fun reset() { + buffers.values.forEach(ByteBuf::release) + buffers.clear() + + state = State.READ_HEADER + } +} diff --git a/archive/src/main/kotlin/org/openrs2/archive/cache/nxt/LoginResponse.kt b/archive/src/main/kotlin/org/openrs2/archive/cache/nxt/LoginResponse.kt new file mode 100644 index 00000000..56afe0d7 --- /dev/null +++ b/archive/src/main/kotlin/org/openrs2/archive/cache/nxt/LoginResponse.kt @@ -0,0 +1,13 @@ +package org.openrs2.archive.cache.nxt + +import org.openrs2.protocol.Packet + +public sealed class LoginResponse : Packet { + public data class Js5Ok(val loadingRequirements: List) : LoginResponse() { + public companion object { + public const val LOADING_REQUIREMENTS: Int = 31 + } + } + + public object ClientOutOfDate : LoginResponse() +} diff --git a/archive/src/main/kotlin/org/openrs2/archive/cache/nxt/MusicStreamClient.kt b/archive/src/main/kotlin/org/openrs2/archive/cache/nxt/MusicStreamClient.kt new file mode 100644 index 00000000..234d9fb0 --- /dev/null +++ b/archive/src/main/kotlin/org/openrs2/archive/cache/nxt/MusicStreamClient.kt @@ -0,0 +1,30 @@ +package org.openrs2.archive.cache.nxt + +import io.netty.buffer.ByteBuf +import kotlinx.coroutines.future.await +import org.openrs2.buffer.ByteBufBodyHandler +import org.openrs2.buffer.use +import org.openrs2.http.checkStatusCode +import java.net.URI +import java.net.http.HttpClient +import java.net.http.HttpRequest + +public class MusicStreamClient( + private val client: HttpClient, + private val byteBufBodyHandler: ByteBufBodyHandler, + private val origin: String +) { + public suspend fun request(archive: Int, group: Int, version: Int, checksum: Int, build: Int): ByteBuf { + val uri = URI("$origin/ms?m=0&a=$archive&k=$build&g=$group&c=$checksum&v=$version") + + val request = HttpRequest.newBuilder(uri) + .GET() + .build() + + val response = client.sendAsync(request, byteBufBodyHandler).await() + response.body().use { buf -> + response.checkStatusCode() + return buf.retain() + } + } +} diff --git a/archive/src/main/kotlin/org/openrs2/archive/game/Game.kt b/archive/src/main/kotlin/org/openrs2/archive/game/Game.kt index 38ffc8fc..4e1f0cdb 100644 --- a/archive/src/main/kotlin/org/openrs2/archive/game/Game.kt +++ b/archive/src/main/kotlin/org/openrs2/archive/game/Game.kt @@ -1,8 +1,12 @@ package org.openrs2.archive.game +import org.bouncycastle.crypto.params.RSAKeyParameters + public data class Game( public val id: Int, public val url: String?, - public val build: Int?, - public val lastMasterIndexId: Int? + public val buildMajor: Int?, + public val buildMinor: Int?, + public val lastMasterIndexId: Int?, + public val key: RSAKeyParameters? ) diff --git a/archive/src/main/kotlin/org/openrs2/archive/game/GameDatabase.kt b/archive/src/main/kotlin/org/openrs2/archive/game/GameDatabase.kt index 80c2686b..85feaf48 100644 --- a/archive/src/main/kotlin/org/openrs2/archive/game/GameDatabase.kt +++ b/archive/src/main/kotlin/org/openrs2/archive/game/GameDatabase.kt @@ -1,6 +1,8 @@ package org.openrs2.archive.game +import org.openrs2.crypto.Rsa import org.openrs2.db.Database +import java.io.StringReader import javax.inject.Inject import javax.inject.Singleton @@ -12,7 +14,7 @@ public class GameDatabase @Inject constructor( return database.execute { connection -> connection.prepareStatement( """ - SELECT id, url, build, last_master_index_id + SELECT id, url, build_major, build_minor, last_master_index_id, key FROM games WHERE name = ? """.trimIndent() @@ -27,17 +29,31 @@ public class GameDatabase @Inject constructor( val id = rows.getInt(1) val url: String? = rows.getString(2) - var build: Int? = rows.getInt(3) + var buildMajor: Int? = rows.getInt(3) if (rows.wasNull()) { - build = null + buildMajor = null } - var lastMasterIndexId: Int? = rows.getInt(4) + var buildMinor: Int? = rows.getInt(4) + if (rows.wasNull()) { + buildMinor = null + } + + var lastMasterIndexId: Int? = rows.getInt(5) if (rows.wasNull()) { lastMasterIndexId = null } - return@execute Game(id, url, build, lastMasterIndexId) + val pem = rows.getString(6) + val key = if (rows.wasNull()) { + null + } else { + StringReader(pem).use { reader -> + Rsa.readPublicKey(reader) + } + } + + return@execute Game(id, url, buildMajor, buildMinor, lastMasterIndexId, key) } } } diff --git a/archive/src/main/resources/org/openrs2/archive/migrations/V3__nxt.sql b/archive/src/main/resources/org/openrs2/archive/migrations/V3__nxt.sql new file mode 100644 index 00000000..f58a2d26 --- /dev/null +++ b/archive/src/main/resources/org/openrs2/archive/migrations/V3__nxt.sql @@ -0,0 +1,49 @@ +-- @formatter:off +CREATE TYPE build AS ( + major INTEGER, + minor INTEGER +); + +ALTER TABLE games + ADD COLUMN key TEXT NULL, + ADD COLUMN build_minor INTEGER NULL; + +ALTER TABLE games + RENAME COLUMN build TO build_major; + +ALTER TABLE sources + ADD COLUMN build_minor INTEGER NULL; + +ALTER TABLE sources + RENAME COLUMN build TO build_major; + +DROP INDEX sources_master_index_id_game_id_build_idx; + +CREATE UNIQUE INDEX ON sources (master_index_id, game_id, build_major) + WHERE type = 'js5remote' AND build_minor IS NULL; + +CREATE UNIQUE INDEX ON sources (master_index_id, game_id, build_major, build_minor) + WHERE type = 'js5remote' AND build_minor IS NOT NULL; + +UPDATE games +SET + url = 'https://www.runescape.com/k=5/l=0/jav_config.ws?binaryType=2', + build_major = 919, + build_minor = 1, + key = $$-----BEGIN PUBLIC KEY----- +MIICIjANBgkqhkiG9w0BAQEFAAOCAg8AMIICCgKCAgEAnnP2Sqv7uMM3rjmsLTQ3 +z4yt8/4j9MDS/2+/9KEkfnH2K/toJbyBUCMHvfS7SBvPiLXWaArNvIArEz/e5Cr3 +dk2mcSzmoVcsE1dJq/2eDIqRzhH9WB6zDz+5DO6ysRYap1VdMa4bKXMkM+e7V0c3 +9xiMEpjpeSs0cHGxTGlxLBGTFHYG1IZPLkDRJzhKD58Lu8bn2e3KCTuzzvZFf2AF +FZauENC6OswdfAZutlWdWkVOZsD9IB/ALNaY4W35PABZbsfT/ar85S/foXFwHJ+B +OHuF6BR5dYETUQ5Oasl0GUEaVUM9POv7KRv6cW7HWUQHYfQdApjdH+dORHtk4kMG +QAmk/VpTwWBkZWqDbglZBIkd5G7gs8JpluiUh11eRMC/xj99iZp4nt/FOoSNw2NO +GMTUPkHIySC4FQHNSxzbfCW5rQdSRw5+eyuo8MA6mg0LZH3jQuNnnYBg1hJTsdBp +0IrjOQWsfTiX+xZ6lUfRhFtGISuKchpGDZfmOtrZPJDvUgNy0z8w41V6NyiU/h7X +2TKYFQG1/c4Kr4BxT4tPl85nVbMulonfk/AD5l6BflEuHlChpkAhv14j6xRzGHWx +4pdpbHSzDkg/HBR5ka0D7Ua7W6uL3VFVCPAygPERZK1lpYE+m+k92H+i/K7gIV1M +1E07p8x5X9i0oDbZ0lxv8I8CAwEAAQ== +-----END PUBLIC KEY----- +$$ +WHERE name = 'runescape'; +-- @formatter:on diff --git a/cache/src/main/kotlin/org/openrs2/cache/Js5MasterIndex.kt b/cache/src/main/kotlin/org/openrs2/cache/Js5MasterIndex.kt index da5838f6..885021a7 100644 --- a/cache/src/main/kotlin/org/openrs2/cache/Js5MasterIndex.kt +++ b/cache/src/main/kotlin/org/openrs2/cache/Js5MasterIndex.kt @@ -180,6 +180,19 @@ public data class Js5MasterIndex( } public fun read(buf: ByteBuf, format: MasterIndexFormat, key: RSAKeyParameters? = null): Js5MasterIndex { + return read(buf, format, key, true) + } + + public fun readUnverified(buf: ByteBuf, format: MasterIndexFormat): Js5MasterIndex { + return read(buf, format, null, false) + } + + private fun read( + buf: ByteBuf, + format: MasterIndexFormat, + key: RSAKeyParameters?, + verify: Boolean + ): Js5MasterIndex { val index = Js5MasterIndex(format) val start = buf.readerIndex() @@ -233,24 +246,26 @@ public data class Js5MasterIndex( index.entries += Entry(version, checksum, groups, totalUncompressedLength, digest) } - val end = buf.readerIndex() + if (verify) { + val end = buf.readerIndex() - if (format >= MasterIndexFormat.DIGESTS) { - val ciphertext = buf.readSlice(buf.readableBytes()) - decrypt(ciphertext, key).use { plaintext -> - require(plaintext.readableBytes() == SIGNATURE_LENGTH) { - "Invalid signature length" - } + if (format >= MasterIndexFormat.DIGESTS) { + val ciphertext = buf.readSlice(buf.readableBytes()) + decrypt(ciphertext, key).use { plaintext -> + require(plaintext.readableBytes() == SIGNATURE_LENGTH) { + "Invalid signature length" + } - // the client doesn't verify what I presume is the RSA magic byte - plaintext.skipBytes(1) + // the client doesn't verify what I presume is the RSA magic byte + plaintext.skipBytes(1) - val expected = ByteArray(Whirlpool.DIGESTBYTES) - plaintext.readBytes(expected) + val expected = ByteArray(Whirlpool.DIGESTBYTES) + plaintext.readBytes(expected) - val actual = buf.whirlpool(start, end - start) - require(expected.contentEquals(actual)) { - "Invalid signature" + val actual = buf.whirlpool(start, end - start) + require(expected.contentEquals(actual)) { + "Invalid signature" + } } } } diff --git a/cache/src/test/kotlin/org/openrs2/cache/Js5MasterIndexTest.kt b/cache/src/test/kotlin/org/openrs2/cache/Js5MasterIndexTest.kt index 706f1553..08a1c8c1 100644 --- a/cache/src/test/kotlin/org/openrs2/cache/Js5MasterIndexTest.kt +++ b/cache/src/test/kotlin/org/openrs2/cache/Js5MasterIndexTest.kt @@ -206,8 +206,11 @@ class Js5MasterIndexTest { @Test fun testReadWhirlpool() { Unpooled.wrappedBuffer(encodedWhirlpool).use { buf -> - val index = Js5MasterIndex.read(buf, MasterIndexFormat.DIGESTS) + val index = Js5MasterIndex.read(buf.slice(), MasterIndexFormat.DIGESTS) assertEquals(decodedWhirlpool, index) + + val indexUnverified = Js5MasterIndex.readUnverified(buf, MasterIndexFormat.DIGESTS) + assertEquals(decodedWhirlpool, indexUnverified) } } @@ -218,8 +221,11 @@ class Js5MasterIndexTest { buf.setByte(lastIndex, buf.getByte(lastIndex).toInt().inv()) assertFailsWith { - Js5MasterIndex.read(buf, MasterIndexFormat.DIGESTS) + Js5MasterIndex.read(buf.slice(), MasterIndexFormat.DIGESTS) } + + val index = Js5MasterIndex.readUnverified(buf, MasterIndexFormat.DIGESTS) + assertEquals(decodedWhirlpool, index) } } @@ -227,8 +233,11 @@ class Js5MasterIndexTest { fun testReadWhirlpoolInvalidSignatureLength() { Unpooled.wrappedBuffer(encodedWhirlpool, 0, encodedWhirlpool.size - 1).use { buf -> assertFailsWith { - Js5MasterIndex.read(buf, MasterIndexFormat.DIGESTS) + Js5MasterIndex.read(buf.slice(), MasterIndexFormat.DIGESTS) } + + val index = Js5MasterIndex.readUnverified(buf, MasterIndexFormat.DIGESTS) + assertEquals(decodedWhirlpool, index) } } @@ -257,8 +266,11 @@ class Js5MasterIndexTest { @Test fun testReadSigned() { Unpooled.wrappedBuffer(encodedSigned).use { buf -> - val index = Js5MasterIndex.read(buf, MasterIndexFormat.DIGESTS, PUBLIC_KEY) + val index = Js5MasterIndex.read(buf.slice(), MasterIndexFormat.DIGESTS, PUBLIC_KEY) assertEquals(decodedWhirlpool, index) + + val indexUnverified = Js5MasterIndex.readUnverified(buf, MasterIndexFormat.DIGESTS) + assertEquals(decodedWhirlpool, indexUnverified) } } @@ -269,8 +281,11 @@ class Js5MasterIndexTest { buf.setByte(lastIndex, buf.getByte(lastIndex).toInt().inv()) assertFailsWith { - Js5MasterIndex.read(buf, MasterIndexFormat.DIGESTS, PUBLIC_KEY) + Js5MasterIndex.read(buf.slice(), MasterIndexFormat.DIGESTS, PUBLIC_KEY) } + + val index = Js5MasterIndex.readUnverified(buf, MasterIndexFormat.DIGESTS) + assertEquals(decodedWhirlpool, index) } } @@ -278,8 +293,11 @@ class Js5MasterIndexTest { fun testReadSignedInvalidSignatureLength() { Unpooled.wrappedBuffer(encodedSigned, 0, encodedSigned.size - 1).use { buf -> assertFailsWith { - Js5MasterIndex.read(buf, MasterIndexFormat.DIGESTS, PUBLIC_KEY) + Js5MasterIndex.read(buf.slice(), MasterIndexFormat.DIGESTS, PUBLIC_KEY) } + + val index = Js5MasterIndex.readUnverified(buf, MasterIndexFormat.DIGESTS) + assertEquals(decodedWhirlpool, index) } }