Add NXT cache downloader

I'm still not particularly happy with this: if the JS5 download
finishes before HTTP, it'll time out and kill the whole process.
Similarly, because it takes so long to import the indexes and as we
can't fetch groups in parallel with that, it can often time out early
during the process.

In the long term, I think I am going to try and move most of the logic
outside of the Netty threads and communicate between threads with queues
or channels. This would also allow us to run multiple JS5 clients in
parallel.

The code also needs some tidying up, particularly constants in the
Js5ChannelHandler constructors.

Signed-off-by: Graham <gpe@openrs2.org>
Graham 4 years ago
parent 0e1046d457
commit 6f02ab2f65
  1. 74
      archive/src/main/kotlin/org/openrs2/archive/cache/CacheDownloader.kt
  2. 112
      archive/src/main/kotlin/org/openrs2/archive/cache/CacheExporter.kt
  3. 69
      archive/src/main/kotlin/org/openrs2/archive/cache/CacheImporter.kt
  4. 5
      archive/src/main/kotlin/org/openrs2/archive/cache/ImportCommand.kt
  5. 5
      archive/src/main/kotlin/org/openrs2/archive/cache/ImportMasterIndexCommand.kt
  6. 142
      archive/src/main/kotlin/org/openrs2/archive/cache/Js5ChannelHandler.kt
  7. 158
      archive/src/main/kotlin/org/openrs2/archive/cache/NxtJs5ChannelHandler.kt
  8. 22
      archive/src/main/kotlin/org/openrs2/archive/cache/NxtJs5ChannelInitializer.kt
  9. 77
      archive/src/main/kotlin/org/openrs2/archive/cache/OsrsJs5ChannelHandler.kt
  10. 6
      archive/src/main/kotlin/org/openrs2/archive/cache/OsrsJs5ChannelInitializer.kt
  11. 8
      archive/src/main/kotlin/org/openrs2/archive/cache/nxt/ClientOutOfDateCodec.kt
  12. 10
      archive/src/main/kotlin/org/openrs2/archive/cache/nxt/InitJs5RemoteConnection.kt
  13. 29
      archive/src/main/kotlin/org/openrs2/archive/cache/nxt/InitJs5RemoteConnectionCodec.kt
  14. 25
      archive/src/main/kotlin/org/openrs2/archive/cache/nxt/Js5OkCodec.kt
  15. 14
      archive/src/main/kotlin/org/openrs2/archive/cache/nxt/Js5Request.kt
  16. 36
      archive/src/main/kotlin/org/openrs2/archive/cache/nxt/Js5RequestEncoder.kt
  17. 11
      archive/src/main/kotlin/org/openrs2/archive/cache/nxt/Js5Response.kt
  18. 121
      archive/src/main/kotlin/org/openrs2/archive/cache/nxt/Js5ResponseDecoder.kt
  19. 13
      archive/src/main/kotlin/org/openrs2/archive/cache/nxt/LoginResponse.kt
  20. 30
      archive/src/main/kotlin/org/openrs2/archive/cache/nxt/MusicStreamClient.kt
  21. 8
      archive/src/main/kotlin/org/openrs2/archive/game/Game.kt
  22. 26
      archive/src/main/kotlin/org/openrs2/archive/game/GameDatabase.kt
  23. 49
      archive/src/main/resources/org/openrs2/archive/migrations/V3__nxt.sql
  24. 43
      cache/src/main/kotlin/org/openrs2/cache/Js5MasterIndex.kt
  25. 30
      cache/src/test/kotlin/org/openrs2/cache/Js5MasterIndexTest.kt

