Re-combine CacheImporter and ContainerImporter

Signed-off-by: Graham <gpe@openrs2.org>
Graham 4 years ago
parent 4f16713f01
commit fff63285fe
  1. 114
      archive/src/main/kotlin/org/openrs2/archive/cache/CacheImporter.kt
  2. 16
      archive/src/main/kotlin/org/openrs2/archive/container/Container.kt
  3. 94
      archive/src/main/kotlin/org/openrs2/archive/container/ContainerImporter.kt

@ -2,8 +2,9 @@ package org.openrs2.archive.cache
import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator import io.netty.buffer.ByteBufAllocator
import org.openrs2.archive.container.Container import io.netty.buffer.ByteBufUtil
import org.openrs2.archive.container.ContainerImporter import io.netty.buffer.DefaultByteBufHolder
import org.openrs2.buffer.crc32
import org.openrs2.buffer.use import org.openrs2.buffer.use
import org.openrs2.cache.Js5Archive import org.openrs2.cache.Js5Archive
import org.openrs2.cache.Js5Compression import org.openrs2.cache.Js5Compression
@ -12,6 +13,7 @@ import org.openrs2.cache.Js5Index
import org.openrs2.cache.Js5MasterIndex import org.openrs2.cache.Js5MasterIndex
import org.openrs2.cache.Store import org.openrs2.cache.Store
import org.openrs2.cache.VersionTrailer import org.openrs2.cache.VersionTrailer
import org.openrs2.crypto.Whirlpool
import org.openrs2.db.Database import org.openrs2.db.Database
import java.io.IOException import java.io.IOException
import java.sql.Connection import java.sql.Connection
@ -24,6 +26,15 @@ public class CacheImporter @Inject constructor(
private val database: Database, private val database: Database,
private val alloc: ByteBufAllocator private val alloc: ByteBufAllocator
) { ) {
private abstract class Container(
data: ByteBuf
) : DefaultByteBufHolder(data) {
val bytes: ByteArray = ByteBufUtil.getBytes(data, data.readerIndex(), data.readableBytes(), false)
val crc32: Int = data.crc32()
val whirlpool: ByteArray = Whirlpool.whirlpool(bytes)
abstract val encrypted: Boolean
}
private class MasterIndex( private class MasterIndex(
val index: Js5MasterIndex, val index: Js5MasterIndex,
data: ByteBuf data: ByteBuf
@ -48,7 +59,7 @@ public class CacheImporter @Inject constructor(
public suspend fun import(store: Store) { public suspend fun import(store: Store) {
database.execute { connection -> database.execute { connection ->
ContainerImporter.prepare(connection) prepare(connection)
// import master index // import master index
val masterIndex = createMasterIndex(store) val masterIndex = createMasterIndex(store)
@ -107,7 +118,7 @@ public class CacheImporter @Inject constructor(
val masterIndex = MasterIndex(Js5MasterIndex.read(uncompressed.slice()), buf) val masterIndex = MasterIndex(Js5MasterIndex.read(uncompressed.slice()), buf)
database.execute { connection -> database.execute { connection ->
ContainerImporter.prepare(connection) prepare(connection)
addMasterIndex(connection, masterIndex) addMasterIndex(connection, masterIndex)
} }
} }
@ -127,7 +138,7 @@ public class CacheImporter @Inject constructor(
// TODO(gpe): skip most of this function if we encounter a conflict? // TODO(gpe): skip most of this function if we encounter a conflict?
private fun addMasterIndex(connection: Connection, masterIndex: MasterIndex) { private fun addMasterIndex(connection: Connection, masterIndex: MasterIndex) {
val containerId = ContainerImporter.addContainer(connection, masterIndex) val containerId = addContainer(connection, masterIndex)
connection.prepareStatement( connection.prepareStatement(
""" """
@ -173,7 +184,7 @@ public class CacheImporter @Inject constructor(
} }
private fun addGroups(connection: Connection, groups: List<Group>) { private fun addGroups(connection: Connection, groups: List<Group>) {
val containerIds = ContainerImporter.addContainers(connection, groups) val containerIds = addContainers(connection, groups)
connection.prepareStatement( connection.prepareStatement(
""" """
@ -204,7 +215,7 @@ public class CacheImporter @Inject constructor(
// TODO(gpe): skip most of this function if we encounter a conflict? // TODO(gpe): skip most of this function if we encounter a conflict?
private fun addIndex(connection: Connection, index: Index) { private fun addIndex(connection: Connection, index: Index) {
val containerId = ContainerImporter.addContainer(connection, index) val containerId = addContainer(connection, index)
connection.prepareStatement( connection.prepareStatement(
""" """
@ -271,6 +282,95 @@ public class CacheImporter @Inject constructor(
} }
} }
private fun prepare(connection: Connection) {
connection.prepareStatement(
"""
LOCK TABLE containers IN EXCLUSIVE MODE
""".trimIndent()
).use { stmt ->
stmt.execute()
}
connection.prepareStatement(
"""
CREATE TEMPORARY TABLE tmp_containers (
index INTEGER NOT NULL,
crc32 INTEGER NOT NULL,
whirlpool BYTEA NOT NULL,
data BYTEA NOT NULL,
encrypted BOOLEAN NOT NULL
) ON COMMIT DROP
""".trimIndent()
).use { stmt ->
stmt.execute()
}
}
private fun addContainer(connection: Connection, container: Container): Long {
return addContainers(connection, listOf(container)).single()
}
private fun addContainers(connection: Connection, containers: List<Container>): List<Long> {
connection.prepareStatement(
"""
TRUNCATE TABLE tmp_containers
""".trimIndent()
).use { stmt ->
stmt.execute()
}
connection.prepareStatement(
"""
INSERT INTO tmp_containers (index, crc32, whirlpool, data, encrypted)
VALUES (?, ?, ?, ?, ?)
""".trimIndent()
).use { stmt ->
for ((i, container) in containers.withIndex()) {
stmt.setInt(1, i)
stmt.setInt(2, container.crc32)
stmt.setBytes(3, container.whirlpool)
stmt.setBytes(4, container.bytes)
stmt.setBoolean(5, container.encrypted)
stmt.addBatch()
}
stmt.executeBatch()
}
connection.prepareStatement(
"""
INSERT INTO containers (crc32, whirlpool, data, encrypted)
SELECT t.crc32, t.whirlpool, t.data, t.encrypted
FROM tmp_containers t
LEFT JOIN containers c ON c.whirlpool = t.whirlpool
WHERE c.whirlpool IS NULL
ON CONFLICT DO NOTHING
""".trimIndent()
).use { stmt ->
stmt.execute()
}
val ids = mutableListOf<Long>()
connection.prepareStatement(
"""
SELECT c.id
FROM tmp_containers t
JOIN containers c ON c.whirlpool = t.whirlpool
ORDER BY t.index ASC
""".trimIndent()
).use { stmt ->
stmt.executeQuery().use { rows ->
while (rows.next()) {
ids += rows.getLong(1)
}
}
}
check(ids.size == containers.size)
return ids
}
private companion object { private companion object {
private const val BATCH_SIZE = 1024 private const val BATCH_SIZE = 1024
} }

@ -1,16 +0,0 @@
package org.openrs2.archive.container
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufUtil
import io.netty.buffer.DefaultByteBufHolder
import org.openrs2.buffer.crc32
import org.openrs2.crypto.Whirlpool
public abstract class Container(
data: ByteBuf
) : DefaultByteBufHolder(data) {
public val bytes: ByteArray = ByteBufUtil.getBytes(data, data.readerIndex(), data.readableBytes(), false)
public val crc32: Int = data.crc32()
public val whirlpool: ByteArray = Whirlpool.whirlpool(bytes)
public abstract val encrypted: Boolean
}

@ -1,94 +0,0 @@
package org.openrs2.archive.container
import java.sql.Connection
public object ContainerImporter {
public fun prepare(connection: Connection) {
connection.prepareStatement(
"""
LOCK TABLE containers IN EXCLUSIVE MODE
""".trimIndent()
).use { stmt ->
stmt.execute()
}
connection.prepareStatement(
"""
CREATE TEMPORARY TABLE tmp_containers (
index INTEGER NOT NULL,
crc32 INTEGER NOT NULL,
whirlpool BYTEA NOT NULL,
data BYTEA NOT NULL,
encrypted BOOLEAN NOT NULL
) ON COMMIT DROP
""".trimIndent()
).use { stmt ->
stmt.execute()
}
}
public fun addContainer(connection: Connection, container: Container): Long {
return addContainers(connection, listOf(container)).single()
}
public fun addContainers(connection: Connection, containers: List<Container>): List<Long> {
connection.prepareStatement(
"""
TRUNCATE TABLE tmp_containers
""".trimIndent()
).use { stmt ->
stmt.execute()
}
connection.prepareStatement(
"""
INSERT INTO tmp_containers (index, crc32, whirlpool, data, encrypted)
VALUES (?, ?, ?, ?, ?)
""".trimIndent()
).use { stmt ->
for ((i, container) in containers.withIndex()) {
stmt.setInt(1, i)
stmt.setInt(2, container.crc32)
stmt.setBytes(3, container.whirlpool)
stmt.setBytes(4, container.bytes)
stmt.setBoolean(5, container.encrypted)
stmt.addBatch()
}
stmt.executeBatch()
}
connection.prepareStatement(
"""
INSERT INTO containers (crc32, whirlpool, data, encrypted)
SELECT t.crc32, t.whirlpool, t.data, t.encrypted
FROM tmp_containers t
LEFT JOIN containers c ON c.whirlpool = t.whirlpool
WHERE c.whirlpool IS NULL
ON CONFLICT DO NOTHING
""".trimIndent()
).use { stmt ->
stmt.execute()
}
val ids = mutableListOf<Long>()
connection.prepareStatement(
"""
SELECT c.id
FROM tmp_containers t
JOIN containers c ON c.whirlpool = t.whirlpool
ORDER BY t.index ASC
""".trimIndent()
).use { stmt ->
stmt.executeQuery().use { rows ->
while (rows.next()) {
ids += rows.getLong(1)
}
}
}
check(ids.size == containers.size)
return ids
}
}
Loading…
Cancel
Save