From fff63285fe3ecbd7daa85c313a0b4ad5cc3d5369 Mon Sep 17 00:00:00 2001 From: Graham Date: Thu, 31 Dec 2020 11:36:54 +0000 Subject: [PATCH] Re-combine CacheImporter and ContainerImporter Signed-off-by: Graham --- .../openrs2/archive/cache/CacheImporter.kt | 114 ++++++++++++++++-- .../openrs2/archive/container/Container.kt | 16 --- .../archive/container/ContainerImporter.kt | 94 --------------- 3 files changed, 107 insertions(+), 117 deletions(-) delete mode 100644 archive/src/main/kotlin/org/openrs2/archive/container/Container.kt delete mode 100644 archive/src/main/kotlin/org/openrs2/archive/container/ContainerImporter.kt diff --git a/archive/src/main/kotlin/org/openrs2/archive/cache/CacheImporter.kt b/archive/src/main/kotlin/org/openrs2/archive/cache/CacheImporter.kt index 2daed08f6e..b4034f4161 100644 --- a/archive/src/main/kotlin/org/openrs2/archive/cache/CacheImporter.kt +++ b/archive/src/main/kotlin/org/openrs2/archive/cache/CacheImporter.kt @@ -2,8 +2,9 @@ package org.openrs2.archive.cache import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBufAllocator -import org.openrs2.archive.container.Container -import org.openrs2.archive.container.ContainerImporter +import io.netty.buffer.ByteBufUtil +import io.netty.buffer.DefaultByteBufHolder +import org.openrs2.buffer.crc32 import org.openrs2.buffer.use import org.openrs2.cache.Js5Archive import org.openrs2.cache.Js5Compression @@ -12,6 +13,7 @@ import org.openrs2.cache.Js5Index import org.openrs2.cache.Js5MasterIndex import org.openrs2.cache.Store import org.openrs2.cache.VersionTrailer +import org.openrs2.crypto.Whirlpool import org.openrs2.db.Database import java.io.IOException import java.sql.Connection @@ -24,6 +26,15 @@ public class CacheImporter @Inject constructor( private val database: Database, private val alloc: ByteBufAllocator ) { + private abstract class Container( + data: ByteBuf + ) : DefaultByteBufHolder(data) { + val bytes: ByteArray = ByteBufUtil.getBytes(data, data.readerIndex(), data.readableBytes(), false) + val crc32: Int = data.crc32() + val whirlpool: ByteArray = Whirlpool.whirlpool(bytes) + abstract val encrypted: Boolean + } + private class MasterIndex( val index: Js5MasterIndex, data: ByteBuf @@ -48,7 +59,7 @@ public class CacheImporter @Inject constructor( public suspend fun import(store: Store) { database.execute { connection -> - ContainerImporter.prepare(connection) + prepare(connection) // import master index val masterIndex = createMasterIndex(store) @@ -107,7 +118,7 @@ public class CacheImporter @Inject constructor( val masterIndex = MasterIndex(Js5MasterIndex.read(uncompressed.slice()), buf) database.execute { connection -> - ContainerImporter.prepare(connection) + prepare(connection) addMasterIndex(connection, masterIndex) } } @@ -127,7 +138,7 @@ public class CacheImporter @Inject constructor( // TODO(gpe): skip most of this function if we encounter a conflict? private fun addMasterIndex(connection: Connection, masterIndex: MasterIndex) { - val containerId = ContainerImporter.addContainer(connection, masterIndex) + val containerId = addContainer(connection, masterIndex) connection.prepareStatement( """ @@ -173,7 +184,7 @@ public class CacheImporter @Inject constructor( } private fun addGroups(connection: Connection, groups: List) { - val containerIds = ContainerImporter.addContainers(connection, groups) + val containerIds = addContainers(connection, groups) connection.prepareStatement( """ @@ -204,7 +215,7 @@ public class CacheImporter @Inject constructor( // TODO(gpe): skip most of this function if we encounter a conflict? private fun addIndex(connection: Connection, index: Index) { - val containerId = ContainerImporter.addContainer(connection, index) + val containerId = addContainer(connection, index) connection.prepareStatement( """ @@ -271,6 +282,95 @@ public class CacheImporter @Inject constructor( } } + private fun prepare(connection: Connection) { + connection.prepareStatement( + """ + LOCK TABLE containers IN EXCLUSIVE MODE + """.trimIndent() + ).use { stmt -> + stmt.execute() + } + + connection.prepareStatement( + """ + CREATE TEMPORARY TABLE tmp_containers ( + index INTEGER NOT NULL, + crc32 INTEGER NOT NULL, + whirlpool BYTEA NOT NULL, + data BYTEA NOT NULL, + encrypted BOOLEAN NOT NULL + ) ON COMMIT DROP + """.trimIndent() + ).use { stmt -> + stmt.execute() + } + } + + private fun addContainer(connection: Connection, container: Container): Long { + return addContainers(connection, listOf(container)).single() + } + + private fun addContainers(connection: Connection, containers: List): List { + connection.prepareStatement( + """ + TRUNCATE TABLE tmp_containers + """.trimIndent() + ).use { stmt -> + stmt.execute() + } + + connection.prepareStatement( + """ + INSERT INTO tmp_containers (index, crc32, whirlpool, data, encrypted) + VALUES (?, ?, ?, ?, ?) + """.trimIndent() + ).use { stmt -> + for ((i, container) in containers.withIndex()) { + stmt.setInt(1, i) + stmt.setInt(2, container.crc32) + stmt.setBytes(3, container.whirlpool) + stmt.setBytes(4, container.bytes) + stmt.setBoolean(5, container.encrypted) + stmt.addBatch() + } + + stmt.executeBatch() + } + + connection.prepareStatement( + """ + INSERT INTO containers (crc32, whirlpool, data, encrypted) + SELECT t.crc32, t.whirlpool, t.data, t.encrypted + FROM tmp_containers t + LEFT JOIN containers c ON c.whirlpool = t.whirlpool + WHERE c.whirlpool IS NULL + ON CONFLICT DO NOTHING + """.trimIndent() + ).use { stmt -> + stmt.execute() + } + + val ids = mutableListOf() + + connection.prepareStatement( + """ + SELECT c.id + FROM tmp_containers t + JOIN containers c ON c.whirlpool = t.whirlpool + ORDER BY t.index ASC + """.trimIndent() + ).use { stmt -> + stmt.executeQuery().use { rows -> + while (rows.next()) { + ids += rows.getLong(1) + } + } + } + + check(ids.size == containers.size) + return ids + } + private companion object { private const val BATCH_SIZE = 1024 } diff --git a/archive/src/main/kotlin/org/openrs2/archive/container/Container.kt b/archive/src/main/kotlin/org/openrs2/archive/container/Container.kt deleted file mode 100644 index fdcfca41e4..0000000000 --- a/archive/src/main/kotlin/org/openrs2/archive/container/Container.kt +++ /dev/null @@ -1,16 +0,0 @@ -package org.openrs2.archive.container - -import io.netty.buffer.ByteBuf -import io.netty.buffer.ByteBufUtil -import io.netty.buffer.DefaultByteBufHolder -import org.openrs2.buffer.crc32 -import org.openrs2.crypto.Whirlpool - -public abstract class Container( - data: ByteBuf -) : DefaultByteBufHolder(data) { - public val bytes: ByteArray = ByteBufUtil.getBytes(data, data.readerIndex(), data.readableBytes(), false) - public val crc32: Int = data.crc32() - public val whirlpool: ByteArray = Whirlpool.whirlpool(bytes) - public abstract val encrypted: Boolean -} diff --git a/archive/src/main/kotlin/org/openrs2/archive/container/ContainerImporter.kt b/archive/src/main/kotlin/org/openrs2/archive/container/ContainerImporter.kt deleted file mode 100644 index d1874108a9..0000000000 --- a/archive/src/main/kotlin/org/openrs2/archive/container/ContainerImporter.kt +++ /dev/null @@ -1,94 +0,0 @@ -package org.openrs2.archive.container - -import java.sql.Connection - -public object ContainerImporter { - public fun prepare(connection: Connection) { - connection.prepareStatement( - """ - LOCK TABLE containers IN EXCLUSIVE MODE - """.trimIndent() - ).use { stmt -> - stmt.execute() - } - - connection.prepareStatement( - """ - CREATE TEMPORARY TABLE tmp_containers ( - index INTEGER NOT NULL, - crc32 INTEGER NOT NULL, - whirlpool BYTEA NOT NULL, - data BYTEA NOT NULL, - encrypted BOOLEAN NOT NULL - ) ON COMMIT DROP - """.trimIndent() - ).use { stmt -> - stmt.execute() - } - } - - public fun addContainer(connection: Connection, container: Container): Long { - return addContainers(connection, listOf(container)).single() - } - - public fun addContainers(connection: Connection, containers: List): List { - connection.prepareStatement( - """ - TRUNCATE TABLE tmp_containers - """.trimIndent() - ).use { stmt -> - stmt.execute() - } - - connection.prepareStatement( - """ - INSERT INTO tmp_containers (index, crc32, whirlpool, data, encrypted) - VALUES (?, ?, ?, ?, ?) - """.trimIndent() - ).use { stmt -> - for ((i, container) in containers.withIndex()) { - stmt.setInt(1, i) - stmt.setInt(2, container.crc32) - stmt.setBytes(3, container.whirlpool) - stmt.setBytes(4, container.bytes) - stmt.setBoolean(5, container.encrypted) - stmt.addBatch() - } - - stmt.executeBatch() - } - - connection.prepareStatement( - """ - INSERT INTO containers (crc32, whirlpool, data, encrypted) - SELECT t.crc32, t.whirlpool, t.data, t.encrypted - FROM tmp_containers t - LEFT JOIN containers c ON c.whirlpool = t.whirlpool - WHERE c.whirlpool IS NULL - ON CONFLICT DO NOTHING - """.trimIndent() - ).use { stmt -> - stmt.execute() - } - - val ids = mutableListOf() - - connection.prepareStatement( - """ - SELECT c.id - FROM tmp_containers t - JOIN containers c ON c.whirlpool = t.whirlpool - ORDER BY t.index ASC - """.trimIndent() - ).use { stmt -> - stmt.executeQuery().use { rows -> - while (rows.next()) { - ids += rows.getLong(1) - } - } - } - - check(ids.size == containers.size) - return ids - } -}