From 06ff7a9d3b36bf2d426375206ec55ef72d1033ce Mon Sep 17 00:00:00 2001 From: Graham Date: Thu, 11 Feb 2021 23:24:19 +0000 Subject: [PATCH] Download all groups that changed between the previous and current cache This makes us behave like a standard client that only keeps a single copy of each group in its cache. This ensures we can at least detect (crc32, version) collisions for a particular group, rather than silently skipping colliding cached groups. A disadvantage is that more bandwidth usage is required, especially if the download is interrupted. Signed-off-by: Graham --- .../openrs2/archive/cache/CacheDownloader.kt | 11 +++- .../openrs2/archive/cache/CacheImporter.kt | 65 +++++++++++++++---- .../archive/cache/Js5ChannelHandler.kt | 15 ++++- .../kotlin/org/openrs2/archive/game/Game.kt | 3 +- .../org/openrs2/archive/game/GameDatabase.kt | 9 ++- .../org/openrs2/archive/V1__init.sql | 2 + 6 files changed, 85 insertions(+), 20 deletions(-) diff --git a/archive/src/main/kotlin/org/openrs2/archive/cache/CacheDownloader.kt b/archive/src/main/kotlin/org/openrs2/archive/cache/CacheDownloader.kt index 1b16c21f..1fb83808 100644 --- a/archive/src/main/kotlin/org/openrs2/archive/cache/CacheDownloader.kt +++ b/archive/src/main/kotlin/org/openrs2/archive/cache/CacheDownloader.kt @@ -24,7 +24,16 @@ public class CacheDownloader @Inject constructor( try { suspendCoroutine { continuation -> val bootstrap = bootstrapFactory.createBootstrap(group) - val handler = Js5ChannelHandler(bootstrap, game.id, hostname, port, build, continuation, importer) + val handler = Js5ChannelHandler( + bootstrap, + game.id, + hostname, + port, + build, + game.previousMasterIndexId, + continuation, + importer + ) bootstrap.handler(Js5ChannelInitializer(handler)) .connect(hostname, port) diff --git a/archive/src/main/kotlin/org/openrs2/archive/cache/CacheImporter.kt b/archive/src/main/kotlin/org/openrs2/archive/cache/CacheImporter.kt index 0734bf2a..a379bedc 100644 --- a/archive/src/main/kotlin/org/openrs2/archive/cache/CacheImporter.kt +++ b/archive/src/main/kotlin/org/openrs2/archive/cache/CacheImporter.kt @@ -169,9 +169,10 @@ public class CacheImporter @Inject constructor( uncompressed: ByteBuf, gameId: Int, build: Int, + previousId: Int?, timestamp: Instant, name: String, - ): List { + ): Pair> { return database.execute { connection -> prepare(connection) @@ -199,17 +200,28 @@ public class CacheImporter @Inject constructor( overwrite = true ) + /* + * In order to defend against (crc32, version) collisions, we only + * use a cached index if its checksum/version haven't changed + * between the previously downloaded version of the cache and the + * current version. This emulates the behaviour of a client always + * using the latest version of the cache - so if there is a + * collision, real players of the game would experience problems. + */ connection.prepareStatement( """ SELECT c.data FROM master_index_archives a - LEFT JOIN containers c ON c.crc32 = a.crc32 - LEFT JOIN indexes i ON i.version = a.version AND i.container_id = c.id + LEFT JOIN master_index_archives a2 ON a2.master_index_id = ? AND a2.archive_id = a.archive_id AND + a2.crc32 = a.crc32 AND a2.version = a.version + LEFT JOIN containers c ON c.crc32 = a2.crc32 + LEFT JOIN indexes i ON i.version = a2.version AND i.container_id = c.id WHERE a.master_index_id = ? ORDER BY a.archive_id ASC """.trimIndent() ).use { stmt -> - stmt.setInt(1, id) + stmt.setObject(1, previousId, Types.INTEGER) + stmt.setInt(2, id) stmt.executeQuery().use { rows -> val indexes = mutableListOf() @@ -224,7 +236,7 @@ public class CacheImporter @Inject constructor( } indexes.filterNotNull().forEach(ByteBuf::retain) - return@execute indexes + return@execute Pair(id, indexes) } finally { indexes.filterNotNull().forEach(ByteBuf::release) } @@ -237,7 +249,8 @@ public class CacheImporter @Inject constructor( archive: Int, index: Js5Index, buf: ByteBuf, - uncompressed: ByteBuf + uncompressed: ByteBuf, + previousMasterIndexId: Int? ): List { return database.execute { connection -> prepare(connection) @@ -273,22 +286,33 @@ public class CacheImporter @Inject constructor( } /* - * We deliberately ignore groups with truncated versions here and - * re-download them, just in case there's a (crc32, truncated version) - * collision. + * In order to defend against (crc32, version) collisions, we only + * use a cached group if its checksum/version haven't changed + * between the previously downloaded version of the cache and the + * current version. This emulates the behaviour of a client always + * using the latest version of the cache - so if there is a + * collision, real players of the game would experience problems. + * + * We never use cached groups with a truncated version, as these + * are even more likely to be prone to collisions. */ connection.prepareStatement( """ SELECT t.group_id FROM tmp_groups t - LEFT JOIN groups g ON g.archive_id = ? AND g.group_id = t.group_id AND g.version = t.version AND - NOT g.version_truncated - LEFT JOIN containers c ON c.id = g.container_id AND c.crc32 = t.crc32 + LEFT JOIN master_index_valid_indexes i ON i.master_index_id = ? AND + i.archive_id = ? + LEFT JOIN index_groups ig ON ig.container_id = i.container_id AND ig.group_id = t.group_id AND + ig.crc32 = t.crc32 AND ig.version = t.version + LEFT JOIN groups g ON g.archive_id = i.archive_id AND g.group_id = ig.group_id AND + g.version = ig.version AND NOT g.version_truncated + LEFT JOIN containers c ON c.id = g.container_id AND c.crc32 = ig.crc32 WHERE g.container_id IS NULL ORDER BY t.group_id ASC """.trimIndent() ).use { stmt -> - stmt.setInt(1, archive) + stmt.setObject(1, previousMasterIndexId, Types.INTEGER) + stmt.setInt(2, archive) stmt.executeQuery().use { rows -> val groups = mutableListOf() @@ -770,6 +794,21 @@ public class CacheImporter @Inject constructor( } } + public suspend fun setMasterIndexId(gameId: Int, masterIndexId: Int) { + database.execute { connection -> + connection.prepareStatement( + """ + UPDATE games SET master_index_id = ? WHERE id = ? + """.trimIndent() + ).use { stmt -> + stmt.setInt(1, masterIndexId) + stmt.setInt(2, gameId) + + stmt.execute() + } + } + } + public companion object { public const val BATCH_SIZE: Int = 1024 } diff --git a/archive/src/main/kotlin/org/openrs2/archive/cache/Js5ChannelHandler.kt b/archive/src/main/kotlin/org/openrs2/archive/cache/Js5ChannelHandler.kt index d0b547d9..fcbe5ded 100644 --- a/archive/src/main/kotlin/org/openrs2/archive/cache/Js5ChannelHandler.kt +++ b/archive/src/main/kotlin/org/openrs2/archive/cache/Js5ChannelHandler.kt @@ -35,6 +35,7 @@ public class Js5ChannelHandler( private val hostname: String, private val port: Int, private var build: Int, + private val previousMasterIndexId: Int?, private val continuation: Continuation, private val importer: CacheImporter, private val masterIndexFormat: MasterIndexFormat = MasterIndexFormat.VERSIONED, @@ -44,6 +45,7 @@ public class Js5ChannelHandler( private val maxBuild = build + maxBuildAttempts private val inFlightRequests = mutableSetOf() private val pendingRequests = ArrayDeque() + private var currentMasterIndexId: Int = 0 private var masterIndex: Js5MasterIndex? = null private lateinit var indexes: Array private val groups = mutableListOf() @@ -155,6 +157,10 @@ public class Js5ChannelHandler( } if (complete) { + runBlocking { + importer.setMasterIndexId(gameId, currentMasterIndexId) + } + ctx.close() continuation.resume(Unit) } @@ -164,14 +170,15 @@ public class Js5ChannelHandler( Js5Compression.uncompress(buf.slice()).use { uncompressed -> masterIndex = Js5MasterIndex.read(uncompressed.slice(), masterIndexFormat) - val rawIndexes = runBlocking { - val name = "Downloaded from $hostname:$port" + val name = "Downloaded from $hostname:$port" + val (id, rawIndexes) = runBlocking { importer.importMasterIndexAndGetIndexes( masterIndex!!, buf, uncompressed, gameId, build, + previousMasterIndexId, timestamp = Instant.now(), name ) @@ -189,6 +196,8 @@ public class Js5ChannelHandler( } finally { rawIndexes.filterNotNull().forEach(ByteBuf::release) } + + currentMasterIndexId = id } } @@ -207,7 +216,7 @@ public class Js5ChannelHandler( } val groups = runBlocking { - importer.importIndexAndGetMissingGroups(archive, index, buf, uncompressed) + importer.importIndexAndGetMissingGroups(archive, index, buf, uncompressed, previousMasterIndexId) } for (group in groups) { request(archive, group) diff --git a/archive/src/main/kotlin/org/openrs2/archive/game/Game.kt b/archive/src/main/kotlin/org/openrs2/archive/game/Game.kt index f4b71a8c..16e3177d 100644 --- a/archive/src/main/kotlin/org/openrs2/archive/game/Game.kt +++ b/archive/src/main/kotlin/org/openrs2/archive/game/Game.kt @@ -4,5 +4,6 @@ public data class Game( public val id: Int, public val hostname: String?, public val port: Int?, - public val build: Int? + public val build: Int?, + public val previousMasterIndexId: Int? ) diff --git a/archive/src/main/kotlin/org/openrs2/archive/game/GameDatabase.kt b/archive/src/main/kotlin/org/openrs2/archive/game/GameDatabase.kt index ef6ac886..9380febd 100644 --- a/archive/src/main/kotlin/org/openrs2/archive/game/GameDatabase.kt +++ b/archive/src/main/kotlin/org/openrs2/archive/game/GameDatabase.kt @@ -12,7 +12,7 @@ public class GameDatabase @Inject constructor( return database.execute { connection -> connection.prepareStatement( """ - SELECT id, hostname, port, build + SELECT id, hostname, port, build, master_index_id FROM games WHERE name = ? """.trimIndent() @@ -37,7 +37,12 @@ public class GameDatabase @Inject constructor( build = null } - return@execute Game(id, hostname, port, build) + var masterIndexId: Int? = rows.getInt(5) + if (rows.wasNull()) { + masterIndexId = null + } + + return@execute Game(id, hostname, port, build, masterIndexId) } } } diff --git a/archive/src/main/resources/org/openrs2/archive/V1__init.sql b/archive/src/main/resources/org/openrs2/archive/V1__init.sql index 72d4df80..de7a3010 100644 --- a/archive/src/main/resources/org/openrs2/archive/V1__init.sql +++ b/archive/src/main/resources/org/openrs2/archive/V1__init.sql @@ -108,6 +108,8 @@ CREATE TABLE master_indexes ( UNIQUE (container_id, format) ); +ALTER TABLE games ADD COLUMN master_index_id INT NULL REFERENCES master_indexes (id); + CREATE TABLE master_index_archives ( master_index_id INTEGER NOT NULL REFERENCES master_indexes (id), archive_id uint1 NOT NULL,