Download all groups that changed between the previous and current cache

This makes us behave like a standard client that only keeps a single
copy of each group in its cache. This ensures we can at least detect
(crc32, version) collisions for a particular group, rather than silently
skipping colliding cached groups.

The disadvantage is that more bandwidth is used, especially if the
download is interrupted and has to be restarted.

Signed-off-by: Graham <gpe@openrs2.org>
Branch: pull/132/head
Author: Graham, 3 years ago
Parent: 48ce47ad7b
Commit: 06ff7a9d3b
Changed files:
  1. archive/src/main/kotlin/org/openrs2/archive/cache/CacheDownloader.kt (11 lines changed)
  2. archive/src/main/kotlin/org/openrs2/archive/cache/CacheImporter.kt (65 lines changed)
  3. archive/src/main/kotlin/org/openrs2/archive/cache/Js5ChannelHandler.kt (15 lines changed)
  4. archive/src/main/kotlin/org/openrs2/archive/game/Game.kt (3 lines changed)
  5. archive/src/main/kotlin/org/openrs2/archive/game/GameDatabase.kt (9 lines changed)
  6. archive/src/main/resources/org/openrs2/archive/V1__init.sql (2 lines changed)
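
Before the diffs themselves, a minimal sketch of the reuse rule they implement. The types and function below are hypothetical illustrations, not part of this commit: a cached copy is only reused when its (crc32, version) pair is unchanged between the previously downloaded and the current master index, and (for groups) the stored version is not truncated; everything else is re-downloaded, so a collision would at least be noticed.

// Sketch only - hypothetical names, not the OpenRS2 API.
data class CachedGroup(val crc32: Int, val version: Int, val versionTruncated: Boolean)

// A cached group is reused only when the copy recorded by the previously
// downloaded master index has the same (crc32, version) as the current one
// and its version is not truncated; otherwise the group is re-downloaded.
fun mayReuse(previous: CachedGroup?, currentCrc32: Int, currentVersion: Int): Boolean {
    return previous != null &&
        !previous.versionTruncated &&
        previous.crc32 == currentCrc32 &&
        previous.version == currentVersion
}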

archive/src/main/kotlin/org/openrs2/archive/cache/CacheDownloader.kt

@@ -24,7 +24,16 @@ public class CacheDownloader @Inject constructor(
         try {
             suspendCoroutine<Unit> { continuation ->
                 val bootstrap = bootstrapFactory.createBootstrap(group)
-                val handler = Js5ChannelHandler(bootstrap, game.id, hostname, port, build, continuation, importer)
+                val handler = Js5ChannelHandler(
+                    bootstrap,
+                    game.id,
+                    hostname,
+                    port,
+                    build,
+                    game.previousMasterIndexId,
+                    continuation,
+                    importer
+                )
                 bootstrap.handler(Js5ChannelInitializer(handler))
                     .connect(hostname, port)

archive/src/main/kotlin/org/openrs2/archive/cache/CacheImporter.kt

@@ -169,9 +169,10 @@ public class CacheImporter @Inject constructor(
         uncompressed: ByteBuf,
         gameId: Int,
         build: Int,
+        previousId: Int?,
         timestamp: Instant,
         name: String,
-    ): List<ByteBuf?> {
+    ): Pair<Int, List<ByteBuf?>> {
         return database.execute { connection ->
             prepare(connection)
@@ -199,17 +200,28 @@ public class CacheImporter @Inject constructor(
                 overwrite = true
             )

+            /*
+             * In order to defend against (crc32, version) collisions, we only
+             * use a cached index if its checksum/version haven't changed
+             * between the previously downloaded version of the cache and the
+             * current version. This emulates the behaviour of a client always
+             * using the latest version of the cache - so if there is a
+             * collision, real players of the game would experience problems.
+             */
             connection.prepareStatement(
                 """
                 SELECT c.data
                 FROM master_index_archives a
-                LEFT JOIN containers c ON c.crc32 = a.crc32
-                LEFT JOIN indexes i ON i.version = a.version AND i.container_id = c.id
+                LEFT JOIN master_index_archives a2 ON a2.master_index_id = ? AND a2.archive_id = a.archive_id AND
+                    a2.crc32 = a.crc32 AND a2.version = a.version
+                LEFT JOIN containers c ON c.crc32 = a2.crc32
+                LEFT JOIN indexes i ON i.version = a2.version AND i.container_id = c.id
                 WHERE a.master_index_id = ?
                 ORDER BY a.archive_id ASC
                 """.trimIndent()
             ).use { stmt ->
-                stmt.setInt(1, id)
+                stmt.setObject(1, previousId, Types.INTEGER)
+                stmt.setInt(2, id)

                 stmt.executeQuery().use { rows ->
                     val indexes = mutableListOf<ByteBuf?>()
@@ -224,7 +236,7 @@ public class CacheImporter @Inject constructor(
                     }

                     indexes.filterNotNull().forEach(ByteBuf::retain)
-                    return@execute indexes
+                    return@execute Pair(id, indexes)
                 } finally {
                     indexes.filterNotNull().forEach(ByteBuf::release)
                 }
@@ -237,7 +249,8 @@ public class CacheImporter @Inject constructor(
         archive: Int,
         index: Js5Index,
         buf: ByteBuf,
-        uncompressed: ByteBuf
+        uncompressed: ByteBuf,
+        previousMasterIndexId: Int?
     ): List<Int> {
         return database.execute { connection ->
             prepare(connection)
@@ -273,22 +286,33 @@ public class CacheImporter @Inject constructor(
             }

             /*
-             * We deliberately ignore groups with truncated versions here and
-             * re-download them, just in case there's a (crc32, truncated version)
-             * collision.
+             * In order to defend against (crc32, version) collisions, we only
+             * use a cached group if its checksum/version haven't changed
+             * between the previously downloaded version of the cache and the
+             * current version. This emulates the behaviour of a client always
+             * using the latest version of the cache - so if there is a
+             * collision, real players of the game would experience problems.
+             *
+             * We never use cached groups with a truncated version, as these
+             * are even more likely to be prone to collisions.
              */
             connection.prepareStatement(
                 """
                 SELECT t.group_id
                 FROM tmp_groups t
-                LEFT JOIN groups g ON g.archive_id = ? AND g.group_id = t.group_id AND g.version = t.version AND
-                    NOT g.version_truncated
-                LEFT JOIN containers c ON c.id = g.container_id AND c.crc32 = t.crc32
+                LEFT JOIN master_index_valid_indexes i ON i.master_index_id = ? AND
+                    i.archive_id = ?
+                LEFT JOIN index_groups ig ON ig.container_id = i.container_id AND ig.group_id = t.group_id AND
+                    ig.crc32 = t.crc32 AND ig.version = t.version
+                LEFT JOIN groups g ON g.archive_id = i.archive_id AND g.group_id = ig.group_id AND
+                    g.version = ig.version AND NOT g.version_truncated
+                LEFT JOIN containers c ON c.id = g.container_id AND c.crc32 = ig.crc32
                 WHERE g.container_id IS NULL
                 ORDER BY t.group_id ASC
                 """.trimIndent()
             ).use { stmt ->
-                stmt.setInt(1, archive)
+                stmt.setObject(1, previousMasterIndexId, Types.INTEGER)
+                stmt.setInt(2, archive)

                 stmt.executeQuery().use { rows ->
                     val groups = mutableListOf<Int>()
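
The join chain above can be read as an in-memory filter. A sketch under hypothetical types (these are not the OpenRS2 schema): a requested group counts as missing unless the previous master index's copy of the archive index records the same (group_id, crc32, version) and a matching non-truncated group is already stored.

// Hypothetical types for illustration; the real logic lives in the SQL above.
data class GroupKey(val groupId: Int, val crc32: Int, val version: Int)

// Mirrors WHERE g.container_id IS NULL after the LEFT JOIN chain: a group is
// re-downloaded unless the previous index and the stored groups both contain
// a matching (group_id, crc32, version) entry.
fun missingGroups(
    wanted: List<GroupKey>,
    previousIndex: Set<GroupKey>,
    storedGroups: Set<GroupKey>
): List<Int> {
    return wanted
        .filterNot { it in previousIndex && it in storedGroups }
        .map(GroupKey::groupId)
        .sorted()
}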
@@ -770,6 +794,21 @@ public class CacheImporter @Inject constructor(
         }
     }

+    public suspend fun setMasterIndexId(gameId: Int, masterIndexId: Int) {
+        database.execute { connection ->
+            connection.prepareStatement(
+                """
+                UPDATE games SET master_index_id = ? WHERE id = ?
+                """.trimIndent()
+            ).use { stmt ->
+                stmt.setInt(1, masterIndexId)
+                stmt.setInt(2, gameId)
+
+                stmt.execute()
+            }
+        }
+    }
+
     public companion object {
         public const val BATCH_SIZE: Int = 1024
     }
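
Taken together with the Js5ChannelHandler changes below, the new games.master_index_id column and setMasterIndexId close a loop: each run reads the previous master index id, uses it to decide what may be reused, and records the new id for the next run. A purely conceptual sketch of that state, with hypothetical names standing in for the database column:

// Conceptual sketch only; these names are hypothetical, not the OpenRS2 API.
class MasterIndexTracker {
    private val lastIdByGame = mutableMapOf<Int, Int>()

    // Analogue of reading games.master_index_id before a download starts.
    fun previousId(gameId: Int): Int? = lastIdByGame[gameId]

    // Analogue of setMasterIndexId(): remember the master index just archived
    // so the next run can compare against it.
    fun setId(gameId: Int, masterIndexId: Int) {
        lastIdByGame[gameId] = masterIndexId
    }
}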

archive/src/main/kotlin/org/openrs2/archive/cache/Js5ChannelHandler.kt

@@ -35,6 +35,7 @@ public class Js5ChannelHandler(
     private val hostname: String,
     private val port: Int,
     private var build: Int,
+    private val previousMasterIndexId: Int?,
     private val continuation: Continuation<Unit>,
     private val importer: CacheImporter,
     private val masterIndexFormat: MasterIndexFormat = MasterIndexFormat.VERSIONED,
@@ -44,6 +45,7 @@ public class Js5ChannelHandler(
     private val maxBuild = build + maxBuildAttempts
     private val inFlightRequests = mutableSetOf<Js5Request.Group>()
     private val pendingRequests = ArrayDeque<Js5Request.Group>()
+    private var currentMasterIndexId: Int = 0
     private var masterIndex: Js5MasterIndex? = null
     private lateinit var indexes: Array<Js5Index?>
     private val groups = mutableListOf<CacheImporter.Group>()
@@ -155,6 +157,10 @@ public class Js5ChannelHandler(
         }

         if (complete) {
+            runBlocking {
+                importer.setMasterIndexId(gameId, currentMasterIndexId)
+            }
+
             ctx.close()
             continuation.resume(Unit)
         }
@@ -164,14 +170,15 @@ public class Js5ChannelHandler(
         Js5Compression.uncompress(buf.slice()).use { uncompressed ->
             masterIndex = Js5MasterIndex.read(uncompressed.slice(), masterIndexFormat)

-            val rawIndexes = runBlocking {
-                val name = "Downloaded from $hostname:$port"
+            val name = "Downloaded from $hostname:$port"
+            val (id, rawIndexes) = runBlocking {
                 importer.importMasterIndexAndGetIndexes(
                     masterIndex!!,
                     buf,
                     uncompressed,
                     gameId,
                     build,
+                    previousMasterIndexId,
                     timestamp = Instant.now(),
                     name
                 )
@@ -189,6 +196,8 @@ public class Js5ChannelHandler(
             } finally {
                 rawIndexes.filterNotNull().forEach(ByteBuf::release)
             }
+
+            currentMasterIndexId = id
         }
     }
@@ -207,7 +216,7 @@ public class Js5ChannelHandler(
         }

         val groups = runBlocking {
-            importer.importIndexAndGetMissingGroups(archive, index, buf, uncompressed)
+            importer.importIndexAndGetMissingGroups(archive, index, buf, uncompressed, previousMasterIndexId)
         }
         for (group in groups) {
             request(archive, group)

archive/src/main/kotlin/org/openrs2/archive/game/Game.kt

@@ -4,5 +4,6 @@ public data class Game(
     public val id: Int,
     public val hostname: String?,
     public val port: Int?,
-    public val build: Int?
+    public val build: Int?,
+    public val previousMasterIndexId: Int?
 )

archive/src/main/kotlin/org/openrs2/archive/game/GameDatabase.kt

@@ -12,7 +12,7 @@ public class GameDatabase @Inject constructor(
         return database.execute { connection ->
             connection.prepareStatement(
                 """
-                SELECT id, hostname, port, build
+                SELECT id, hostname, port, build, master_index_id
                 FROM games
                 WHERE name = ?
                 """.trimIndent()
@@ -37,7 +37,12 @@ public class GameDatabase @Inject constructor(
                         build = null
                     }

-                    return@execute Game(id, hostname, port, build)
+                    var masterIndexId: Int? = rows.getInt(5)
+                    if (rows.wasNull()) {
+                        masterIndexId = null
+                    }
+
+                    return@execute Game(id, hostname, port, build, masterIndexId)
                 }
             }
         }
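
The getInt/wasNull pair above is the standard JDBC idiom for nullable integer columns: getInt() returns 0 for SQL NULL, so wasNull() must be consulted afterwards to recover a Kotlin Int?. A small extension sketch (hypothetical helper, not part of this commit) capturing the same pattern:

import java.sql.ResultSet

// Hypothetical helper for illustration: reads an INT column that may be NULL.
// getInt() maps NULL to 0, so wasNull() is what distinguishes the two cases.
fun ResultSet.getNullableInt(columnIndex: Int): Int? {
    val value = getInt(columnIndex)
    return if (wasNull()) null else value
}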

archive/src/main/resources/org/openrs2/archive/V1__init.sql

@@ -108,6 +108,8 @@ CREATE TABLE master_indexes (
     UNIQUE (container_id, format)
 );

+ALTER TABLE games ADD COLUMN master_index_id INT NULL REFERENCES master_indexes (id);
+
 CREATE TABLE master_index_archives (
     master_index_id INTEGER NOT NULL REFERENCES master_indexes (id),
     archive_id uint1 NOT NULL,
