Open-source multiplayer game server compatible with the RuneScape client https://www.openrs2.org/
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
openrs2/archive/src/main/kotlin/org/openrs2/archive/cache/CacheImporter.kt

280 lines
8.4 KiB

package org.openrs2.archive.cache
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
import io.netty.buffer.ByteBufUtil
import org.openrs2.archive.container.Container
import org.openrs2.archive.container.ContainerImporter
import org.openrs2.buffer.use
import org.openrs2.cache.Js5Archive
import org.openrs2.cache.Js5Compression
import org.openrs2.cache.Js5Index
import org.openrs2.cache.Store
import org.openrs2.cache.VersionTrailer
import org.openrs2.crypto.Whirlpool
import org.openrs2.db.Database
import java.io.IOException
import java.sql.Connection
import java.sql.Types
import javax.inject.Inject
import javax.inject.Singleton
@Singleton
public class CacheImporter @Inject constructor(
private val database: Database,
private val alloc: ByteBufAllocator
) {
/**
 * Container wrapping the raw data of one archive's index together with its
 * parsed representation.
 *
 * @property archive ID of the archive this index belongs to.
 * @property index the [Js5Index] parsed from the index data.
 */
private class Index(
    val archive: Int,
    val index: Js5Index,
    data: ByteBuf
) : Container(data) {
    /** Index containers are always reported as unencrypted. */
    override val encrypted: Boolean
        get() = false
}
/**
 * Container holding the data of a single group read from the store.
 *
 * @property archive ID of the archive the group was read from.
 * @property group ID of the group within [archive].
 * @property version version value obtained from the group's version trailer;
 * it is written to the `truncated_version` column when the group is stored.
 * @property encrypted whether the group's data was detected as encrypted.
 */
private class Group(
val archive: Int,
val group: Int,
data: ByteBuf,
val version: Int,
override val encrypted: Boolean
) : Container(data)
/**
 * Imports the contents of [store] into the database.
 *
 * All work happens inside a single [Database.execute] call: the indexes
 * stored in the [Js5Archive.ARCHIVESET] archive are imported first (together
 * with the cache row that groups them), followed by the groups of every
 * other archive.
 *
 * @param store the store to read indexes and groups from.
 */
public suspend fun import(store: Store) {
    database.execute { connection ->
        ContainerImporter.prepare(connection)
        importIndexes(connection, store)
        importGroups(connection, store)
    }
}

/**
 * Reads every index listed in the [Js5Archive.ARCHIVESET] archive, registers
 * the cache they form and stores each index. All index buffers are released
 * before returning, whether or not the import succeeded.
 */
private fun importIndexes(connection: Connection, store: Store) {
    val indexes = mutableListOf<Index>()
    try {
        for (archive in store.list(Js5Archive.ARCHIVESET)) {
            indexes += readIndex(store, archive)
        }

        val cacheId = addCache(connection, indexes)
        for (index in indexes) {
            addIndex(connection, cacheId, index)
        }
    } finally {
        indexes.forEach(Index::release)
    }
}

/**
 * Reads the groups of every archive except [Js5Archive.ARCHIVESET] and
 * stores them in batches of [BATCH_SIZE]. Groups that [readGroup] cannot
 * produce are skipped.
 */
private fun importGroups(connection: Connection, store: Store) {
    val pending = mutableListOf<Group>()
    try {
        for (archive in store.list()) {
            if (archive == Js5Archive.ARCHIVESET) {
                continue
            }

            for (id in store.list(archive)) {
                val group = readGroup(store, archive, id) ?: continue
                pending += group

                if (pending.size >= BATCH_SIZE) {
                    addGroups(connection, pending)
                    // The batch has been written; drop its buffers right away
                    // so at most BATCH_SIZE groups are held at a time.
                    pending.forEach(Group::release)
                    pending.clear()
                }
            }
        }

        // Write whatever is left over from the last, partially filled batch.
        if (pending.isNotEmpty()) {
            addGroups(connection, pending)
        }
    } finally {
        // Releases the final batch, or the in-flight batch if an error occurred.
        pending.forEach(Group::release)
    }
}
/**
 * Reads a single group from [store] and wraps it in a [Group].
 *
 * The buffer is retained before it is handed to the [Group], so the group
 * keeps its data after the surrounding `use` block ends (which presumably
 * releases the reference obtained from the read; see `org.openrs2.buffer.use`).
 *
 * @param store the store to read from.
 * @param archive ID of the archive containing the group.
 * @param group ID of the group to read.
 * @return the group, or `null` if [VersionTrailer.strip] returns no version
 * or an [IOException] is thrown while reading it.
 */
private fun readGroup(store: Store, archive: Int, group: Int): Group? =
    try {
        store.read(archive, group).use { data ->
            val version = VersionTrailer.strip(data)
            if (version == null) {
                null
            } else {
                // Inspect the data before retaining it so that a failure here
                // does not leave an extra reference behind.
                val encrypted = Js5Compression.isEncrypted(data.slice())
                Group(archive, group, data.retain(), version, encrypted)
            }
        }
    } catch (ex: IOException) {
        null
    }
/**
 * Stores [groups] in the database.
 *
 * The group containers are added through [ContainerImporter.addContainers]
 * first; the returned container IDs are then used, position by position, to
 * insert one `groups` row per group in a single JDBC batch. Rows that
 * conflict with existing ones are left untouched.
 *
 * @param connection the connection to run the statements on.
 * @param groups the groups to store.
 */
private fun addGroups(connection: Connection, groups: List<Group>) {
    val containerIds = ContainerImporter.addContainers(connection, groups)

    val sql = """
        INSERT INTO groups (archive_id, group_id, container_id, truncated_version)
        VALUES (?, ?, ?, ?)
        ON CONFLICT DO NOTHING
    """.trimIndent()

    connection.prepareStatement(sql).use { insert ->
        groups.forEachIndexed { position, entry ->
            insert.setInt(1, entry.archive)
            insert.setInt(2, entry.group)
            insert.setLong(3, containerIds[position])
            insert.setInt(4, entry.version)
            insert.addBatch()
        }

        insert.executeBatch()
    }
}
/**
 * Reads the index of [archive] from the [Js5Archive.ARCHIVESET] archive of
 * [store].
 *
 * The index is parsed from an uncompressed copy of the data, while the
 * returned [Index] keeps (and retains) the buffer as it was read from the
 * store.
 *
 * @param store the store to read from.
 * @param archive ID of the archive whose index should be read.
 * @return the parsed index together with its raw container data.
 */
private fun readIndex(store: Store, archive: Int): Index =
    store.read(Js5Archive.ARCHIVESET, archive).use { container ->
        Js5Compression.uncompress(container.slice()).use { payload ->
            // Parse first; the container is only retained once parsing succeeded.
            val parsed = Js5Index.read(payload)
            Index(archive, parsed, container.retain())
        }
    }
/**
 * Finds or creates the `caches` row that identifies the given set of indexes.
 *
 * A cache is identified by a Whirlpool digest computed over, for every index
 * in order, one byte holding the archive ID followed by that index's own
 * `whirlpool` value.
 *
 * @param connection the connection to run the statements on.
 * @param indexes the indexes that make up the cache.
 * @return the `id` of the existing or newly inserted `caches` row.
 * @throws IllegalStateException if the insert produced no row and the row
 * still cannot be found afterwards.
 */
private fun addCache(connection: Connection, indexes: List<Index>): Long {
    // One archive ID byte plus one digest per index. The buffer's maximum
    // capacity is fixed at len, which assumes every index.whirlpool is
    // Whirlpool.DIGESTBYTES long (TODO confirm against Container).
    val len = indexes.size * (1 + Whirlpool.DIGESTBYTES)
    val whirlpool = alloc.buffer(len, len).use { buf ->
        for (index in indexes) {
            buf.writeByte(index.archive)
            buf.writeBytes(index.whirlpool)
        }

        Whirlpool.whirlpool(ByteBufUtil.getBytes(buf, 0, buf.readableBytes(), false))
    }

    // Fast path: the cache has been imported before.
    findCacheId(connection, whirlpool)?.let { return it }

    val insertedId = connection.prepareStatement(
        """
        INSERT INTO caches (whirlpool)
        VALUES (?)
        ON CONFLICT DO NOTHING
        RETURNING id
        """.trimIndent()
    ).use { stmt ->
        stmt.setBytes(1, whirlpool)

        stmt.executeQuery().use { rows ->
            // RETURNING yields a row only if the INSERT actually created one.
            if (rows.next()) rows.getLong(1) else null
        }
    }

    // Use the freshly generated ID directly instead of discarding it and
    // querying the table a third time.
    if (insertedId != null) {
        return insertedId
    }

    // The insert hit a conflict, so the row must have been added between the
    // first lookup and the insert; fetch its ID.
    return checkNotNull(findCacheId(connection, whirlpool)) {
        "caches row not found after conflicting insert"
    }
}

/**
 * Looks up the `id` of the `caches` row whose digest equals [whirlpool], or
 * returns `null` if there is no such row.
 */
private fun findCacheId(connection: Connection, whirlpool: ByteArray): Long? =
    connection.prepareStatement(
        """
        SELECT id
        FROM caches
        WHERE whirlpool = ?
        """.trimIndent()
    ).use { stmt ->
        stmt.setBytes(1, whirlpool)

        stmt.executeQuery().use { rows ->
            if (rows.next()) rows.getLong(1) else null
        }
    }
/**
 * Stores a single index and everything it describes.
 *
 * After adding the index's container via [ContainerImporter.addContainer],
 * this inserts, in order: the `indexes` row, the `cache_indexes` row linking
 * the index to [cacheId], one `index_groups` row per group and one
 * `index_files` row per file. Every insert ignores conflicting rows. Name
 * hashes are written as SQL `NULL` when the index has no names.
 *
 * @param connection the connection to run the statements on.
 * @param cacheId ID of the `caches` row the index belongs to.
 * @param index the index to store.
 */
private fun addIndex(connection: Connection, cacheId: Long, index: Index) {
    // TODO(gpe): skip most of this function if we encounter a conflict?
    val containerId = ContainerImporter.addContainer(connection, index)
    val js5Index = index.index
    val named = js5Index.hasNames

    val indexSql = """
        INSERT INTO indexes (container_id, version)
        VALUES (?, ?)
        ON CONFLICT DO NOTHING
    """.trimIndent()
    connection.prepareStatement(indexSql).use { insert ->
        insert.setLong(1, containerId)
        insert.setInt(2, js5Index.version)
        insert.execute()
    }

    val cacheIndexSql = """
        INSERT INTO cache_indexes (cache_id, archive_id, container_id)
        VALUES (?, ?, ?)
        ON CONFLICT DO NOTHING
    """.trimIndent()
    connection.prepareStatement(cacheIndexSql).use { insert ->
        insert.setLong(1, cacheId)
        insert.setInt(2, index.archive)
        insert.setLong(3, containerId)
        insert.execute()
    }

    val groupSql = """
        INSERT INTO index_groups (container_id, group_id, crc32, whirlpool, version, name_hash)
        VALUES (?, ?, ?, ?, ?, ?)
        ON CONFLICT DO NOTHING
    """.trimIndent()
    connection.prepareStatement(groupSql).use { insert ->
        for (entry in js5Index) {
            insert.setLong(1, containerId)
            insert.setInt(2, entry.id)
            insert.setInt(3, entry.checksum)
            insert.setBytes(4, entry.digest)
            insert.setInt(5, entry.version)

            if (!named) {
                insert.setNull(6, Types.INTEGER)
            } else {
                insert.setInt(6, entry.nameHash)
            }

            insert.addBatch()
        }

        insert.executeBatch()
    }

    val fileSql = """
        INSERT INTO index_files (container_id, group_id, file_id, name_hash)
        VALUES (?, ?, ?, ?)
        ON CONFLICT DO NOTHING
    """.trimIndent()
    connection.prepareStatement(fileSql).use { insert ->
        for (entry in js5Index) {
            for (member in entry) {
                insert.setLong(1, containerId)
                insert.setInt(2, entry.id)
                insert.setInt(3, member.id)

                if (!named) {
                    insert.setNull(4, Types.INTEGER)
                } else {
                    insert.setInt(4, member.nameHash)
                }

                insert.addBatch()
            }
        }

        insert.executeBatch()
    }
}
private companion object {
    /**
     * Number of groups [import] collects in memory before writing them to
     * the database and releasing their buffers.
     */
    private const val BATCH_SIZE: Int = 1024
}
}