@ -1,7 +1,9 @@
package org.openrs2.archive.cache package org.openrs2.archive.cache
import org.openrs2.archive.cache.nxt.MusicStreamClient
import org.openrs2.archive.game.GameDatabase import org.openrs2.archive.game.GameDatabase
import org.openrs2.archive.jav.JavConfig import org.openrs2.archive.jav.JavConfig
import org.openrs2.buffer.ByteBufBodyHandler
import org.openrs2.net.BootstrapFactory import org.openrs2.net.BootstrapFactory
import org.openrs2.net.awaitSuspend import org.openrs2.net.awaitSuspend
import java.net.URI import java.net.URI
@ -13,6 +15,7 @@ import kotlin.coroutines.suspendCoroutine
@Singleton @Singleton
public class CacheDownloader @Inject constructor( public class CacheDownloader @Inject constructor(
private val client: HttpClient, private val client: HttpClient,
private val byteBufBodyHandler: ByteBufBodyHandler,
private val bootstrapFactory: BootstrapFactory, private val bootstrapFactory: BootstrapFactory,
private val gameDatabase: GameDatabase, private val gameDatabase: GameDatabase,
private val importer: CacheImporter private val importer: CacheImporter
@ -21,28 +24,67 @@ public class CacheDownloader @Inject constructor(
val game = gameDatabase.getGame(gameName) ?: throw Exception("Game not found") val game = gameDatabase.getGame(gameName) ?: throw Exception("Game not found")
val url = game.url ?: throw Exception("URL not set") val url = game.url ?: throw Exception("URL not set")
val build = game.build ?: throw Exception("Current build not set") val buildMajor = game.buildMajor ?: throw Exception("Current major build not set")
val config = JavConfig.download(client, url) val config = JavConfig.download(client, url)
val codebase = config.config[CODEBASE] ?: throw Exception("Codebase missing")
val hostname = URI(codebase).host ?: throw Exception("Hostname missing")
val group = bootstrapFactory.createEventLoopGroup() val group = bootstrapFactory.createEventLoopGroup()
try { try {
suspendCoroutine<Unit> { continuation -> suspendCoroutine<Unit> { continuation ->
val bootstrap = bootstrapFactory.createBootstrap(group) val bootstrap = bootstrapFactory.createBootstrap(group)
val handler = Js5ChannelHandler(
bootstrap, val hostname: String
game.id,
hostname, val initializer = when (gameName) {
PORT, "oldschool" -> {
build, val codebase = config.config[CODEBASE] ?: throw Exception("Codebase missing")
game.lastMasterIndexId, hostname = URI(codebase).host ?: throw Exception("Hostname missing")
continuation,
importer OsrsJs5ChannelInitializer(
) OsrsJs5ChannelHandler(
bootstrap,
bootstrap.handler(Js5ChannelInitializer(handler)) game.id,
hostname,
PORT,
buildMajor,
game.lastMasterIndexId,
continuation,
importer,
game.key
)
)
}
"runescape" -> {
val buildMinor = game.buildMinor ?: throw Exception("Current minor build not set")
val tokens = config.params.values.filter { TOKEN_REGEX.matches(it) }
val token = tokens.singleOrNull() ?: throw Exception("Multiple candidate tokens: $tokens")
hostname = NXT_HOSTNAME
val musicStreamClient = MusicStreamClient(client, byteBufBodyHandler, "http://$hostname")
NxtJs5ChannelInitializer(
NxtJs5ChannelHandler(
bootstrap,
game.id,
hostname,
PORT,
buildMajor,
buildMinor,
game.lastMasterIndexId,
continuation,
importer,
game.key,
token,
musicStreamClient
)
)
}
else -> throw UnsupportedOperationException()
}
bootstrap.handler(initializer)
.connect(hostname, PORT) .connect(hostname, PORT)
} }
} finally { } finally {
@ -52,6 +94,8 @@ public class CacheDownloader @Inject constructor(
private companion object { private companion object {
private const val CODEBASE = "codebase" private const val CODEBASE = "codebase"
private const val NXT_HOSTNAME = "content.runescape.com"
private const val PORT = 443 private const val PORT = 443
private val TOKEN_REGEX = Regex("[A-Za-z0-9*-]{32}")
} }
} }

@ -11,6 +11,7 @@ import org.openrs2.cache.MasterIndexFormat
import org.openrs2.cache.Store import org.openrs2.cache.Store
import org.openrs2.crypto.XteaKey import org.openrs2.crypto.XteaKey
import org.openrs2.db.Database import org.openrs2.db.Database
import org.postgresql.util.PGobject
import java.time.Instant import java.time.Instant
import java.util.SortedSet import java.util.SortedSet
import javax.inject.Inject import javax.inject.Inject
@ -52,10 +53,43 @@ public class CacheExporter @Inject constructor(
} }
} }
public data class Build(val major: Int, val minor: Int?) : Comparable<Build> {
override fun compareTo(other: Build): Int {
return compareValuesBy(this, other, Build::major, Build::minor)
}
override fun toString(): String {
return if (minor != null) {
"$major.$minor"
} else {
major.toString()
}
}
internal companion object {
internal fun fromPgObject(o: PGobject): Build? {
val value = o.value!!
require(value.length >= 2)
val parts = value.substring(1, value.length - 1).split(",")
require(parts.size == 2)
val major = parts[0]
val minor = parts[1]
if (major.isEmpty()) {
return null
}
return Build(major.toInt(), if (minor.isEmpty()) null else minor.toInt())
}
}
}
public data class CacheSummary( public data class CacheSummary(
val id: Int, val id: Int,
val game: String, val game: String,
val builds: SortedSet<Int>, val builds: SortedSet<Build>,
val timestamp: Instant?, val timestamp: Instant?,
val names: SortedSet<String>, val names: SortedSet<String>,
val stats: Stats? val stats: Stats?
@ -70,7 +104,7 @@ public class CacheExporter @Inject constructor(
public data class Source( public data class Source(
val game: String, val game: String,
val build: Int?, val build: Build?,
val timestamp: Instant?, val timestamp: Instant?,
val name: String?, val name: String?,
val description: String?, val description: String?,
@ -90,26 +124,29 @@ public class CacheExporter @Inject constructor(
return database.execute { connection -> return database.execute { connection ->
connection.prepareStatement( connection.prepareStatement(
""" """
SELECT SELECT *
m.id, FROM (
g.name, SELECT
array_remove(array_agg(DISTINCT s.build ORDER BY s.build ASC), NULL), m.id,
MIN(s.timestamp), g.name,
array_remove(array_agg(DISTINCT s.name ORDER BY s.name ASC), NULL), array_remove(array_agg(DISTINCT ROW(s.build_major, s.build_minor)::build ORDER BY ROW(s.build_major, s.build_minor)::build ASC), NULL) builds,
ms.valid_indexes, MIN(s.timestamp) AS timestamp,
ms.indexes, array_remove(array_agg(DISTINCT s.name ORDER BY s.name ASC), NULL) sources,
ms.valid_groups, ms.valid_indexes,
ms.groups, ms.indexes,
ms.valid_keys, ms.valid_groups,
ms.keys, ms.groups,
ms.size ms.valid_keys,
FROM master_indexes m ms.keys,
JOIN sources s ON s.master_index_id = m.id ms.size
JOIN games g ON g.id = s.game_id FROM master_indexes m
LEFT JOIN master_index_stats ms ON ms.master_index_id = m.id JOIN sources s ON s.master_index_id = m.id
GROUP BY m.id, g.name, ms.valid_indexes, ms.indexes, ms.valid_groups, ms.groups, ms.valid_keys, ms.keys, JOIN games g ON g.id = s.game_id
ms.size LEFT JOIN master_index_stats ms ON ms.master_index_id = m.id
ORDER BY g.name ASC, MIN(s.build) ASC, MIN(s.timestamp) ASC GROUP BY m.id, g.name, ms.valid_indexes, ms.indexes, ms.valid_groups, ms.groups, ms.valid_keys, ms.keys,
ms.size
) t
ORDER BY t.name ASC, t.builds[1] ASC, t.timestamp ASC
""".trimIndent() """.trimIndent()
).use { stmt -> ).use { stmt ->
stmt.executeQuery().use { rows -> stmt.executeQuery().use { rows ->
@ -118,7 +155,7 @@ public class CacheExporter @Inject constructor(
while (rows.next()) { while (rows.next()) {
val id = rows.getInt(1) val id = rows.getInt(1)
val game = rows.getString(2) val game = rows.getString(2)
val builds = rows.getArray(3).array as Array<Int> val builds = rows.getArray(3).array as Array<Any>
val timestamp = rows.getTimestamp(4)?.toInstant() val timestamp = rows.getTimestamp(4)?.toInstant()
val names = rows.getArray(5).array as Array<String> val names = rows.getArray(5).array as Array<String>
@ -138,7 +175,7 @@ public class CacheExporter @Inject constructor(
caches += CacheSummary( caches += CacheSummary(
id, id,
game, game,
builds.toSortedSet(), builds.mapNotNull { o -> Build.fromPgObject(o as PGobject) }.toSortedSet(),
timestamp, timestamp,
names.toSortedSet(), names.toSortedSet(),
stats stats
@ -185,7 +222,7 @@ public class CacheExporter @Inject constructor(
masterIndex = Unpooled.wrappedBuffer(rows.getBytes(2)).use { compressed -> masterIndex = Unpooled.wrappedBuffer(rows.getBytes(2)).use { compressed ->
Js5Compression.uncompress(compressed).use { uncompressed -> Js5Compression.uncompress(compressed).use { uncompressed ->
Js5MasterIndex.read(uncompressed, format) Js5MasterIndex.readUnverified(uncompressed, format)
} }
} }
@ -208,7 +245,7 @@ public class CacheExporter @Inject constructor(
connection.prepareStatement( connection.prepareStatement(
""" """
SELECT g.name, s.build, s.timestamp, s.name, s.description, s.url SELECT g.name, s.build_major, s.build_minor, s.timestamp, s.name, s.description, s.url
FROM sources s FROM sources s
JOIN games g ON g.id = s.game_id JOIN games g ON g.id = s.game_id
WHERE s.master_index_id = ? WHERE s.master_index_id = ?
@ -221,15 +258,26 @@ public class CacheExporter @Inject constructor(
while (rows.next()) { while (rows.next()) {
val game = rows.getString(1) val game = rows.getString(1)
var build: Int? = rows.getInt(2) var buildMajor: Int? = rows.getInt(2)
if (rows.wasNull()) { if (rows.wasNull()) {
build = null buildMajor = null
} }
val timestamp = rows.getTimestamp(3)?.toInstant() var buildMinor: Int? = rows.getInt(3)
val name = rows.getString(4) if (rows.wasNull()) {
val description = rows.getString(5) buildMinor = null
val url = rows.getString(6) }
val build = if (buildMajor != null) {
Build(buildMajor, buildMinor)
} else {
null
}
val timestamp = rows.getTimestamp(4)?.toInstant()
val name = rows.getString(5)
val description = rows.getString(6)
val url = rows.getString(7)
sources += Source(game, build, timestamp, name, description, url) sources += Source(game, build, timestamp, name, description, url)
} }

@ -88,7 +88,8 @@ public class CacheImporter @Inject constructor(
public suspend fun import( public suspend fun import(
store: Store, store: Store,
game: String, game: String,
build: Int?, buildMajor: Int?,
buildMinor: Int?,
timestamp: Instant?, timestamp: Instant?,
name: String?, name: String?,
description: String?, description: String?,
@ -117,7 +118,8 @@ public class CacheImporter @Inject constructor(
SourceType.DISK, SourceType.DISK,
masterIndexId, masterIndexId,
gameId, gameId,
build, buildMajor,
buildMinor,
timestamp, timestamp,
name, name,
description, description,
@ -182,21 +184,38 @@ public class CacheImporter @Inject constructor(
buf: ByteBuf, buf: ByteBuf,
format: MasterIndexFormat, format: MasterIndexFormat,
game: String, game: String,
build: Int?, buildMajor: Int?,
buildMinor: Int?,
timestamp: Instant?, timestamp: Instant?,
name: String?, name: String?,
description: String?, description: String?,
url: String? url: String?
) { ) {
Js5Compression.uncompress(buf.slice()).use { uncompressed -> Js5Compression.uncompress(buf.slice()).use { uncompressed ->
val masterIndex = MasterIndex(Js5MasterIndex.read(uncompressed.slice(), format), buf, uncompressed) val masterIndex = MasterIndex(
Js5MasterIndex.readUnverified(uncompressed.slice(), format),
buf,
uncompressed
)
database.execute { connection -> database.execute { connection ->
prepare(connection) prepare(connection)
val gameId = getGameId(connection, game) val gameId = getGameId(connection, game)
val masterIndexId = addMasterIndex(connection, masterIndex) val masterIndexId = addMasterIndex(connection, masterIndex)
addSource(connection, SourceType.DISK, masterIndexId, gameId, build, timestamp, name, description, url)
addSource(
connection,
SourceType.DISK,
masterIndexId,
gameId,
buildMajor,
buildMinor,
timestamp,
name,
description,
url
)
} }
} }
} }
@ -206,7 +225,8 @@ public class CacheImporter @Inject constructor(
buf: ByteBuf, buf: ByteBuf,
uncompressed: ByteBuf, uncompressed: ByteBuf,
gameId: Int, gameId: Int,
build: Int, buildMajor: Int,
buildMinor: Int?,
lastId: Int?, lastId: Int?,
timestamp: Instant timestamp: Instant
): MasterIndexResult { ): MasterIndexResult {
@ -216,12 +236,13 @@ public class CacheImporter @Inject constructor(
connection.prepareStatement( connection.prepareStatement(
""" """
UPDATE games UPDATE games
SET build = ? SET build_major = ?, build_minor = ?
WHERE id = ? WHERE id = ?
""".trimIndent() """.trimIndent()
).use { stmt -> ).use { stmt ->
stmt.setInt(1, build) stmt.setInt(1, buildMajor)
stmt.setInt(2, gameId) stmt.setObject(2, buildMinor, Types.INTEGER)
stmt.setInt(3, gameId)
stmt.execute() stmt.execute()
} }
@ -236,7 +257,8 @@ public class CacheImporter @Inject constructor(
SourceType.JS5REMOTE, SourceType.JS5REMOTE,
masterIndexId, masterIndexId,
gameId, gameId,
build, buildMajor,
buildMinor,
timestamp, timestamp,
name = "Jagex", name = "Jagex",
description = null, description = null,
@ -451,23 +473,25 @@ public class CacheImporter @Inject constructor(
type: SourceType, type: SourceType,
masterIndexId: Int, masterIndexId: Int,
gameId: Int, gameId: Int,
build: Int?, buildMajor: Int?,
buildMinor: Int?,
timestamp: Instant?, timestamp: Instant?,
name: String?, name: String?,
description: String?, description: String?,
url: String? url: String?
): Int { ): Int {
if (type == SourceType.JS5REMOTE && build != null) { if (type == SourceType.JS5REMOTE && buildMajor != null) {
connection.prepareStatement( connection.prepareStatement(
""" """
SELECT id SELECT id
FROM sources FROM sources
WHERE type = 'js5remote' AND master_index_id = ? AND game_id = ? AND build = ? WHERE type = 'js5remote' AND master_index_id = ? AND game_id = ? AND build_major = ? AND build_minor IS NOT DISTINCT FROM ?
""".trimIndent() """.trimIndent()
).use { stmt -> ).use { stmt ->
stmt.setInt(1, masterIndexId) stmt.setInt(1, masterIndexId)
stmt.setInt(2, gameId) stmt.setInt(2, gameId)
stmt.setInt(3, build) stmt.setInt(3, buildMajor)
stmt.setObject(4, buildMinor, Types.INTEGER)
stmt.executeQuery().use { rows -> stmt.executeQuery().use { rows ->
if (rows.next()) { if (rows.next()) {
@ -479,25 +503,26 @@ public class CacheImporter @Inject constructor(
connection.prepareStatement( connection.prepareStatement(
""" """
INSERT INTO sources (type, master_index_id, game_id, build, timestamp, name, description, url) INSERT INTO sources (type, master_index_id, game_id, build_major, build_minor, timestamp, name, description, url)
VALUES (?::source_type, ?, ?, ?, ?, ?, ?, ?) VALUES (?::source_type, ?, ?, ?, ?, ?, ?, ?, ?)
RETURNING id RETURNING id
""".trimIndent() """.trimIndent()
).use { stmt -> ).use { stmt ->
stmt.setString(1, type.toString().lowercase()) stmt.setString(1, type.toString().lowercase())
stmt.setInt(2, masterIndexId) stmt.setInt(2, masterIndexId)
stmt.setInt(3, gameId) stmt.setInt(3, gameId)
stmt.setObject(4, build, Types.INTEGER) stmt.setObject(4, buildMajor, Types.INTEGER)
stmt.setObject(5, buildMinor, Types.INTEGER)
if (timestamp != null) { if (timestamp != null) {
stmt.setObject(5, timestamp.atOffset(ZoneOffset.UTC), Types.TIMESTAMP_WITH_TIMEZONE) stmt.setObject(6, timestamp.atOffset(ZoneOffset.UTC), Types.TIMESTAMP_WITH_TIMEZONE)
} else { } else {
stmt.setNull(5, Types.TIMESTAMP_WITH_TIMEZONE) stmt.setNull(6, Types.TIMESTAMP_WITH_TIMEZONE)
} }
stmt.setString(6, name) stmt.setString(7, name)
stmt.setString(7, description) stmt.setString(8, description)
stmt.setString(8, url) stmt.setString(9, url)
stmt.executeQuery().use { rows -> stmt.executeQuery().use { rows ->
check(rows.next()) check(rows.next())

@ -13,7 +13,8 @@ import org.openrs2.cli.instant
import org.openrs2.inject.CloseableInjector import org.openrs2.inject.CloseableInjector
public class ImportCommand : CliktCommand(name = "import") { public class ImportCommand : CliktCommand(name = "import") {
private val build by option().int() private val buildMajor by option().int()
private val buildMinor by option().int()
private val timestamp by option().instant() private val timestamp by option().instant()
private val name by option() private val name by option()
private val description by option() private val description by option()
@ -31,7 +32,7 @@ public class ImportCommand : CliktCommand(name = "import") {
val importer = injector.getInstance(CacheImporter::class.java) val importer = injector.getInstance(CacheImporter::class.java)
Store.open(input).use { store -> Store.open(input).use { store ->
importer.import(store, game, build, timestamp, name, description, url) importer.import(store, game, buildMajor, buildMinor, timestamp, name, description, url)
} }
} }
} }

@ -17,7 +17,8 @@ import org.openrs2.inject.CloseableInjector
import java.nio.file.Files import java.nio.file.Files
public class ImportMasterIndexCommand : CliktCommand(name = "import-master-index") { public class ImportMasterIndexCommand : CliktCommand(name = "import-master-index") {
private val build by option().int() private val buildMajor by option().int()
private val buildMinor by option().int()
private val timestamp by option().instant() private val timestamp by option().instant()
private val name by option() private val name by option()
private val description by option() private val description by option()
@ -36,7 +37,7 @@ public class ImportMasterIndexCommand : CliktCommand(name = "import-master-index
val importer = injector.getInstance(CacheImporter::class.java) val importer = injector.getInstance(CacheImporter::class.java)
Unpooled.wrappedBuffer(Files.readAllBytes(input)).use { buf -> Unpooled.wrappedBuffer(Files.readAllBytes(input)).use { buf ->
importer.importMasterIndex(buf, format, game, build, timestamp, name, description, url) importer.importMasterIndex(buf, format, game, buildMajor, buildMinor, timestamp, name, description, url)
} }
} }
} }

@ -5,8 +5,10 @@ import io.netty.bootstrap.Bootstrap
import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBuf
import io.netty.channel.ChannelHandler import io.netty.channel.ChannelHandler
import io.netty.channel.ChannelHandlerContext import io.netty.channel.ChannelHandlerContext
import io.netty.channel.ChannelPipeline
import io.netty.channel.SimpleChannelInboundHandler import io.netty.channel.SimpleChannelInboundHandler
import kotlinx.coroutines.runBlocking import kotlinx.coroutines.runBlocking
import org.bouncycastle.crypto.params.RSAKeyParameters
import org.openrs2.buffer.crc32 import org.openrs2.buffer.crc32
import org.openrs2.buffer.use import org.openrs2.buffer.use
import org.openrs2.cache.Js5Archive import org.openrs2.cache.Js5Archive
@ -14,34 +16,36 @@ import org.openrs2.cache.Js5Compression
import org.openrs2.cache.Js5Index import org.openrs2.cache.Js5Index
import org.openrs2.cache.Js5MasterIndex import org.openrs2.cache.Js5MasterIndex
import org.openrs2.cache.MasterIndexFormat import org.openrs2.cache.MasterIndexFormat
import org.openrs2.protocol.Rs2Decoder
import org.openrs2.protocol.Rs2Encoder
import org.openrs2.protocol.js5.Js5Request
import org.openrs2.protocol.js5.Js5RequestEncoder
import org.openrs2.protocol.js5.Js5Response
import org.openrs2.protocol.js5.Js5ResponseDecoder
import org.openrs2.protocol.js5.XorDecoder
import org.openrs2.protocol.login.LoginRequest
import org.openrs2.protocol.login.LoginResponse
import java.time.Instant import java.time.Instant
import kotlin.coroutines.Continuation import kotlin.coroutines.Continuation
import kotlin.coroutines.resume import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException import kotlin.coroutines.resumeWithException
@ChannelHandler.Sharable @ChannelHandler.Sharable
public class Js5ChannelHandler( public abstract class Js5ChannelHandler(
private val bootstrap: Bootstrap, private val bootstrap: Bootstrap,
private val gameId: Int, private val gameId: Int,
private val hostname: String, private val hostname: String,
private val port: Int, private val port: Int,
private var build: Int, protected var buildMajor: Int,
protected var buildMinor: Int?,
private val lastMasterIndexId: Int?, private val lastMasterIndexId: Int?,
private val continuation: Continuation<Unit>, private val continuation: Continuation<Unit>,
private val importer: CacheImporter, private val importer: CacheImporter,
private val masterIndexFormat: MasterIndexFormat = MasterIndexFormat.VERSIONED, private val key: RSAKeyParameters?,
private val maxInFlightRequests: Int = 200, private val masterIndexFormat: MasterIndexFormat,
maxBuildAttempts: Int = 10 private val maxInFlightRequests: Int,
private val maxBuildAttempts: Int = 10
) : SimpleChannelInboundHandler<Any>(Object::class.java) { ) : SimpleChannelInboundHandler<Any>(Object::class.java) {
protected data class InFlightRequest(val prefetch: Boolean, val archive: Int, val group: Int)
protected data class PendingRequest(
val prefetch: Boolean,
val archive: Int,
val group: Int,
val version: Int,
val checksum: Int
)
private enum class State { private enum class State {
ACTIVE, ACTIVE,
CLIENT_OUT_OF_DATE, CLIENT_OUT_OF_DATE,
@ -49,39 +53,35 @@ public class Js5ChannelHandler(
} }
private var state = State.ACTIVE private var state = State.ACTIVE
private val maxBuild = build + maxBuildAttempts private var buildAttempts = 0
private val inFlightRequests = mutableSetOf<Js5Request.Group>() private val inFlightRequests = mutableSetOf<InFlightRequest>()
private val pendingRequests = ArrayDeque<Js5Request.Group>() private val pendingRequests = ArrayDeque<PendingRequest>()
private var masterIndexId: Int = 0 private var masterIndexId: Int = 0
private var sourceId: Int = 0 private var sourceId: Int = 0
private var masterIndex: Js5MasterIndex? = null private var masterIndex: Js5MasterIndex? = null
private lateinit var indexes: Array<Js5Index?> private lateinit var indexes: Array<Js5Index?>
private val groups = mutableListOf<CacheImporter.Group>() private val groups = mutableListOf<CacheImporter.Group>()
protected abstract fun createInitMessage(): Any
protected abstract fun createRequestMessage(prefetch: Boolean, archive: Int, group: Int): Any
protected abstract fun createConnectedMessage(): Any?
protected abstract fun configurePipeline(pipeline: ChannelPipeline)
protected abstract fun incrementVersion()
override fun channelActive(ctx: ChannelHandlerContext) { override fun channelActive(ctx: ChannelHandlerContext) {
ctx.writeAndFlush(LoginRequest.InitJs5RemoteConnection(build), ctx.voidPromise()) ctx.writeAndFlush(createInitMessage(), ctx.voidPromise())
ctx.read() ctx.read()
} }
override fun channelRead0(ctx: ChannelHandlerContext, msg: Any) {
when (msg) {
is LoginResponse.Js5Ok -> handleOk(ctx)
is LoginResponse.ClientOutOfDate -> handleClientOutOfDate(ctx)
is LoginResponse -> throw Exception("Invalid response: $msg")
is Js5Response -> handleResponse(ctx, msg)
else -> throw Exception("Unknown message type: ${msg.javaClass.name}")
}
}
override fun channelReadComplete(ctx: ChannelHandlerContext) { override fun channelReadComplete(ctx: ChannelHandlerContext) {
var flush = false var flush = false
while (inFlightRequests.size < maxInFlightRequests) { while (inFlightRequests.size < maxInFlightRequests) {
val request = pendingRequests.removeFirstOrNull() ?: break val request = pendingRequests.removeFirstOrNull() ?: break
inFlightRequests += request inFlightRequests += InFlightRequest(request.prefetch, request.archive, request.group)
logger.info { "Requesting archive ${request.archive} group ${request.group}" } logger.info { "Requesting archive ${request.archive} group ${request.group}" }
ctx.write(request, ctx.voidPromise()) ctx.write(createRequestMessage(request.prefetch, request.archive, request.group), ctx.voidPromise())
flush = true flush = true
} }
@ -112,53 +112,60 @@ public class Js5ChannelHandler(
continuation.resumeWithException(cause) continuation.resumeWithException(cause)
} }
private fun handleOk(ctx: ChannelHandlerContext) { protected fun handleOk(ctx: ChannelHandlerContext) {
val pipeline = ctx.pipeline() configurePipeline(ctx.pipeline())
pipeline.remove(Rs2Encoder::class.java) val msg = createConnectedMessage()
pipeline.remove(Rs2Decoder::class.java) if (msg != null) {
pipeline.addFirst( ctx.write(msg, ctx.voidPromise())
Js5RequestEncoder, }
XorDecoder(),
Js5ResponseDecoder()
)
request(Js5Archive.ARCHIVESET, Js5Archive.ARCHIVESET) request(ctx, Js5Archive.ARCHIVESET, Js5Archive.ARCHIVESET, 0, 0)
} }
private fun handleClientOutOfDate(ctx: ChannelHandlerContext) { protected fun handleClientOutOfDate(ctx: ChannelHandlerContext) {
if (++build > maxBuild) { if (++buildAttempts > maxBuildAttempts) {
throw Exception("Failed to identify current version") throw Exception("Failed to identify current version")
} }
state = State.CLIENT_OUT_OF_DATE state = State.CLIENT_OUT_OF_DATE
incrementVersion()
ctx.close() ctx.close()
} }
private fun handleResponse(ctx: ChannelHandlerContext, response: Js5Response) { protected fun handleResponse(
val request = Js5Request.Group(response.prefetch, response.archive, response.group) ctx: ChannelHandlerContext,
prefetch: Boolean,
archive: Int,
group: Int,
data: ByteBuf
) {
val request = InFlightRequest(prefetch, archive, group)
val removed = inFlightRequests.remove(request) val removed = inFlightRequests.remove(request)
if (!removed) { if (!removed) {
val type = if (response.prefetch) { val type = if (prefetch) {
"prefetch" "prefetch"
} else { } else {
"urgent" "urgent"
} }
val archive = response.archive
val group = response.group
throw Exception("Received response for $type request (archive $archive group $group) not in-flight") throw Exception("Received response for $type request (archive $archive group $group) not in-flight")
} }
if (response.archive == Js5Archive.ARCHIVESET && response.group == Js5Archive.ARCHIVESET) { processResponse(ctx, archive, group, data)
processMasterIndex(response.data) }
} else if (response.archive == Js5Archive.ARCHIVESET) {
processIndex(response.group, response.data) protected fun processResponse(ctx: ChannelHandlerContext, archive: Int, group: Int, data: ByteBuf) {
if (archive == Js5Archive.ARCHIVESET && group == Js5Archive.ARCHIVESET) {
processMasterIndex(ctx, data)
} else if (archive == Js5Archive.ARCHIVESET) {
processIndex(ctx, group, data)
} else { } else {
processGroup(response.archive, response.group, response.data) processGroup(archive, group, data)
} }
val complete = pendingRequests.isEmpty() && inFlightRequests.isEmpty() val complete = isComplete()
if (groups.size >= CacheImporter.BATCH_SIZE || complete) { if (groups.size >= CacheImporter.BATCH_SIZE || complete) {
runBlocking { runBlocking {
@ -179,9 +186,13 @@ public class Js5ChannelHandler(
} }
} }
private fun processMasterIndex(buf: ByteBuf) { protected open fun isComplete(): Boolean {
return pendingRequests.isEmpty() && inFlightRequests.isEmpty()
}
private fun processMasterIndex(ctx: ChannelHandlerContext, buf: ByteBuf) {
Js5Compression.uncompress(buf.slice()).use { uncompressed -> Js5Compression.uncompress(buf.slice()).use { uncompressed ->
masterIndex = Js5MasterIndex.read(uncompressed.slice(), masterIndexFormat) masterIndex = Js5MasterIndex.read(uncompressed.slice(), masterIndexFormat, key)
val (masterIndexId, sourceId, rawIndexes) = runBlocking { val (masterIndexId, sourceId, rawIndexes) = runBlocking {
importer.importMasterIndexAndGetIndexes( importer.importMasterIndexAndGetIndexes(
@ -189,7 +200,8 @@ public class Js5ChannelHandler(
buf, buf,
uncompressed, uncompressed,
gameId, gameId,
build, buildMajor,
buildMinor,
lastMasterIndexId, lastMasterIndexId,
timestamp = Instant.now() timestamp = Instant.now()
) )
@ -202,10 +214,15 @@ public class Js5ChannelHandler(
indexes = arrayOfNulls(rawIndexes.size) indexes = arrayOfNulls(rawIndexes.size)
for ((archive, index) in rawIndexes.withIndex()) { for ((archive, index) in rawIndexes.withIndex()) {
val entry = masterIndex!!.entries[archive]
if (entry.version == 0 && entry.checksum == 0) {
continue
}
if (index != null) { if (index != null) {
processIndex(archive, index) processIndex(ctx, archive, index)
} else { } else {
request(Js5Archive.ARCHIVESET, archive) request(ctx, Js5Archive.ARCHIVESET, archive, entry.version, entry.checksum)
} }
} }
} finally { } finally {
@ -214,7 +231,7 @@ public class Js5ChannelHandler(
} }
} }
private fun processIndex(archive: Int, buf: ByteBuf) { private fun processIndex(ctx: ChannelHandlerContext, archive: Int, buf: ByteBuf) {
val checksum = buf.crc32() val checksum = buf.crc32()
val entry = masterIndex!!.entries[archive] val entry = masterIndex!!.entries[archive]
if (checksum != entry.checksum) { if (checksum != entry.checksum) {
@ -233,7 +250,8 @@ public class Js5ChannelHandler(
importer.importIndexAndGetMissingGroups(sourceId, archive, index, buf, uncompressed, lastMasterIndexId) importer.importIndexAndGetMissingGroups(sourceId, archive, index, buf, uncompressed, lastMasterIndexId)
} }
for (group in groups) { for (group in groups) {
request(archive, group) val groupEntry = index[group]!!
request(ctx, archive, group, groupEntry.version, groupEntry.checksum)
} }
} }
} }
@ -257,8 +275,8 @@ public class Js5ChannelHandler(
) )
} }
private fun request(archive: Int, group: Int) { protected open fun request(ctx: ChannelHandlerContext, archive: Int, group: Int, version: Int, checksum: Int) {
pendingRequests += Js5Request.Group(false, archive, group) pendingRequests += PendingRequest(false, archive, group, version, checksum)
} }
private fun releaseGroups() { private fun releaseGroups() {

@ -0,0 +1,158 @@
package org.openrs2.archive.cache
import com.github.michaelbull.logging.InlineLogger
import io.netty.bootstrap.Bootstrap
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.ChannelPipeline
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.cancel
import kotlinx.coroutines.launch
import org.bouncycastle.crypto.params.RSAKeyParameters
import org.openrs2.archive.cache.nxt.InitJs5RemoteConnection
import org.openrs2.archive.cache.nxt.Js5Request
import org.openrs2.archive.cache.nxt.Js5RequestEncoder
import org.openrs2.archive.cache.nxt.Js5Response
import org.openrs2.archive.cache.nxt.Js5ResponseDecoder
import org.openrs2.archive.cache.nxt.LoginResponse
import org.openrs2.archive.cache.nxt.MusicStreamClient
import org.openrs2.buffer.use
import org.openrs2.cache.MasterIndexFormat
import org.openrs2.protocol.Rs2Decoder
import org.openrs2.protocol.Rs2Encoder
import org.openrs2.protocol.js5.XorDecoder
import kotlin.coroutines.Continuation
public class NxtJs5ChannelHandler(
bootstrap: Bootstrap,
gameId: Int,
hostname: String,
port: Int,
buildMajor: Int,
buildMinor: Int,
lastMasterIndexId: Int?,
continuation: Continuation<Unit>,
importer: CacheImporter,
key: RSAKeyParameters?,
private val token: String,
private val musicStreamClient: MusicStreamClient,
private val maxMinorBuildAttempts: Int = 5
) : Js5ChannelHandler(
bootstrap,
gameId,
hostname,
port,
buildMajor,
buildMinor,
lastMasterIndexId,
continuation,
importer,
key,
MasterIndexFormat.LENGTHS,
maxInFlightRequests = 500
) {
private data class MusicRequest(val archive: Int, val group: Int, val version: Int, val checksum: Int)
private var inFlightRequests = 0
private val pendingRequests = ArrayDeque<MusicRequest>()
private var scope: CoroutineScope? = null
private var minorBuildAttempts = 0
override fun createInitMessage(): Any {
return InitJs5RemoteConnection(buildMajor, buildMinor!!, token, 0)
}
override fun createRequestMessage(prefetch: Boolean, archive: Int, group: Int): Any {
return Js5Request.Group(prefetch, archive, group, buildMajor)
}
override fun createConnectedMessage(): Any? {
return Js5Request.Connected(buildMajor)
}
override fun configurePipeline(pipeline: ChannelPipeline) {
pipeline.addBefore("handler", null, Js5RequestEncoder)
pipeline.addBefore("handler", null, XorDecoder())
pipeline.addBefore("handler", null, Js5ResponseDecoder())
pipeline.remove(Rs2Encoder::class.java)
pipeline.remove(Rs2Decoder::class.java)
}
override fun incrementVersion() {
buildMinor = buildMinor!! + 1
if (++minorBuildAttempts >= maxMinorBuildAttempts) {
buildMajor++
buildMinor = 1
}
}
override fun channelActive(ctx: ChannelHandlerContext) {
super.channelActive(ctx)
scope = CoroutineScope(ctx.channel().eventLoop().asCoroutineDispatcher())
}
override fun channelInactive(ctx: ChannelHandlerContext) {
super.channelInactive(ctx)
scope!!.cancel()
}
override fun channelRead0(ctx: ChannelHandlerContext, msg: Any) {
when (msg) {
is LoginResponse.Js5Ok -> handleOk(ctx)
is LoginResponse.ClientOutOfDate -> handleClientOutOfDate(ctx)
is LoginResponse -> throw Exception("Invalid response: $msg")
is Js5Response -> handleResponse(ctx, msg.prefetch, msg.archive, msg.group, msg.data)
else -> throw Exception("Unknown message type: ${msg.javaClass.name}")
}
}
override fun channelReadComplete(ctx: ChannelHandlerContext) {
super.channelReadComplete(ctx)
while (inFlightRequests < 6) {
val request = pendingRequests.removeFirstOrNull() ?: break
inFlightRequests++
logger.info { "Requesting archive ${request.archive} group ${request.group}" }
scope!!.launch {
val archive = request.archive
val group = request.group
val version = request.version
val checksum = request.checksum
musicStreamClient.request(archive, group, version, checksum, buildMajor).use { buf ->
inFlightRequests--
processResponse(ctx, archive, group, buf)
/*
* Inject a fake channelReadComplete event to ensure we
* don't time out and to send any new music requests.
*/
ctx.channel().pipeline().fireChannelReadComplete()
}
}
}
}
override fun isComplete(): Boolean {
return super.isComplete() && pendingRequests.isEmpty() && inFlightRequests == 0
}
override fun request(ctx: ChannelHandlerContext, archive: Int, group: Int, version: Int, checksum: Int) {
if (archive == MUSIC_ARCHIVE) {
pendingRequests += MusicRequest(archive, group, version, checksum)
} else {
super.request(ctx, archive, group, version, checksum)
}
}
private companion object {
private val logger = InlineLogger()
private const val MUSIC_ARCHIVE = 40
}
}

@ -0,0 +1,22 @@
package org.openrs2.archive.cache
import io.netty.channel.Channel
import io.netty.channel.ChannelInitializer
import io.netty.handler.timeout.ReadTimeoutHandler
import org.openrs2.archive.cache.nxt.ClientOutOfDateCodec
import org.openrs2.archive.cache.nxt.InitJs5RemoteConnectionCodec
import org.openrs2.archive.cache.nxt.Js5OkCodec
import org.openrs2.protocol.Protocol
import org.openrs2.protocol.Rs2Decoder
import org.openrs2.protocol.Rs2Encoder
public class NxtJs5ChannelInitializer(private val handler: NxtJs5ChannelHandler) : ChannelInitializer<Channel>() {
override fun initChannel(ch: Channel) {
ch.pipeline().addLast(
ReadTimeoutHandler(30),
Rs2Encoder(Protocol(InitJs5RemoteConnectionCodec)),
Rs2Decoder(Protocol(Js5OkCodec, ClientOutOfDateCodec))
)
ch.pipeline().addLast("handler", handler)
}
}

@ -0,0 +1,77 @@
package org.openrs2.archive.cache
import io.netty.bootstrap.Bootstrap
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.ChannelPipeline
import org.bouncycastle.crypto.params.RSAKeyParameters
import org.openrs2.cache.MasterIndexFormat
import org.openrs2.protocol.Rs2Decoder
import org.openrs2.protocol.Rs2Encoder
import org.openrs2.protocol.js5.Js5Request
import org.openrs2.protocol.js5.Js5RequestEncoder
import org.openrs2.protocol.js5.Js5Response
import org.openrs2.protocol.js5.Js5ResponseDecoder
import org.openrs2.protocol.js5.XorDecoder
import org.openrs2.protocol.login.LoginRequest
import org.openrs2.protocol.login.LoginResponse
import kotlin.coroutines.Continuation
public class OsrsJs5ChannelHandler(
bootstrap: Bootstrap,
gameId: Int,
hostname: String,
port: Int,
build: Int,
lastMasterIndexId: Int?,
continuation: Continuation<Unit>,
importer: CacheImporter,
key: RSAKeyParameters?
) : Js5ChannelHandler(
bootstrap,
gameId,
hostname,
port,
build,
null,
lastMasterIndexId,
continuation,
importer,
key,
MasterIndexFormat.VERSIONED,
maxInFlightRequests = 200
) {
override fun createInitMessage(): Any {
return LoginRequest.InitJs5RemoteConnection(buildMajor)
}
override fun createRequestMessage(prefetch: Boolean, archive: Int, group: Int): Any {
return Js5Request.Group(prefetch, archive, group)
}
override fun createConnectedMessage(): Any? {
return null
}
override fun configurePipeline(pipeline: ChannelPipeline) {
pipeline.addBefore("handler", null, Js5RequestEncoder)
pipeline.addBefore("handler", null, XorDecoder())
pipeline.addBefore("handler", null, Js5ResponseDecoder())
pipeline.remove(Rs2Encoder::class.java)
pipeline.remove(Rs2Decoder::class.java)
}
override fun incrementVersion() {
buildMajor++
}
override fun channelRead0(ctx: ChannelHandlerContext, msg: Any) {
when (msg) {
is LoginResponse.Js5Ok -> handleOk(ctx)
is LoginResponse.ClientOutOfDate -> handleClientOutOfDate(ctx)
is LoginResponse -> throw Exception("Invalid response: $msg")
is Js5Response -> handleResponse(ctx, msg.prefetch, msg.archive, msg.group, msg.data)
else -> throw Exception("Unknown message type: ${msg.javaClass.name}")
}
}
}

@ -7,13 +7,13 @@ import org.openrs2.protocol.Protocol
import org.openrs2.protocol.Rs2Decoder import org.openrs2.protocol.Rs2Decoder
import org.openrs2.protocol.Rs2Encoder import org.openrs2.protocol.Rs2Encoder
public class Js5ChannelInitializer(private val handler: Js5ChannelHandler) : ChannelInitializer<Channel>() { public class OsrsJs5ChannelInitializer(private val handler: OsrsJs5ChannelHandler) : ChannelInitializer<Channel>() {
override fun initChannel(ch: Channel) { override fun initChannel(ch: Channel) {
ch.pipeline().addLast( ch.pipeline().addLast(
ReadTimeoutHandler(30), ReadTimeoutHandler(30),
Rs2Encoder(Protocol.LOGIN_UPSTREAM), Rs2Encoder(Protocol.LOGIN_UPSTREAM),
Rs2Decoder(Protocol.LOGIN_DOWNSTREAM), Rs2Decoder(Protocol.LOGIN_DOWNSTREAM)
handler
) )
ch.pipeline().addLast("handler", handler)
} }
} }

@ -0,0 +1,8 @@
package org.openrs2.archive.cache.nxt
import org.openrs2.protocol.EmptyPacketCodec
public object ClientOutOfDateCodec : EmptyPacketCodec<LoginResponse.ClientOutOfDate>(
opcode = 6,
packet = LoginResponse.ClientOutOfDate
)

@ -0,0 +1,10 @@
package org.openrs2.archive.cache.nxt
import org.openrs2.protocol.Packet
public data class InitJs5RemoteConnection(
public val buildMajor: Int,
public val buildMinor: Int,
public val token: String,
public val language: Int
) : Packet

@ -0,0 +1,29 @@
package org.openrs2.archive.cache.nxt
import io.netty.buffer.ByteBuf
import org.openrs2.buffer.readString
import org.openrs2.buffer.writeString
import org.openrs2.crypto.StreamCipher
import org.openrs2.protocol.PacketCodec
import org.openrs2.protocol.PacketLength
public object InitJs5RemoteConnectionCodec : PacketCodec<InitJs5RemoteConnection>(
length = PacketLength.VARIABLE_BYTE,
opcode = 15,
type = InitJs5RemoteConnection::class.java
) {
override fun decode(input: ByteBuf, cipher: StreamCipher): InitJs5RemoteConnection {
val buildMajor = input.readInt()
val buildMinor = input.readInt()
val token = input.readString()
val language = input.readUnsignedByte().toInt()
return InitJs5RemoteConnection(buildMajor, buildMinor, token, language)
}
override fun encode(input: InitJs5RemoteConnection, output: ByteBuf, cipher: StreamCipher) {
output.writeInt(input.buildMajor)
output.writeInt(input.buildMinor)
output.writeString(input.token)
output.writeByte(input.language)
}
}

@ -0,0 +1,25 @@
package org.openrs2.archive.cache.nxt
import io.netty.buffer.ByteBuf
import org.openrs2.crypto.StreamCipher
import org.openrs2.protocol.PacketCodec
public object Js5OkCodec : PacketCodec<LoginResponse.Js5Ok>(
opcode = 0,
length = LoginResponse.Js5Ok.LOADING_REQUIREMENTS * 4,
type = LoginResponse.Js5Ok::class.java
) {
override fun decode(input: ByteBuf, cipher: StreamCipher): LoginResponse.Js5Ok {
val loadingRequirements = mutableListOf<Int>()
for (i in 0 until LoginResponse.Js5Ok.LOADING_REQUIREMENTS) {
loadingRequirements += input.readInt()
}
return LoginResponse.Js5Ok(loadingRequirements)
}
override fun encode(input: LoginResponse.Js5Ok, output: ByteBuf, cipher: StreamCipher) {
for (requirement in input.loadingRequirements) {
output.writeInt(requirement)
}
}
}

@ -0,0 +1,14 @@
package org.openrs2.archive.cache.nxt
public sealed class Js5Request {
public data class Group(
public val prefetch: Boolean,
public val archive: Int,
public val group: Int,
public val build: Int
) : Js5Request()
public data class Connected(
public val build: Int
) : Js5Request()
}

@ -0,0 +1,36 @@
package org.openrs2.archive.cache.nxt
import io.netty.buffer.ByteBuf
import io.netty.channel.ChannelHandler
import io.netty.channel.ChannelHandlerContext
import io.netty.handler.codec.MessageToByteEncoder
@ChannelHandler.Sharable
public object Js5RequestEncoder : MessageToByteEncoder<Js5Request>(Js5Request::class.java) {
override fun encode(ctx: ChannelHandlerContext, msg: Js5Request, out: ByteBuf) {
when (msg) {
is Js5Request.Group -> {
out.writeByte(if (msg.prefetch) 32 else 33)
out.writeByte(msg.archive)
out.writeInt(msg.group)
out.writeShort(msg.build)
out.writeShort(0)
}
is Js5Request.Connected -> {
out.writeByte(6)
out.writeMedium(5)
out.writeShort(0)
out.writeShort(msg.build)
out.writeShort(0)
}
}
}
override fun allocateBuffer(ctx: ChannelHandlerContext, msg: Js5Request, preferDirect: Boolean): ByteBuf {
return if (preferDirect) {
ctx.alloc().ioBuffer(10, 10)
} else {
ctx.alloc().heapBuffer(10, 10)
}
}
}

@ -0,0 +1,11 @@
package org.openrs2.archive.cache.nxt
import io.netty.buffer.ByteBuf
import io.netty.buffer.DefaultByteBufHolder
public data class Js5Response(
public val prefetch: Boolean,
public val archive: Int,
public val group: Int,
public val data: ByteBuf
) : DefaultByteBufHolder(data)

@ -0,0 +1,121 @@
package org.openrs2.archive.cache.nxt
import io.netty.buffer.ByteBuf
import io.netty.channel.ChannelHandlerContext
import io.netty.handler.codec.ByteToMessageDecoder
import io.netty.handler.codec.DecoderException
import kotlin.math.min
public class Js5ResponseDecoder : ByteToMessageDecoder() {
private data class Request(val prefetch: Boolean, val archive: Int, val group: Int)
private enum class State {
READ_HEADER,
READ_LEN,
READ_DATA
}
private var state = State.READ_HEADER
private val buffers = mutableMapOf<Request, ByteBuf>()
private var request: Request? = null
override fun decode(ctx: ChannelHandlerContext, input: ByteBuf, out: MutableList<Any>) {
if (state == State.READ_HEADER) {
if (input.readableBytes() < 5) {
return
}
val prefetch: Boolean
val archive = input.readUnsignedByte().toInt()
var group = input.readInt()
if (group and 0x80000000.toInt() != 0) {
prefetch = true
group = group and 0x7FFFFFFF
} else {
prefetch = false
}
request = Request(prefetch, archive, group)
if (buffers.containsKey(request)) {
state = State.READ_DATA
} else {
state = State.READ_LEN
}
}
if (state == State.READ_LEN) {
if (input.readableBytes() < 5) {
return
}
val type = input.readUnsignedByte().toInt()
val len = input.readInt()
if (len < 0) {
throw DecoderException("Length is negative: $len")
}
val totalLen = if (type == 0) {
len + 5
} else {
len + 9
}
if (totalLen < 0) {
throw DecoderException("Total length exceeds maximum ByteBuf size")
}
val data = ctx.alloc().buffer(totalLen, totalLen)
data.writeByte(type)
data.writeInt(len)
buffers[request!!] = data
state = State.READ_DATA
}
if (state == State.READ_DATA) {
val data = buffers[request!!]!!
var blockLen = if (data.writerIndex() == 5) {
102400 - 10
} else {
102400 - 5
}
blockLen = min(blockLen, data.writableBytes())
if (input.readableBytes() < blockLen) {
return
}
data.writeBytes(input, blockLen)
if (!data.isWritable) {
out += Js5Response(request!!.prefetch, request!!.archive, request!!.group, data)
buffers.remove(request!!)
request = null
}
state = State.READ_HEADER
}
}
override fun channelInactive(ctx: ChannelHandlerContext) {
super.channelInactive(ctx)
reset()
}
override fun handlerRemoved0(ctx: ChannelHandlerContext?) {
reset()
}
private fun reset() {
buffers.values.forEach(ByteBuf::release)
buffers.clear()
state = State.READ_HEADER
}
}

@ -0,0 +1,13 @@
package org.openrs2.archive.cache.nxt
import org.openrs2.protocol.Packet
public sealed class LoginResponse : Packet {
public data class Js5Ok(val loadingRequirements: List<Int>) : LoginResponse() {
public companion object {
public const val LOADING_REQUIREMENTS: Int = 31
}
}
public object ClientOutOfDate : LoginResponse()
}

@ -0,0 +1,30 @@
package org.openrs2.archive.cache.nxt
import io.netty.buffer.ByteBuf
import kotlinx.coroutines.future.await
import org.openrs2.buffer.ByteBufBodyHandler
import org.openrs2.buffer.use
import org.openrs2.http.checkStatusCode
import java.net.URI
import java.net.http.HttpClient
import java.net.http.HttpRequest
public class MusicStreamClient(
private val client: HttpClient,
private val byteBufBodyHandler: ByteBufBodyHandler,
private val origin: String
) {
public suspend fun request(archive: Int, group: Int, version: Int, checksum: Int, build: Int): ByteBuf {
val uri = URI("$origin/ms?m=0&a=$archive&k=$build&g=$group&c=$checksum&v=$version")
val request = HttpRequest.newBuilder(uri)
.GET()
.build()
val response = client.sendAsync(request, byteBufBodyHandler).await()
response.body().use { buf ->
response.checkStatusCode()
return buf.retain()
}
}
}

@ -1,8 +1,12 @@
package org.openrs2.archive.game package org.openrs2.archive.game
import org.bouncycastle.crypto.params.RSAKeyParameters
public data class Game( public data class Game(
public val id: Int, public val id: Int,
public val url: String?, public val url: String?,
public val build: Int?, public val buildMajor: Int?,
public val lastMasterIndexId: Int? public val buildMinor: Int?,
public val lastMasterIndexId: Int?,
public val key: RSAKeyParameters?
) )

@ -1,6 +1,8 @@
package org.openrs2.archive.game package org.openrs2.archive.game
import org.openrs2.crypto.Rsa
import org.openrs2.db.Database import org.openrs2.db.Database
import java.io.StringReader
import javax.inject.Inject import javax.inject.Inject
import javax.inject.Singleton import javax.inject.Singleton
@ -12,7 +14,7 @@ public class GameDatabase @Inject constructor(
return database.execute { connection -> return database.execute { connection ->
connection.prepareStatement( connection.prepareStatement(
""" """
SELECT id, url, build, last_master_index_id SELECT id, url, build_major, build_minor, last_master_index_id, key
FROM games FROM games
WHERE name = ? WHERE name = ?
""".trimIndent() """.trimIndent()
@ -27,17 +29,31 @@ public class GameDatabase @Inject constructor(
val id = rows.getInt(1) val id = rows.getInt(1)
val url: String? = rows.getString(2) val url: String? = rows.getString(2)
var build: Int? = rows.getInt(3) var buildMajor: Int? = rows.getInt(3)
if (rows.wasNull()) { if (rows.wasNull()) {
build = null buildMajor = null
} }
var lastMasterIndexId: Int? = rows.getInt(4) var buildMinor: Int? = rows.getInt(4)
if (rows.wasNull()) {
buildMinor = null
}
var lastMasterIndexId: Int? = rows.getInt(5)
if (rows.wasNull()) { if (rows.wasNull()) {
lastMasterIndexId = null lastMasterIndexId = null
} }
return@execute Game(id, url, build, lastMasterIndexId) val pem = rows.getString(6)
val key = if (rows.wasNull()) {
null
} else {
StringReader(pem).use { reader ->
Rsa.readPublicKey(reader)
}
}
return@execute Game(id, url, buildMajor, buildMinor, lastMasterIndexId, key)
} }
} }
} }

@ -0,0 +1,49 @@
-- @formatter:off
CREATE TYPE build AS (
major INTEGER,
minor INTEGER
);
ALTER TABLE games
ADD COLUMN key TEXT NULL,
ADD COLUMN build_minor INTEGER NULL;
ALTER TABLE games
RENAME COLUMN build TO build_major;
ALTER TABLE sources
ADD COLUMN build_minor INTEGER NULL;
ALTER TABLE sources
RENAME COLUMN build TO build_major;
DROP INDEX sources_master_index_id_game_id_build_idx;
CREATE UNIQUE INDEX ON sources (master_index_id, game_id, build_major)
WHERE type = 'js5remote' AND build_minor IS NULL;
CREATE UNIQUE INDEX ON sources (master_index_id, game_id, build_major, build_minor)
WHERE type = 'js5remote' AND build_minor IS NOT NULL;
UPDATE games
SET
url = 'https://www.runescape.com/k=5/l=0/jav_config.ws?binaryType=2',
build_major = 919,
build_minor = 1,
key = $$-----BEGIN PUBLIC KEY-----
MIICIjANBgkqhkiG9w0BAQEFAAOCAg8AMIICCgKCAgEAnnP2Sqv7uMM3rjmsLTQ3
z4yt8/4j9MDS/2+/9KEkfnH2K/toJbyBUCMHvfS7SBvPiLXWaArNvIArEz/e5Cr3
dk2mcSzmoVcsE1dJq/2eDIqRzhH9WB6zDz+5DO6ysRYap1VdMa4bKXMkM+e7V0c3
9xiMEpjpeSs0cHGxTGlxLBGTFHYG1IZPLkDRJzhKD58Lu8bn2e3KCTuzzvZFf2AF
FZauENC6OswdfAZutlWdWkVOZsD9IB/ALNaY4W35PABZbsfT/ar85S/foXFwHJ+B
OHuF6BR5dYETUQ5Oasl0GUEaVUM9POv7KRv6cW7HWUQHYfQdApjdH+dORHtk4kMG
QAmk/VpTwWBkZWqDbglZBIkd5G7gs8JpluiUh11eRMC/xj99iZp4nt/FOoSNw2NO
GMTUPkHIySC4FQHNSxzbfCW5rQdSRw5+eyuo8MA6mg0LZH3jQuNnnYBg1hJTsdBp
0IrjOQWsfTiX+xZ6lUfRhFtGISuKchpGDZfmOtrZPJDvUgNy0z8w41V6NyiU/h7X
2TKYFQG1/c4Kr4BxT4tPl85nVbMulonfk/AD5l6BflEuHlChpkAhv14j6xRzGHWx
4pdpbHSzDkg/HBR5ka0D7Ua7W6uL3VFVCPAygPERZK1lpYE+m+k92H+i/K7gIV1M
1E07p8x5X9i0oDbZ0lxv8I8CAwEAAQ==
-----END PUBLIC KEY-----
$$
WHERE name = 'runescape';
-- @formatter:on

@ -180,6 +180,19 @@ public data class Js5MasterIndex(
} }
public fun read(buf: ByteBuf, format: MasterIndexFormat, key: RSAKeyParameters? = null): Js5MasterIndex { public fun read(buf: ByteBuf, format: MasterIndexFormat, key: RSAKeyParameters? = null): Js5MasterIndex {
return read(buf, format, key, true)
}
public fun readUnverified(buf: ByteBuf, format: MasterIndexFormat): Js5MasterIndex {
return read(buf, format, null, false)
}
private fun read(
buf: ByteBuf,
format: MasterIndexFormat,
key: RSAKeyParameters?,
verify: Boolean
): Js5MasterIndex {
val index = Js5MasterIndex(format) val index = Js5MasterIndex(format)
val start = buf.readerIndex() val start = buf.readerIndex()
@ -233,24 +246,26 @@ public data class Js5MasterIndex(
index.entries += Entry(version, checksum, groups, totalUncompressedLength, digest) index.entries += Entry(version, checksum, groups, totalUncompressedLength, digest)
} }
val end = buf.readerIndex() if (verify) {
val end = buf.readerIndex()
if (format >= MasterIndexFormat.DIGESTS) { if (format >= MasterIndexFormat.DIGESTS) {
val ciphertext = buf.readSlice(buf.readableBytes()) val ciphertext = buf.readSlice(buf.readableBytes())
decrypt(ciphertext, key).use { plaintext -> decrypt(ciphertext, key).use { plaintext ->
require(plaintext.readableBytes() == SIGNATURE_LENGTH) { require(plaintext.readableBytes() == SIGNATURE_LENGTH) {
"Invalid signature length" "Invalid signature length"
} }
// the client doesn't verify what I presume is the RSA magic byte // the client doesn't verify what I presume is the RSA magic byte
plaintext.skipBytes(1) plaintext.skipBytes(1)
val expected = ByteArray(Whirlpool.DIGESTBYTES) val expected = ByteArray(Whirlpool.DIGESTBYTES)
plaintext.readBytes(expected) plaintext.readBytes(expected)
val actual = buf.whirlpool(start, end - start) val actual = buf.whirlpool(start, end - start)
require(expected.contentEquals(actual)) { require(expected.contentEquals(actual)) {
"Invalid signature" "Invalid signature"
}
} }
} }
} }

@ -206,8 +206,11 @@ class Js5MasterIndexTest {
@Test @Test
fun testReadWhirlpool() { fun testReadWhirlpool() {
Unpooled.wrappedBuffer(encodedWhirlpool).use { buf -> Unpooled.wrappedBuffer(encodedWhirlpool).use { buf ->
val index = Js5MasterIndex.read(buf, MasterIndexFormat.DIGESTS) val index = Js5MasterIndex.read(buf.slice(), MasterIndexFormat.DIGESTS)
assertEquals(decodedWhirlpool, index) assertEquals(decodedWhirlpool, index)
val indexUnverified = Js5MasterIndex.readUnverified(buf, MasterIndexFormat.DIGESTS)
assertEquals(decodedWhirlpool, indexUnverified)
} }
} }
@ -218,8 +221,11 @@ class Js5MasterIndexTest {
buf.setByte(lastIndex, buf.getByte(lastIndex).toInt().inv()) buf.setByte(lastIndex, buf.getByte(lastIndex).toInt().inv())
assertFailsWith<IllegalArgumentException> { assertFailsWith<IllegalArgumentException> {
Js5MasterIndex.read(buf, MasterIndexFormat.DIGESTS) Js5MasterIndex.read(buf.slice(), MasterIndexFormat.DIGESTS)
} }
val index = Js5MasterIndex.readUnverified(buf, MasterIndexFormat.DIGESTS)
assertEquals(decodedWhirlpool, index)
} }
} }
@ -227,8 +233,11 @@ class Js5MasterIndexTest {
fun testReadWhirlpoolInvalidSignatureLength() { fun testReadWhirlpoolInvalidSignatureLength() {
Unpooled.wrappedBuffer(encodedWhirlpool, 0, encodedWhirlpool.size - 1).use { buf -> Unpooled.wrappedBuffer(encodedWhirlpool, 0, encodedWhirlpool.size - 1).use { buf ->
assertFailsWith<IllegalArgumentException> { assertFailsWith<IllegalArgumentException> {
Js5MasterIndex.read(buf, MasterIndexFormat.DIGESTS) Js5MasterIndex.read(buf.slice(), MasterIndexFormat.DIGESTS)
} }
val index = Js5MasterIndex.readUnverified(buf, MasterIndexFormat.DIGESTS)
assertEquals(decodedWhirlpool, index)
} }
} }
@ -257,8 +266,11 @@ class Js5MasterIndexTest {
@Test @Test
fun testReadSigned() { fun testReadSigned() {
Unpooled.wrappedBuffer(encodedSigned).use { buf -> Unpooled.wrappedBuffer(encodedSigned).use { buf ->
val index = Js5MasterIndex.read(buf, MasterIndexFormat.DIGESTS, PUBLIC_KEY) val index = Js5MasterIndex.read(buf.slice(), MasterIndexFormat.DIGESTS, PUBLIC_KEY)
assertEquals(decodedWhirlpool, index) assertEquals(decodedWhirlpool, index)
val indexUnverified = Js5MasterIndex.readUnverified(buf, MasterIndexFormat.DIGESTS)
assertEquals(decodedWhirlpool, indexUnverified)
} }
} }
@ -269,8 +281,11 @@ class Js5MasterIndexTest {
buf.setByte(lastIndex, buf.getByte(lastIndex).toInt().inv()) buf.setByte(lastIndex, buf.getByte(lastIndex).toInt().inv())
assertFailsWith<IllegalArgumentException> { assertFailsWith<IllegalArgumentException> {
Js5MasterIndex.read(buf, MasterIndexFormat.DIGESTS, PUBLIC_KEY) Js5MasterIndex.read(buf.slice(), MasterIndexFormat.DIGESTS, PUBLIC_KEY)
} }
val index = Js5MasterIndex.readUnverified(buf, MasterIndexFormat.DIGESTS)
assertEquals(decodedWhirlpool, index)
} }
} }
@ -278,8 +293,11 @@ class Js5MasterIndexTest {
fun testReadSignedInvalidSignatureLength() { fun testReadSignedInvalidSignatureLength() {
Unpooled.wrappedBuffer(encodedSigned, 0, encodedSigned.size - 1).use { buf -> Unpooled.wrappedBuffer(encodedSigned, 0, encodedSigned.size - 1).use { buf ->
assertFailsWith<IllegalArgumentException> { assertFailsWith<IllegalArgumentException> {
Js5MasterIndex.read(buf, MasterIndexFormat.DIGESTS, PUBLIC_KEY) Js5MasterIndex.read(buf.slice(), MasterIndexFormat.DIGESTS, PUBLIC_KEY)
} }
val index = Js5MasterIndex.readUnverified(buf, MasterIndexFormat.DIGESTS)
assertEquals(decodedWhirlpool, index)
} }
} }

Loading…
Cancel
Save