From 4e90bd76b5a63e3dbe6447cac5ff3d0e121579b5 Mon Sep 17 00:00:00 2001
From: Graham
Date: Wed, 30 Dec 2020 12:24:54 +0000
Subject: [PATCH] Remove the caches and cache_indexes tables

These effectively duplicate the master index tables, but in a less
flexible manner: they don't support importing a master index where some
of the indexes are missing.

This commit also combines MasterIndexImporter with CacheImporter to
make it easier to re-use code.

Signed-off-by: Graham
---
 .../org/openrs2/archive/ArchiveCommand.kt     |   2 -
 .../org/openrs2/archive/cache/CacheCommand.kt |   1 +
 .../openrs2/archive/cache/CacheExporter.kt    |  23 ++-
 .../openrs2/archive/cache/CacheImporter.kt    | 159 +++++++++---------
 .../ImportMasterIndexCommand.kt}              |   8 +-
 .../archive/masterindex/MasterIndexCommand.kt |  12 --
 .../masterindex/MasterIndexImporter.kt        |  63 -------
 .../org/openrs2/archive/V1__init.sql          |  19 ---
 8 files changed, 97 insertions(+), 190 deletions(-)
 rename archive/src/main/kotlin/org/openrs2/archive/{masterindex/ImportCommand.kt => cache/ImportMasterIndexCommand.kt} (74%)
 delete mode 100644 archive/src/main/kotlin/org/openrs2/archive/masterindex/MasterIndexCommand.kt
 delete mode 100644 archive/src/main/kotlin/org/openrs2/archive/masterindex/MasterIndexImporter.kt

diff --git a/archive/src/main/kotlin/org/openrs2/archive/ArchiveCommand.kt b/archive/src/main/kotlin/org/openrs2/archive/ArchiveCommand.kt
index 980ea5bf5d..95ce488d24 100644
--- a/archive/src/main/kotlin/org/openrs2/archive/ArchiveCommand.kt
+++ b/archive/src/main/kotlin/org/openrs2/archive/ArchiveCommand.kt
@@ -4,7 +4,6 @@ import com.github.ajalt.clikt.core.NoOpCliktCommand
 import com.github.ajalt.clikt.core.subcommands
 import org.openrs2.archive.cache.CacheCommand
 import org.openrs2.archive.key.KeyCommand
-import org.openrs2.archive.masterindex.MasterIndexCommand
 import org.openrs2.archive.name.NameCommand
 
 public fun main(args: Array<String>): Unit = ArchiveCommand().main(args)
@@ -14,7 +13,6 @@ public class ArchiveCommand : NoOpCliktCommand(name = "archive") {
         subcommands(
             CacheCommand(),
             KeyCommand(),
-            MasterIndexCommand(),
             NameCommand()
         )
     }
diff --git a/archive/src/main/kotlin/org/openrs2/archive/cache/CacheCommand.kt b/archive/src/main/kotlin/org/openrs2/archive/cache/CacheCommand.kt
index 072654ac43..1f57945d09 100644
--- a/archive/src/main/kotlin/org/openrs2/archive/cache/CacheCommand.kt
+++ b/archive/src/main/kotlin/org/openrs2/archive/cache/CacheCommand.kt
@@ -7,6 +7,7 @@ public class CacheCommand : NoOpCliktCommand(name = "cache") {
     init {
         subcommands(
             ImportCommand(),
+            ImportMasterIndexCommand(),
             ExportCommand()
         )
     }
diff --git a/archive/src/main/kotlin/org/openrs2/archive/cache/CacheExporter.kt b/archive/src/main/kotlin/org/openrs2/archive/cache/CacheExporter.kt
index cb3a77aea6..a8b22ba037 100644
--- a/archive/src/main/kotlin/org/openrs2/archive/cache/CacheExporter.kt
+++ b/archive/src/main/kotlin/org/openrs2/archive/cache/CacheExporter.kt
@@ -18,17 +18,22 @@ public class CacheExporter @Inject constructor(
         database.execute { connection ->
             connection.prepareStatement(
                 """
-                SELECT 255::uint1, ci.archive_id::INTEGER, c.data, NULL
-                FROM cache_indexes ci
-                JOIN containers c ON c.id = ci.container_id
-                WHERE ci.cache_id = ?
+                SELECT 255::uint1, e.archive_id::INTEGER, c.data, NULL
+                FROM master_index_entries e
+                JOIN master_indexes m ON m.container_id = e.container_id
+                JOIN containers c ON c.crc32 = e.crc32
+                JOIN indexes i ON i.container_id = c.id AND i.version = e.version
+                WHERE m.container_id = ?
                 UNION ALL
-                SELECT ci.archive_id, ig.group_id, c.data, g.truncated_version
-                FROM cache_indexes ci
-                JOIN index_groups ig ON ig.container_id = ci.container_id
-                JOIN groups g ON g.archive_id = ci.archive_id AND g.group_id = ig.group_id AND g.truncated_version = ig.version & 65535
+                SELECT e.archive_id, ig.group_id, c.data, g.truncated_version
+                FROM master_index_entries e
+                JOIN master_indexes m ON m.container_id = e.container_id
+                JOIN containers ic ON ic.crc32 = e.crc32
+                JOIN indexes i ON i.container_id = ic.id AND i.version = e.version
+                JOIN index_groups ig ON ig.container_id = i.container_id
+                JOIN groups g ON g.archive_id = e.archive_id AND g.group_id = ig.group_id AND g.truncated_version = ig.version & 65535
                 JOIN containers c ON c.id = g.container_id AND c.crc32 = ig.crc32
-                WHERE ci.cache_id = ?
+                WHERE m.container_id = ?
                 """.trimIndent()
             ).use { stmt ->
                 stmt.fetchSize = BATCH_SIZE
diff --git a/archive/src/main/kotlin/org/openrs2/archive/cache/CacheImporter.kt b/archive/src/main/kotlin/org/openrs2/archive/cache/CacheImporter.kt
index 8fd387292a..2daed08f6e 100644
--- a/archive/src/main/kotlin/org/openrs2/archive/cache/CacheImporter.kt
+++ b/archive/src/main/kotlin/org/openrs2/archive/cache/CacheImporter.kt
@@ -2,16 +2,16 @@ package org.openrs2.archive.cache
 
 import io.netty.buffer.ByteBuf
 import io.netty.buffer.ByteBufAllocator
-import io.netty.buffer.ByteBufUtil
 import org.openrs2.archive.container.Container
 import org.openrs2.archive.container.ContainerImporter
 import org.openrs2.buffer.use
 import org.openrs2.cache.Js5Archive
 import org.openrs2.cache.Js5Compression
+import org.openrs2.cache.Js5CompressionType
 import org.openrs2.cache.Js5Index
+import org.openrs2.cache.Js5MasterIndex
 import org.openrs2.cache.Store
 import org.openrs2.cache.VersionTrailer
-import org.openrs2.crypto.Whirlpool
 import org.openrs2.db.Database
 import java.io.IOException
 import java.sql.Connection
@@ -24,8 +24,14 @@ public class CacheImporter @Inject constructor(
     private val database: Database,
     private val alloc: ByteBufAllocator
 ) {
+    private class MasterIndex(
+        val index: Js5MasterIndex,
+        data: ByteBuf
+    ) : Container(data) {
+        override val encrypted: Boolean = false
+    }
+
     private class Index(
-        val archive: Int,
         val index: Js5Index,
         data: ByteBuf
     ) : Container(data) {
@@ -44,6 +50,14 @@
         database.execute { connection ->
             ContainerImporter.prepare(connection)
 
+            // import master index
+            val masterIndex = createMasterIndex(store)
+            try {
+                addMasterIndex(connection, masterIndex)
+            } finally {
+                masterIndex.release()
+            }
+
             // import indexes
             val indexes = mutableListOf<Index>()
             try {
@@ -51,10 +65,8 @@
                 indexes += readIndex(store, archive)
             }
-            val cacheId = addCache(connection, indexes)
-
             for (index in indexes) {
-                addIndex(connection, cacheId, index)
+                addIndex(connection, index)
             }
         } finally {
             indexes.forEach(Index::release)
         }
@@ -90,6 +102,64 @@
         }
     }
 
+    public suspend fun importMasterIndex(buf: ByteBuf) {
+        Js5Compression.uncompress(buf.slice()).use { uncompressed ->
+            val masterIndex = MasterIndex(Js5MasterIndex.read(uncompressed.slice()), buf)
+
+            database.execute { connection ->
+                ContainerImporter.prepare(connection)
+                addMasterIndex(connection, masterIndex)
+            }
+        }
+    }
+
+    private fun createMasterIndex(store: Store): MasterIndex {
+        val index = Js5MasterIndex.create(store)
+
+        alloc.buffer().use { uncompressed ->
+            index.write(uncompressed)
+
+            Js5Compression.compress(uncompressed, Js5CompressionType.UNCOMPRESSED).use { buf ->
+                return MasterIndex(index, buf.retain())
+            }
+        }
+    }
+
+    // TODO(gpe): skip most of this function if we encounter a conflict?
+    private fun addMasterIndex(connection: Connection, masterIndex: MasterIndex) {
+        val containerId = ContainerImporter.addContainer(connection, masterIndex)
+
+        connection.prepareStatement(
+            """
+            INSERT INTO master_indexes (container_id)
+            VALUES (?)
+            ON CONFLICT DO NOTHING
+            """.trimIndent()
+        ).use { stmt ->
+            stmt.setLong(1, containerId)
+            stmt.execute()
+        }
+
+        connection.prepareStatement(
+            """
+            INSERT INTO master_index_entries (container_id, archive_id, crc32, version)
+            VALUES (?, ?, ?, ?)
+            ON CONFLICT DO NOTHING
+            """.trimIndent()
+        ).use { stmt ->
+            for ((i, entry) in masterIndex.index.entries.withIndex()) {
+                stmt.setLong(1, containerId)
+                stmt.setInt(2, i)
+                stmt.setInt(3, entry.checksum)
+                stmt.setInt(4, entry.version)
+
+                stmt.addBatch()
+            }
+
+            stmt.executeBatch()
+        }
+    }
+
     private fun readGroup(store: Store, archive: Int, group: Int): Group? {
         try {
             store.read(archive, group).use { buf ->
@@ -127,73 +197,13 @@
     private fun readIndex(store: Store, archive: Int): Index {
         return store.read(Js5Archive.ARCHIVESET, archive).use { buf ->
             Js5Compression.uncompress(buf.slice()).use { uncompressed ->
-                Index(archive, Js5Index.read(uncompressed), buf.retain())
-            }
-        }
-    }
-
-    private fun addCache(connection: Connection, indexes: List<Index>): Long {
-        val len = indexes.size * (1 + Whirlpool.DIGESTBYTES)
-        val whirlpool = alloc.buffer(len, len).use { buf ->
-            for (index in indexes) {
-                buf.writeByte(index.archive)
-                buf.writeBytes(index.whirlpool)
-            }
-
-            Whirlpool.whirlpool(ByteBufUtil.getBytes(buf, 0, buf.readableBytes(), false))
-        }
-
-        connection.prepareStatement(
-            """
-            SELECT id
-            FROM caches
-            WHERE whirlpool = ?
-            """.trimIndent()
-        ).use { stmt ->
-            stmt.setBytes(1, whirlpool)
-
-            stmt.executeQuery().use { rows ->
-                if (rows.next()) {
-                    return rows.getLong(1)
-                }
-            }
-        }
-
-        connection.prepareStatement(
-            """
-            INSERT INTO caches (whirlpool)
-            VALUES (?)
-            ON CONFLICT DO NOTHING
-            RETURNING id
-            """.trimIndent()
-        ).use { stmt ->
-            stmt.setBytes(1, whirlpool)
-
-            stmt.executeQuery().use { rows ->
-                if (rows.next()) {
-                    rows.getLong(1)
-                }
-            }
-        }
-
-        connection.prepareStatement(
-            """
-            SELECT id
-            FROM caches
-            WHERE whirlpool = ?
-            """.trimIndent()
-        ).use { stmt ->
-            stmt.setBytes(1, whirlpool)
-
-            stmt.executeQuery().use { rows ->
-                check(rows.next())
-                return rows.getLong(1)
+                Index(Js5Index.read(uncompressed), buf.retain())
             }
         }
     }
 
     // TODO(gpe): skip most of this function if we encounter a conflict?
-    private fun addIndex(connection: Connection, cacheId: Long, index: Index) {
+    private fun addIndex(connection: Connection, index: Index) {
         val containerId = ContainerImporter.addContainer(connection, index)
         connection.prepareStatement(
             """
@@ -208,19 +218,6 @@
             stmt.setLong(1, containerId)
             stmt.execute()
         }
-        connection.prepareStatement(
-            """
-            INSERT INTO cache_indexes (cache_id, archive_id, container_id)
-            VALUES (?, ?, ?)
-            ON CONFLICT DO NOTHING
-            """.trimIndent()
-        ).use { stmt ->
-            stmt.setLong(1, cacheId)
-            stmt.setInt(2, index.archive)
-            stmt.setLong(3, containerId)
-            stmt.execute()
-        }
-
         connection.prepareStatement(
             """
             INSERT INTO index_groups (container_id, group_id, crc32, whirlpool, version, name_hash)
diff --git a/archive/src/main/kotlin/org/openrs2/archive/masterindex/ImportCommand.kt b/archive/src/main/kotlin/org/openrs2/archive/cache/ImportMasterIndexCommand.kt
similarity index 74%
rename from archive/src/main/kotlin/org/openrs2/archive/masterindex/ImportCommand.kt
rename to archive/src/main/kotlin/org/openrs2/archive/cache/ImportMasterIndexCommand.kt
index 6d689e2846..6ec8b39343 100644
--- a/archive/src/main/kotlin/org/openrs2/archive/masterindex/ImportCommand.kt
+++ b/archive/src/main/kotlin/org/openrs2/archive/cache/ImportMasterIndexCommand.kt
@@ -1,4 +1,4 @@
-package org.openrs2.archive.masterindex
+package org.openrs2.archive.cache
 
 import com.github.ajalt.clikt.core.CliktCommand
 import com.github.ajalt.clikt.parameters.arguments.argument
@@ -10,7 +10,7 @@ import org.openrs2.archive.ArchiveModule
 import org.openrs2.buffer.use
 import java.nio.file.Files
 
-public class ImportCommand : CliktCommand(name = "import") {
+public class ImportMasterIndexCommand : CliktCommand(name = "import-master-index") {
     private val input by argument().path(
         mustExist = true,
         canBeDir = false,
@@ -19,10 +19,10 @@
 
     override fun run(): Unit = runBlocking {
         val injector = Guice.createInjector(ArchiveModule)
-        val importer = injector.getInstance(MasterIndexImporter::class.java)
+        val importer = injector.getInstance(CacheImporter::class.java)
 
         Unpooled.wrappedBuffer(Files.readAllBytes(input)).use { buf ->
-            importer.import(buf)
+            importer.importMasterIndex(buf)
         }
     }
 }
diff --git a/archive/src/main/kotlin/org/openrs2/archive/masterindex/MasterIndexCommand.kt b/archive/src/main/kotlin/org/openrs2/archive/masterindex/MasterIndexCommand.kt
deleted file mode 100644
index 32fb569a02..0000000000
--- a/archive/src/main/kotlin/org/openrs2/archive/masterindex/MasterIndexCommand.kt
+++ /dev/null
@@ -1,12 +0,0 @@
-package org.openrs2.archive.masterindex
-
-import com.github.ajalt.clikt.core.NoOpCliktCommand
-import com.github.ajalt.clikt.core.subcommands
-
-public class MasterIndexCommand : NoOpCliktCommand(name = "master-index") {
-    init {
-        subcommands(
-            ImportCommand()
-        )
-    }
-}
diff --git a/archive/src/main/kotlin/org/openrs2/archive/masterindex/MasterIndexImporter.kt b/archive/src/main/kotlin/org/openrs2/archive/masterindex/MasterIndexImporter.kt
deleted file mode 100644
index cac49e22ba..0000000000
--- a/archive/src/main/kotlin/org/openrs2/archive/masterindex/MasterIndexImporter.kt
+++ /dev/null
@@ -1,63 +0,0 @@
-package org.openrs2.archive.masterindex
-
-import io.netty.buffer.ByteBuf
-import org.openrs2.archive.container.Container
-import org.openrs2.archive.container.ContainerImporter
-import org.openrs2.buffer.use
-import org.openrs2.cache.Js5Compression
-import org.openrs2.cache.Js5MasterIndex
-import org.openrs2.db.Database
-import javax.inject.Inject
-import javax.inject.Singleton
-
-@Singleton
-public class MasterIndexImporter @Inject constructor(
-    private val database: Database
-) {
-    private class MasterIndex(
-        data: ByteBuf,
-        val index: Js5MasterIndex
-    ) : Container(data) {
-        override val encrypted: Boolean = false
-    }
-
-    public suspend fun import(buf: ByteBuf) {
-        database.execute { connection ->
-            ContainerImporter.prepare(connection)
-
-            val masterIndex = Js5Compression.uncompress(buf).use { uncompressed ->
-                MasterIndex(buf, Js5MasterIndex.read(uncompressed))
-            }
-
-            val containerId = ContainerImporter.addContainer(connection, masterIndex)
-
-            connection.prepareStatement(
-                """
-                INSERT INTO master_indexes (container_id)
-                VALUES (?)
-                """.trimIndent()
-            ).use { stmt ->
-                stmt.setLong(1, containerId)
-                stmt.execute()
-            }
-
-            connection.prepareStatement(
-                """
-                INSERT INTO master_index_entries (container_id, archive_id, crc32, version)
-                VALUES (?, ?, ?, ?)
-                """.trimIndent()
-            ).use { stmt ->
-                for ((i, entry) in masterIndex.index.entries.withIndex()) {
-                    stmt.setLong(1, containerId)
-                    stmt.setInt(2, i)
-                    stmt.setInt(3, entry.checksum)
-                    stmt.setInt(4, entry.version)
-
-                    stmt.addBatch()
-                }
-
-                stmt.executeBatch()
-            }
-        }
-    }
-}
diff --git a/archive/src/main/resources/org/openrs2/archive/V1__init.sql b/archive/src/main/resources/org/openrs2/archive/V1__init.sql
index 7b874e355b..74a99b8c83 100644
--- a/archive/src/main/resources/org/openrs2/archive/V1__init.sql
+++ b/archive/src/main/resources/org/openrs2/archive/V1__init.sql
@@ -79,25 +79,6 @@ CREATE TABLE master_index_entries (
     PRIMARY KEY (container_id, archive_id)
 );
 
-CREATE TABLE caches (
-    id BIGSERIAL PRIMARY KEY NOT NULL,
-    -- This doesn't correspond to a hash used by the client - it was just
-    -- convenient to re-use Whirlpool given we already use it elsewhere in the
-    -- codebase.
-    --
-    -- It is a hash over the whirlpool hashes of a cache's Js5Indexes, used to
-    -- make it easier to identify an individual cache row in a relational
-    -- database.
-    whirlpool BYTEA UNIQUE NOT NULL
-);
-
-CREATE TABLE cache_indexes (
-    cache_id BIGINT NOT NULL REFERENCES caches (id),
-    archive_id uint1 NOT NULL,
-    container_id BIGINT NOT NULL REFERENCES indexes (container_id),
-    PRIMARY KEY (cache_id, archive_id)
-);
-
 CREATE TABLE names (
     hash INTEGER NOT NULL,
     name TEXT PRIMARY KEY NOT NULL
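
Note: with the caches and cache_indexes tables gone, a cache is
identified by the container_id of its master index, and its per-archive
checksums and versions live in master_index_entries. The query below is
an illustrative sketch of the lookup the new CacheExporter query builds
on, not code taken from the patch; it assumes the master_indexes,
master_index_entries, containers and indexes tables from V1__init.sql,
with ? bound to a master index's container_id:

    -- Resolve each archive listed in a master index to the container
    -- holding the matching Js5Index. An archive whose index was never
    -- imported simply produces no row, so a partial master index can
    -- still be stored and queried. The old cache_indexes table, by
    -- contrast, required a container row for every archive.
    SELECT e.archive_id, c.id AS index_container_id
    FROM master_index_entries e
    JOIN master_indexes m ON m.container_id = e.container_id
    JOIN containers c ON c.crc32 = e.crc32
    JOIN indexes i ON i.container_id = c.id AND i.version = e.version
    WHERE m.container_id = ?
    ORDER BY e.archive_id;

Replacing the joins on containers and indexes with LEFT JOINs would list
the missing archives explicitly, as rows with a NULL index_container_id.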