From f079c415f5d7dab49fa22c2d135455c26c1f11ef Mon Sep 17 00:00:00 2001 From: Graham Date: Sat, 11 Dec 2021 20:16:34 +0000 Subject: [PATCH] Add initial support for archiving legacy caches Signed-off-by: Graham --- .../openrs2/archive/cache/CacheExporter.kt | 223 ++++--- .../openrs2/archive/cache/CacheImporter.kt | 584 ++++++++++++++++-- .../openrs2/archive/cache/ExportCommand.kt | 4 +- .../openrs2/archive/web/CachesController.kt | 9 +- .../openrs2/archive/migrations/V9__legacy.sql | 215 +++++++ .../archive/templates/caches/show.html | 111 ++-- 6 files changed, 958 insertions(+), 188 deletions(-) create mode 100644 archive/src/main/resources/org/openrs2/archive/migrations/V9__legacy.sql diff --git a/archive/src/main/kotlin/org/openrs2/archive/cache/CacheExporter.kt b/archive/src/main/kotlin/org/openrs2/archive/cache/CacheExporter.kt index c2f76f3a94..0ac4ea131c 100644 --- a/archive/src/main/kotlin/org/openrs2/archive/cache/CacheExporter.kt +++ b/archive/src/main/kotlin/org/openrs2/archive/cache/CacheExporter.kt @@ -4,6 +4,7 @@ import com.fasterxml.jackson.annotation.JsonProperty import io.netty.buffer.ByteBufAllocator import io.netty.buffer.Unpooled import org.openrs2.buffer.use +import org.openrs2.cache.ChecksumTable import org.openrs2.cache.DiskStore import org.openrs2.cache.Js5Archive import org.openrs2.cache.Js5Compression @@ -13,6 +14,7 @@ import org.openrs2.cache.Store import org.openrs2.crypto.XteaKey import org.openrs2.db.Database import org.postgresql.util.PGobject +import java.sql.Connection import java.time.Instant import java.util.SortedSet import javax.inject.Inject @@ -108,7 +110,8 @@ public class CacheExporter @Inject constructor( val sources: List, val updates: List, val stats: Stats?, - val masterIndex: Js5MasterIndex + val masterIndex: Js5MasterIndex?, + val checksumTable: ChecksumTable? ) public data class Source( @@ -136,25 +139,25 @@ public class CacheExporter @Inject constructor( SELECT * FROM ( SELECT - m.id, + c.id, g.name, array_remove(array_agg(DISTINCT ROW(s.build_major, s.build_minor)::build ORDER BY ROW(s.build_major, s.build_minor)::build ASC), NULL) builds, MIN(s.timestamp) AS timestamp, array_remove(array_agg(DISTINCT s.name ORDER BY s.name ASC), NULL) sources, - ms.valid_indexes, - ms.indexes, - ms.valid_groups, - ms.groups, - ms.valid_keys, - ms.keys, - ms.size, - ms.blocks - FROM master_indexes m - JOIN sources s ON s.master_index_id = m.id + cs.valid_indexes, + cs.indexes, + cs.valid_groups, + cs.groups, + cs.valid_keys, + cs.keys, + cs.size, + cs.blocks + FROM caches c + JOIN sources s ON s.cache_id = c.id JOIN games g ON g.id = s.game_id - LEFT JOIN master_index_stats ms ON ms.master_index_id = m.id - GROUP BY m.id, g.name, ms.valid_indexes, ms.indexes, ms.valid_groups, ms.groups, ms.valid_keys, ms.keys, - ms.size, ms.blocks + LEFT JOIN cache_stats cs ON cs.cache_id = c.id + GROUP BY c.id, g.name, cs.valid_indexes, cs.indexes, cs.valid_groups, cs.groups, cs.valid_keys, cs.keys, + cs.size, cs.blocks ) t ORDER BY t.name ASC, t.builds[1] ASC, t.timestamp ASC """.trimIndent() @@ -201,26 +204,31 @@ public class CacheExporter @Inject constructor( public suspend fun get(id: Int): Cache? { return database.execute { connection -> - val masterIndex: Js5MasterIndex + val masterIndex: Js5MasterIndex? + val checksumTable: ChecksumTable? val stats: Stats? connection.prepareStatement( """ SELECT m.format, - c.data, - ms.valid_indexes, - ms.indexes, - ms.valid_groups, - ms.groups, - ms.valid_keys, - ms.keys, - ms.size, - ms.blocks - FROM master_indexes m - JOIN containers c ON c.id = m.container_id - LEFT JOIN master_index_stats ms ON ms.master_index_id = m.id - WHERE m.id = ? + mc.data, + b.data, + cs.valid_indexes, + cs.indexes, + cs.valid_groups, + cs.groups, + cs.valid_keys, + cs.keys, + cs.size, + cs.blocks + FROM caches c + LEFT JOIN master_indexes m ON m.id = c.id + LEFT JOIN containers mc ON mc.id = m.container_id + LEFT JOIN crc_tables t ON t.id = c.id + LEFT JOIN blobs b ON b.id = t.blob_id + LEFT JOIN cache_stats cs ON cs.cache_id = c.id + WHERE c.id = ? """.trimIndent() ).use { stmt -> stmt.setInt(1, id) @@ -230,25 +238,38 @@ public class CacheExporter @Inject constructor( return@execute null } - val format = MasterIndexFormat.valueOf(rows.getString(1).uppercase()) + val formatString = rows.getString(1) + masterIndex = if (formatString != null) { + Unpooled.wrappedBuffer(rows.getBytes(2)).use { compressed -> + Js5Compression.uncompress(compressed).use { uncompressed -> + val format = MasterIndexFormat.valueOf(formatString.uppercase()) + Js5MasterIndex.readUnverified(uncompressed, format) + } + } + } else { + null + } - masterIndex = Unpooled.wrappedBuffer(rows.getBytes(2)).use { compressed -> - Js5Compression.uncompress(compressed).use { uncompressed -> - Js5MasterIndex.readUnverified(uncompressed, format) + val blob = rows.getBytes(3) + checksumTable = if (blob != null) { + Unpooled.wrappedBuffer(blob).use { buf -> + ChecksumTable.read(buf) } + } else { + null } - val validIndexes = rows.getLong(3) + val validIndexes = rows.getLong(4) stats = if (rows.wasNull()) { null } else { - val indexes = rows.getLong(4) - val validGroups = rows.getLong(5) - val groups = rows.getLong(6) - val validKeys = rows.getLong(7) - val keys = rows.getLong(8) - val size = rows.getLong(9) - val blocks = rows.getLong(10) + val indexes = rows.getLong(5) + val validGroups = rows.getLong(6) + val groups = rows.getLong(7) + val validKeys = rows.getLong(8) + val keys = rows.getLong(9) + val size = rows.getLong(10) + val blocks = rows.getLong(11) Stats(validIndexes, indexes, validGroups, groups, validKeys, keys, size, blocks) } } @@ -261,7 +282,7 @@ public class CacheExporter @Inject constructor( SELECT g.name, s.build_major, s.build_minor, s.timestamp, s.name, s.description, s.url FROM sources s JOIN games g ON g.id = s.game_id - WHERE s.master_index_id = ? + WHERE s.cache_id = ? ORDER BY s.name ASC """.trimIndent() ).use { stmt -> @@ -303,7 +324,7 @@ public class CacheExporter @Inject constructor( """ SELECT url FROM updates - WHERE master_index_id = ? + WHERE cache_id = ? """.trimIndent() ).use { stmt -> stmt.setInt(1, id) @@ -315,45 +336,69 @@ public class CacheExporter @Inject constructor( } } - Cache(id, sources, updates, stats, masterIndex) + Cache(id, sources, updates, stats, masterIndex, checksumTable) } } - public suspend fun export(id: Int, store: Store) { - database.execute { connection -> - connection.prepareStatement( + public fun export(id: Int, storeFactory: (Boolean) -> Store) { + database.executeOnce { connection -> + val legacy = connection.prepareStatement( """ - SELECT archive_id, group_id, data, version - FROM resolved_groups - WHERE master_index_id = ? + SELECT id + FROM crc_tables + WHERE id = ? """.trimIndent() ).use { stmt -> - stmt.fetchSize = BATCH_SIZE stmt.setInt(1, id) stmt.executeQuery().use { rows -> - alloc.buffer(2, 2).use { versionBuf -> - store.create(Js5Archive.ARCHIVESET) - - while (rows.next()) { - val archive = rows.getInt(1) - val group = rows.getInt(2) - val bytes = rows.getBytes(3) - val version = rows.getInt(4) - val versionNull = rows.wasNull() - - versionBuf.clear() - if (!versionNull) { - versionBuf.writeShort(version) - } + rows.next() + } + } + + storeFactory(legacy).use { store -> + if (legacy) { + exportLegacy(connection, id, store) + } else { + export(connection, id, store) + } + } + } + } + + private fun export(connection: Connection, id: Int, store: Store) { + connection.prepareStatement( + """ + SELECT archive_id, group_id, data, version + FROM resolved_groups + WHERE master_index_id = ? + """.trimIndent() + ).use { stmt -> + stmt.fetchSize = BATCH_SIZE + stmt.setInt(1, id) + + stmt.executeQuery().use { rows -> + alloc.buffer(2, 2).use { versionBuf -> + store.create(Js5Archive.ARCHIVESET) - Unpooled.wrappedBuffer(Unpooled.wrappedBuffer(bytes), versionBuf.retain()).use { buf -> - store.write(archive, group, buf) + while (rows.next()) { + val archive = rows.getInt(1) + val group = rows.getInt(2) + val bytes = rows.getBytes(3) + val version = rows.getInt(4) + val versionNull = rows.wasNull() + + versionBuf.clear() + if (!versionNull) { + versionBuf.writeShort(version) + } - // ensure the .idx file exists even if it is empty - if (archive == Js5Archive.ARCHIVESET) { - store.create(group) - } + Unpooled.wrappedBuffer(Unpooled.wrappedBuffer(bytes), versionBuf.retain()).use { buf -> + store.write(archive, group, buf) + + // ensure the .idx file exists even if it is empty + if (archive == Js5Archive.ARCHIVESET) { + store.create(group) } } } @@ -362,6 +407,42 @@ public class CacheExporter @Inject constructor( } } + private fun exportLegacy(connection: Connection, id: Int, store: Store) { + connection.prepareStatement( + """ + SELECT index_id, file_id, data, version + FROM resolved_files + WHERE crc_table_id = ? + """.trimIndent() + ).use { stmt -> + stmt.fetchSize = BATCH_SIZE + stmt.setInt(1, id) + + stmt.executeQuery().use { rows -> + alloc.buffer(2, 2).use { versionBuf -> + store.create(0) + + while (rows.next()) { + val index = rows.getInt(1) + val file = rows.getInt(2) + val bytes = rows.getBytes(3) + val version = rows.getInt(4) + val versionNull = rows.wasNull() + + versionBuf.clear() + if (!versionNull) { + versionBuf.writeShort(version) + } + + Unpooled.wrappedBuffer(Unpooled.wrappedBuffer(bytes), versionBuf.retain()).use { buf -> + store.write(index, file, buf) + } + } + } + } + } + } + public suspend fun exportKeys(id: Int): List { return database.execute { connection -> connection.prepareStatement( diff --git a/archive/src/main/kotlin/org/openrs2/archive/cache/CacheImporter.kt b/archive/src/main/kotlin/org/openrs2/archive/cache/CacheImporter.kt index 750b881af0..6d64d9a406 100644 --- a/archive/src/main/kotlin/org/openrs2/archive/cache/CacheImporter.kt +++ b/archive/src/main/kotlin/org/openrs2/archive/cache/CacheImporter.kt @@ -4,9 +4,13 @@ import com.github.michaelbull.logging.InlineLogger import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBufAllocator import io.netty.buffer.ByteBufUtil +import io.netty.buffer.DefaultByteBufHolder import io.netty.buffer.Unpooled import org.openrs2.buffer.crc32 import org.openrs2.buffer.use +import org.openrs2.cache.ChecksumTable +import org.openrs2.cache.DiskStore +import org.openrs2.cache.JagArchive import org.openrs2.cache.Js5Archive import org.openrs2.cache.Js5Compression import org.openrs2.cache.Js5CompressionType @@ -15,6 +19,7 @@ import org.openrs2.cache.Js5MasterIndex import org.openrs2.cache.MasterIndexFormat import org.openrs2.cache.Store import org.openrs2.cache.StoreCorruptException +import org.openrs2.cache.VersionList import org.openrs2.cache.VersionTrailer import org.openrs2.crypto.Whirlpool import org.openrs2.db.Database @@ -74,6 +79,32 @@ public class CacheImporter @Inject constructor( public val versionTruncated: Boolean ) : Container(compressed, uncompressed) + public abstract class Blob( + buf: ByteBuf + ) : DefaultByteBufHolder(buf) { + public val bytes: ByteArray = ByteBufUtil.getBytes(buf, buf.readerIndex(), buf.readableBytes(), false) + public val crc32: Int = buf.crc32() + public val whirlpool: ByteArray = Whirlpool.whirlpool(bytes) + } + + public class ChecksumTableBlob( + buf: ByteBuf, + public val table: ChecksumTable + ) : Blob(buf) + + public class Archive( + public val id: Int, + buf: ByteBuf, + public val versionList: VersionList? + ) : Blob(buf) + + public class File( + public val index: Int, + public val file: Int, + buf: ByteBuf, + public val version: Int + ) : Blob(buf) + private enum class SourceType { DISK, JS5REMOTE @@ -100,83 +131,101 @@ public class CacheImporter @Inject constructor( val gameId = getGameId(connection, game) - // import master index - val masterIndex = createMasterIndex(store) - val masterIndexId = try { - if (masterIndex.index.entries.isEmpty()) { - throw IOException("Master index empty, cache probably corrupt") - } + if (store is DiskStore && store.legacy) { + importLegacy(connection, store, gameId, buildMajor, buildMinor, timestamp, name, description, url) + } else { + importJs5(connection, store, gameId, buildMajor, buildMinor, timestamp, name, description, url) + } + } + } - addMasterIndex(connection, masterIndex) - } finally { - masterIndex.release() + private fun importJs5( + connection: Connection, + store: Store, + gameId: Int, + buildMajor: Int?, + buildMinor: Int?, + timestamp: Instant?, + name: String?, + description: String?, + url: String? + ) { + // import master index + val masterIndex = createMasterIndex(store) + val masterIndexId = try { + if (masterIndex.index.entries.isEmpty()) { + throw IOException("Master index empty, cache probably corrupt") } - // create source - val sourceId = addSource( - connection, - SourceType.DISK, - masterIndexId, - gameId, - buildMajor, - buildMinor, - timestamp, - name, - description, - url - ) + addMasterIndex(connection, masterIndex) + } finally { + masterIndex.release() + } - // import indexes - val indexes = arrayOfNulls(Js5Archive.ARCHIVESET) - val indexGroups = mutableListOf() - try { - for (archive in store.list(Js5Archive.ARCHIVESET)) { - try { - val indexGroup = readIndex(store, archive) - indexes[archive] = indexGroup.index - indexGroups += indexGroup - } catch (ex: StoreCorruptException) { - // see the comment in Js5MasterIndex::create - logger.warn(ex) { "Skipping corrupt index (archive $archive)" } - } + // create source + val sourceId = addSource( + connection, + SourceType.DISK, + masterIndexId, + gameId, + buildMajor, + buildMinor, + timestamp, + name, + description, + url + ) + + // import indexes + val indexes = arrayOfNulls(Js5Archive.ARCHIVESET) + val indexGroups = mutableListOf() + try { + for (archive in store.list(Js5Archive.ARCHIVESET)) { + try { + val indexGroup = readIndex(store, archive) + indexes[archive] = indexGroup.index + indexGroups += indexGroup + } catch (ex: StoreCorruptException) { + // see the comment in Js5MasterIndex::create + logger.warn(ex) { "Skipping corrupt index (archive $archive)" } } + } - for (index in indexGroups) { - addIndex(connection, sourceId, index) - } - } finally { - indexGroups.forEach(Index::release) + for (index in indexGroups) { + addIndex(connection, sourceId, index) } + } finally { + indexGroups.forEach(Index::release) + } - // import groups - val groups = mutableListOf() - try { - for (archive in store.list()) { - if (archive == Js5Archive.ARCHIVESET) { - continue - } + // import groups + val groups = mutableListOf() + try { + for (archive in store.list()) { + if (archive == Js5Archive.ARCHIVESET) { + continue + } - val index = indexes[archive] + val index = indexes[archive] - for (id in store.list(archive)) { - val group = readGroup(store, archive, index, id) ?: continue - groups += group + for (id in store.list(archive)) { + val group = readGroup(store, archive, index, id) ?: continue + groups += group - if (groups.size >= BATCH_SIZE) { - addGroups(connection, sourceId, groups) + if (groups.size >= BATCH_SIZE) { + addGroups(connection, sourceId, groups) - groups.forEach(Group::release) - groups.clear() - } + groups.forEach(Group::release) + groups.clear() } } + } - if (groups.isNotEmpty()) { - addGroups(connection, sourceId, groups) - } - } finally { - groups.forEach(Group::release) + if (groups.isNotEmpty()) { + addGroups(connection, sourceId, groups) } + } finally { + groups.forEach(Group::release) } } @@ -229,7 +278,7 @@ public class CacheImporter @Inject constructor( buildMinor: Int?, lastId: Int?, timestamp: Instant - ): MasterIndexResult { + ): CacheImporter.MasterIndexResult { return database.execute { connection -> prepare(connection) @@ -471,7 +520,7 @@ public class CacheImporter @Inject constructor( private fun addSource( connection: Connection, type: SourceType, - masterIndexId: Int, + cacheId: Int, gameId: Int, buildMajor: Int?, buildMinor: Int?, @@ -485,10 +534,10 @@ public class CacheImporter @Inject constructor( """ SELECT id FROM sources - WHERE type = 'js5remote' AND master_index_id = ? AND game_id = ? AND build_major = ? AND build_minor IS NOT DISTINCT FROM ? + WHERE type = 'js5remote' AND cache_id = ? AND game_id = ? AND build_major = ? AND build_minor IS NOT DISTINCT FROM ? """.trimIndent() ).use { stmt -> - stmt.setInt(1, masterIndexId) + stmt.setInt(1, cacheId) stmt.setInt(2, gameId) stmt.setInt(3, buildMajor) stmt.setObject(4, buildMinor, Types.INTEGER) @@ -503,13 +552,13 @@ public class CacheImporter @Inject constructor( connection.prepareStatement( """ - INSERT INTO sources (type, master_index_id, game_id, build_major, build_minor, timestamp, name, description, url) + INSERT INTO sources (type, cache_id, game_id, build_major, build_minor, timestamp, name, description, url) VALUES (?::source_type, ?, ?, ?, ?, ?, ?, ?, ?) RETURNING id """.trimIndent() ).use { stmt -> stmt.setString(1, type.toString().lowercase()) - stmt.setInt(2, masterIndexId) + stmt.setInt(2, cacheId) stmt.setInt(3, gameId) stmt.setObject(4, buildMajor, Types.INTEGER) stmt.setObject(5, buildMinor, Types.INTEGER) @@ -747,6 +796,19 @@ public class CacheImporter @Inject constructor( ).use { stmt -> stmt.execute() } + + connection.prepareStatement( + """ + CREATE TEMPORARY TABLE tmp_blobs ( + index INTEGER NOT NULL, + crc32 INTEGER NOT NULL, + whirlpool BYTEA NOT NULL, + data BYTEA NOT NULL + ) ON COMMIT DROP + """.trimIndent() + ).use { stmt -> + stmt.execute() + } } private fun addContainer(connection: Connection, container: Container): Long { @@ -823,6 +885,71 @@ public class CacheImporter @Inject constructor( return ids } + private fun addBlob(connection: Connection, blob: Blob): Long { + return addBlobs(connection, listOf(blob)).single() + } + + private fun addBlobs(connection: Connection, blobs: List): List { + connection.prepareStatement( + """ + TRUNCATE TABLE tmp_blobs + """.trimIndent() + ).use { stmt -> + stmt.execute() + } + + connection.prepareStatement( + """ + INSERT INTO tmp_blobs (index, crc32, whirlpool, data) + VALUES (?, ?, ?, ?) + """.trimIndent() + ).use { stmt -> + for ((i, blob) in blobs.withIndex()) { + stmt.setInt(1, i) + stmt.setInt(2, blob.crc32) + stmt.setBytes(3, blob.whirlpool) + stmt.setBytes(4, blob.bytes) + + stmt.addBatch() + } + + stmt.executeBatch() + } + + connection.prepareStatement( + """ + INSERT INTO blobs (crc32, whirlpool, data) + SELECT t.crc32, t.whirlpool, t.data + FROM tmp_blobs t + LEFT JOIN blobs b ON b.whirlpool = t.whirlpool + WHERE b.whirlpool IS NULL + ON CONFLICT DO NOTHING + """.trimIndent() + ).use { stmt -> + stmt.execute() + } + + val ids = mutableListOf() + + connection.prepareStatement( + """ + SELECT b.id + FROM tmp_blobs t + JOIN blobs b ON b.whirlpool = t.whirlpool + ORDER BY t.index ASC + """.trimIndent() + ).use { stmt -> + stmt.executeQuery().use { rows -> + while (rows.next()) { + ids += rows.getLong(1) + } + } + } + + check(ids.size == blobs.size) + return ids + } + private fun getGameId(connection: Connection, name: String): Int { connection.prepareStatement( """ @@ -858,6 +985,313 @@ public class CacheImporter @Inject constructor( } } + private fun importLegacy( + connection: Connection, + store: Store, + gameId: Int, + buildMajor: Int?, + buildMinor: Int?, + timestamp: Instant?, + name: String?, + description: String?, + url: String? + ) { + // import checksum table + val checksumTable = createChecksumTable(store) + val checksumTableId = try { + if (checksumTable.table.entries.isEmpty()) { + throw IOException("Checksum table empty, cache probably corrupt") + } + + addChecksumTable(connection, checksumTable) + } finally { + checksumTable.release() + } + + // add source + val sourceId = addSource( + connection, + SourceType.DISK, + checksumTableId, + gameId, + buildMajor, + buildMinor, + timestamp, + name, + description, + url + ) + + // import archives and version list + for (id in store.list(0)) { + readArchive(store, id).use { archive -> + addArchive(connection, sourceId, archive) + } + } + + // import files + val files = mutableListOf() + try { + for (index in store.list()) { + if (index == 0) { + continue + } + + for (id in store.list(index)) { + val file = readFile(store, index, id) ?: continue + files += file + + if (files.size >= BATCH_SIZE) { + addFiles(connection, sourceId, files) + + files.forEach(File::release) + files.clear() + } + } + } + + if (files.isNotEmpty()) { + addFiles(connection, sourceId, files) + } + } finally { + files.forEach(File::release) + } + } + + private fun createChecksumTable(store: Store): ChecksumTableBlob { + alloc.buffer().use { buf -> + val table = ChecksumTable.create(store) + table.write(buf) + return ChecksumTableBlob(buf.retain(), table) + } + } + + private fun addChecksumTable( + connection: Connection, + checksumTable: ChecksumTableBlob + ): Int { + val blobId = addBlob(connection, checksumTable) + + connection.prepareStatement( + """ + SELECT id + FROM crc_tables + WHERE blob_id = ? + """.trimIndent() + ).use { stmt -> + stmt.setLong(1, blobId) + + stmt.executeQuery().use { rows -> + if (rows.next()) { + return rows.getInt(1) + } + } + } + + val checksumTableId: Int + + connection.prepareStatement( + """ + INSERT INTO caches (id) + VALUES (DEFAULT) + RETURNING id + """.trimIndent() + ).use { stmt -> + stmt.executeQuery().use { rows -> + check(rows.next()) + checksumTableId = rows.getInt(1) + } + } + + connection.prepareStatement( + """ + INSERT INTO crc_tables (id, blob_id) + VALUES (?, ?) + """.trimIndent() + ).use { stmt -> + stmt.setInt(1, checksumTableId) + stmt.setLong(2, blobId) + + stmt.execute() + } + + connection.prepareStatement( + """ + INSERT INTO crc_table_archives (crc_table_id, archive_id, crc32) + VALUES (?, ?, ?) + """.trimIndent() + ).use { stmt -> + for ((i, entry) in checksumTable.table.entries.withIndex()) { + stmt.setInt(1, checksumTableId) + stmt.setInt(2, i) + stmt.setInt(3, entry) + + stmt.addBatch() + } + + stmt.executeBatch() + } + + return checksumTableId + } + + private fun readArchive(store: Store, id: Int): Archive { + store.read(0, id).use { buf -> + val versionList = if (id == 5) { + JagArchive.unpack(buf.slice()).use { archive -> + VersionList.read(archive) + } + } else { + null + } + + return Archive(id, buf.retain(), versionList) + } + } + + private fun addArchive(connection: Connection, sourceId: Int, archive: Archive) { + val blobId = addBlob(connection, archive) + + connection.prepareStatement( + """ + INSERT INTO archives (archive_id, blob_id) + VALUES (?, ?) + ON CONFLICT DO NOTHING + """.trimIndent() + ).use { stmt -> + stmt.setInt(1, archive.id) + stmt.setLong(2, blobId) + + stmt.execute() + } + + connection.prepareStatement( + """ + INSERT INTO source_archives (source_id, archive_id, blob_id) + VALUES (?, ?, ?) + ON CONFLICT DO NOTHING + """.trimIndent() + ).use { stmt -> + stmt.setInt(1, sourceId) + stmt.setInt(2, archive.id) + stmt.setLong(3, blobId) + + stmt.execute() + } + + val versionList = archive.versionList ?: return + val savepoint = connection.setSavepoint() + + connection.prepareStatement( + """ + INSERT INTO version_lists (blob_id) + VALUES (?) + """.trimIndent() + ).use { stmt -> + try { + stmt.setLong(1, blobId) + + stmt.execute() + } catch (ex: SQLException) { + if (ex.sqlState == PSQLState.UNIQUE_VIOLATION.state) { + connection.rollback(savepoint) + return + } + throw ex + } + } + + connection.prepareStatement( + """ + INSERT INTO version_list_files (blob_id, index_id, file_id, version, crc32) + VALUES (?, ?, ?, ?, ?) + """.trimIndent() + ).use { stmt -> + for ((indexId, files) in versionList.files.withIndex()) { + for ((fileId, file) in files.withIndex()) { + stmt.setLong(1, blobId) + stmt.setInt(2, indexId + 1) + stmt.setInt(3, fileId) + stmt.setInt(4, file.version) + stmt.setInt(5, file.checksum) + + stmt.addBatch() + } + } + + stmt.executeBatch() + } + + connection.prepareStatement( + """ + INSERT INTO version_list_maps (blob_id, map_square, map_file_id, loc_file_id, free_to_play) + VALUES (?, ?, ?, ?, ?) + """.trimIndent() + ).use { stmt -> + for ((mapSquare, map) in versionList.maps) { + stmt.setLong(1, blobId) + stmt.setInt(2, mapSquare) + stmt.setInt(3, map.mapFile) + stmt.setInt(4, map.locFile) + stmt.setBoolean(5, map.freeToPlay) + + stmt.addBatch() + } + + stmt.executeBatch() + } + } + + private fun readFile(store: Store, index: Int, file: Int): File? { + store.read(index, file).use { buf -> + val version = VersionTrailer.strip(buf) ?: return null + return File(index, file, buf.retain(), version) + } + } + + private fun addFiles(connection: Connection, sourceId: Int, files: List) { + val blobIds = addBlobs(connection, files) + + connection.prepareStatement( + """ + INSERT INTO files (index_id, file_id, version, blob_id) + VALUES (?, ?, ?, ?) + ON CONFLICT DO NOTHING + """.trimIndent() + ).use { stmt -> + for ((i, file) in files.withIndex()) { + stmt.setInt(1, file.index) + stmt.setInt(2, file.file) + stmt.setInt(3, file.version) + stmt.setLong(4, blobIds[i]) + + stmt.addBatch() + } + + stmt.executeBatch() + } + + connection.prepareStatement( + """ + INSERT INTO source_files (source_id, index_id, file_id, version, blob_id) + VALUES (?, ?, ?, ?, ?) + ON CONFLICT DO NOTHING + """.trimIndent() + ).use { stmt -> + for ((i, file) in files.withIndex()) { + stmt.setInt(1, sourceId) + stmt.setInt(2, file.index) + stmt.setInt(3, file.file) + stmt.setInt(4, file.version) + stmt.setLong(5, blobIds[i]) + + stmt.addBatch() + } + + stmt.executeBatch() + } + } + public suspend fun refreshViews() { database.execute { connection -> connection.prepareStatement( @@ -875,6 +1309,22 @@ public class CacheImporter @Inject constructor( ).use { stmt -> stmt.execute() } + + connection.prepareStatement( + """ + REFRESH MATERIALIZED VIEW CONCURRENTLY version_list_stats + """.trimIndent() + ).use { stmt -> + stmt.execute() + } + + connection.prepareStatement( + """ + REFRESH MATERIALIZED VIEW CONCURRENTLY crc_table_stats + """.trimIndent() + ).use { stmt -> + stmt.execute() + } } } diff --git a/archive/src/main/kotlin/org/openrs2/archive/cache/ExportCommand.kt b/archive/src/main/kotlin/org/openrs2/archive/cache/ExportCommand.kt index 100100cd52..a435888d7e 100644 --- a/archive/src/main/kotlin/org/openrs2/archive/cache/ExportCommand.kt +++ b/archive/src/main/kotlin/org/openrs2/archive/cache/ExportCommand.kt @@ -23,8 +23,8 @@ public class ExportCommand : CliktCommand(name = "export") { CloseableInjector(Guice.createInjector(ArchiveModule)).use { injector -> val exporter = injector.getInstance(CacheExporter::class.java) - DiskStore.create(output).use { store -> - exporter.export(id, store) + exporter.export(id) { legacy -> + DiskStore.create(output, legacy = legacy) } } } diff --git a/archive/src/main/kotlin/org/openrs2/archive/web/CachesController.kt b/archive/src/main/kotlin/org/openrs2/archive/web/CachesController.kt index f0c3db96dc..329afd2fb7 100644 --- a/archive/src/main/kotlin/org/openrs2/archive/web/CachesController.kt +++ b/archive/src/main/kotlin/org/openrs2/archive/web/CachesController.kt @@ -71,8 +71,8 @@ public class CachesController @Inject constructor( ) call.respondOutputStream(contentType = ContentType.Application.Zip) { - DiskStoreZipWriter(ZipOutputStream(this), alloc = alloc).use { store -> - exporter.export(id, store) + exporter.export(id) { legacy -> + DiskStoreZipWriter(ZipOutputStream(this), alloc = alloc, legacy = legacy) } } } @@ -92,9 +92,8 @@ public class CachesController @Inject constructor( ) call.respondOutputStream(contentType = ContentType.Application.GZip) { - val output = TarArchiveOutputStream(GzipLevelOutputStream(this, Deflater.BEST_COMPRESSION)) - FlatFileStoreTarWriter(output).use { store -> - exporter.export(id, store) + exporter.export(id) { + FlatFileStoreTarWriter(TarArchiveOutputStream(GzipLevelOutputStream(this, Deflater.BEST_COMPRESSION))) } } } diff --git a/archive/src/main/resources/org/openrs2/archive/migrations/V9__legacy.sql b/archive/src/main/resources/org/openrs2/archive/migrations/V9__legacy.sql new file mode 100644 index 0000000000..70323db85f --- /dev/null +++ b/archive/src/main/resources/org/openrs2/archive/migrations/V9__legacy.sql @@ -0,0 +1,215 @@ +-- @formatter:off +CREATE TABLE blobs ( + id BIGSERIAL PRIMARY KEY NOT NULL, + crc32 INTEGER NOT NULL, + whirlpool BYTEA UNIQUE NOT NULL, + data BYTEA NOT NULL +); + +CREATE INDEX ON blobs USING HASH (crc32); + +CREATE TABLE caches ( + id SERIAL PRIMARY KEY NOT NULL +); + +INSERT INTO caches (id) +SELECT id FROM master_indexes; + +SELECT setval('caches_id_seq', MAX(id)) FROM caches; + +ALTER TABLE master_indexes + ADD FOREIGN KEY (id) REFERENCES caches (id), + ALTER COLUMN id DROP DEFAULT; + +DROP SEQUENCE master_indexes_id_seq; + +ALTER TABLE updates RENAME master_index_id TO cache_id; + +ALTER TABLE updates + DROP CONSTRAINT updates_master_index_id_fkey, + ADD FOREIGN KEY (cache_id) REFERENCES caches (id); + +ALTER TABLE sources RENAME master_index_id TO cache_id; + +ALTER TABLE sources + DROP CONSTRAINT sources_master_index_id_fkey, + ADD FOREIGN KEY (cache_id) REFERENCES caches (id); + +CREATE TABLE crc_tables ( + id INTEGER PRIMARY KEY NOT NULL REFERENCES caches (id), + blob_id BIGINT UNIQUE NOT NULL REFERENCES blobs (id) +); + +CREATE TABLE crc_table_archives ( + crc_table_id INTEGER NOT NULL REFERENCES crc_tables (id), + archive_id uint1 NOT NULL, + crc32 INTEGER NOT NULL, + PRIMARY KEY (crc_table_id, archive_id) +); + +CREATE TABLE archives ( + archive_id uint1 NOT NULL, + blob_id BIGINT NOT NULL REFERENCES blobs (id), + PRIMARY KEY (archive_id, blob_id) +); + +CREATE TABLE version_lists ( + blob_id BIGINT PRIMARY KEY NOT NULL REFERENCES blobs (id) +); + +CREATE TABLE version_list_files ( + blob_id BIGINT NOT NULL REFERENCES version_lists (blob_id), + index_id uint1 NOT NULL, + file_id uint2 NOT NULL, + version uint2 NOT NULL, + crc32 INTEGER NOT NULL, + PRIMARY KEY (blob_id, index_id, file_id) +); + +CREATE TABLE version_list_maps ( + blob_id BIGINT NOT NULL REFERENCES version_lists (blob_id), + map_square uint2 NOT NULL, + map_file_id uint2 NOT NULL, + loc_file_id uint2 NOT NULL, + free_to_play BOOLEAN NOT NULL, + PRIMARY KEY (blob_id, map_square) +); + +CREATE TABLE files ( + index_id uint1 NOT NULL, + file_id uint2 NOT NULL, + version uint2 NOT NULL, + blob_id BIGINT NOT NULL REFERENCES blobs (id), + PRIMARY KEY (index_id, file_id, version, blob_id) +); + +CREATE TABLE source_archives ( + source_id INTEGER NOT NULL REFERENCES sources (id), + archive_id uint1 NOT NULL, + blob_id BIGINT NOT NULL REFERENCES blobs (id), + PRIMARY KEY (source_id, archive_id, blob_id), + FOREIGN KEY (archive_id, blob_id) REFERENCES archives (archive_id, blob_id) +); + +CREATE INDEX ON source_archives (archive_id, blob_id); + +CREATE TABLE source_files ( + source_id INTEGER NOT NULL REFERENCES sources (id), + index_id uint1 NOT NULL, + file_id uint2 NOT NULL, + version uint2 NOT NULL, + blob_id BIGINT NOT NULL REFERENCES blobs (id), + PRIMARY KEY (source_id, index_id, file_id, version, blob_id), + FOREIGN KEY (index_id, file_id, version, blob_id) REFERENCES files (index_id, file_id, version, blob_id) +); + +CREATE INDEX ON source_files (index_id, file_id, version, blob_id); + +CREATE FUNCTION resolve_archive(_archive_id uint1, _crc32 INTEGER) RETURNS SETOF blobs AS $$ + SELECT b.* + FROM archives a + JOIN blobs b ON b.id = a.blob_id + WHERE a.archive_id = _archive_id AND b.crc32 = _crc32 + ORDER BY b.id ASC + LIMIT 1; +$$ LANGUAGE SQL STABLE PARALLEL SAFE ROWS 1; + +CREATE FUNCTION resolve_file(_index_id uint1, _file_id uint2, _version uint2, _crc32 INTEGER) RETURNS SETOF blobs AS $$ + SELECT b.* + FROM files f + JOIN blobs b on b.id = f.blob_id + WHERE f.index_id = _index_id AND f.file_id = _file_id AND f.version = _version AND b.crc32 = _crc32 + ORDER BY b.id ASC + LIMIT 1; +$$ LANGUAGE SQL STABLE PARALLEL SAFE ROWS 1; + +CREATE VIEW resolved_archives AS +SELECT c.id AS crc_table_id, a.archive_id, b.data, b.id AS blob_id +FROM crc_tables c +JOIN crc_table_archives a ON a.crc_table_id = c.id +JOIN resolve_archive(a.archive_id, a.crc32) b ON TRUE; + +CREATE VIEW resolved_files (crc_table_id, index_id, file_id, version, data) AS +WITH a AS NOT MATERIALIZED ( + SELECT crc_table_id, archive_id, data, blob_id + FROM resolved_archives +) +SELECT a.crc_table_id, 0::uint1, a.archive_id, NULL, a.data +FROM a +UNION ALL +SELECT a.crc_table_id, vf.index_id, vf.file_id, vf.version, f.data +FROM a +JOIN version_lists v ON v.blob_id = a.blob_id +JOIN version_list_files vf ON vf.blob_id = v.blob_id +JOIN resolve_file(vf.index_id, vf.file_id, vf.version, vf.crc32) f ON TRUE +WHERE a.archive_id = 5; + +ALTER VIEW collisions RENAME TO colliding_groups; + +CREATE VIEW colliding_archives (archive_id, crc32, blobs) AS +SELECT + a.archive_id, + b.crc32, + array_agg(DISTINCT b.id ORDER BY b.id ASC) +FROM archives a +JOIN blobs b ON b.id = a.blob_id +GROUP BY a.archive_id, b.crc32 +HAVING COUNT(DISTINCT b.id) > 1; + +CREATE VIEW colliding_files (index_id, file_id, version, crc32, blobs) AS +SELECT + f.index_id, + f.file_id, + f.version, + b.crc32, + array_agg(DISTINCT b.id ORDER BY b.id ASC) +FROM files f +JOIN blobs b ON b.id = f.blob_id +GROUP BY f.index_id, f.file_id, f.version, b.crc32 +HAVING COUNT(DISTINCT b.id) > 1; + +CREATE MATERIALIZED VIEW version_list_stats AS +SELECT + v.blob_id, + COUNT(*) FILTER (WHERE b.id IS NOT NULL) AS valid_files, + COUNT(*) AS files, + SUM(length(b.data) + 2) FILTER (WHERE b.id IS NOT NULL) AS size, + SUM(group_blocks(vf.file_id, length(b.data) + 2)) AS blocks +FROM version_lists v +JOIN version_list_files vf ON vf.blob_id = v.blob_id +LEFT JOIN resolve_file(vf.index_id, vf.file_id, vf.version, vf.crc32) b ON TRUE +GROUP BY v.blob_id; + +CREATE UNIQUE INDEX ON version_list_stats (blob_id); + +CREATE MATERIALIZED VIEW crc_table_stats AS +SELECT + c.id AS crc_table_id, + COUNT(*) FILTER (WHERE b.id IS NOT NULL AND a.crc32 <> 0) AS valid_archives, + COUNT(*) FILTER (WHERE a.crc32 <> 0) AS archives, + SUM(COALESCE(s.valid_files, 0)) AS valid_files, + SUM(COALESCE(s.files, 0)) AS files, + SUM(COALESCE(s.size, 0)) + SUM(COALESCE(length(b.data), 0)) AS size, + SUM(COALESCE(s.blocks, 0)) + SUM(COALESCE(group_blocks(a.archive_id, length(b.data)), 0)) AS blocks +FROM crc_tables c +LEFT JOIN crc_table_archives a ON a.crc_table_id = c.id +LEFT JOIN resolve_archive(a.archive_id, a.crc32) b ON TRUE +LEFT JOIN version_list_stats s ON s.blob_id = b.id +GROUP BY c.id; + +CREATE UNIQUE INDEX ON crc_table_stats (crc_table_id); + +CREATE VIEW cache_stats AS +SELECT + c.id AS cache_id, + COALESCE(ms.valid_indexes, cs.valid_archives) AS valid_indexes, + COALESCE(ms.indexes, cs.archives) AS indexes, + COALESCE(ms.valid_groups, cs.valid_files) AS valid_groups, + COALESCE(ms.groups, cs.files) AS groups, + COALESCE(ms.valid_keys, 0) AS valid_keys, + COALESCE(ms.keys, 0) AS keys, + COALESCE(ms.size, cs.size) AS size, + COALESCE(ms.blocks, cs.blocks) AS blocks +FROM caches c +LEFT JOIN master_index_stats ms ON ms.master_index_id = c.id +LEFT JOIN crc_table_stats cs ON cs.crc_table_id = c.id; diff --git a/archive/src/main/resources/org/openrs2/archive/templates/caches/show.html b/archive/src/main/resources/org/openrs2/archive/templates/caches/show.html index cc7a5ee8ed..67b4d0fe20 100644 --- a/archive/src/main/resources/org/openrs2/archive/templates/caches/show.html +++ b/archive/src/main/resources/org/openrs2/archive/templates/caches/show.html @@ -16,7 +16,7 @@ - + @@ -110,49 +110,74 @@
FormatVERSIONEDVERSIONED
Indexes
-

Master index

+
+

Master index

-
- - - - - - - - - - - - - - - - - - - - - -
ArchiveVersionChecksumDigestGroupsTotal uncompressed length
00 - 0 - - - - - - - -
+
+ + + + + + + + + + + + + + + + + + + + + +
ArchiveVersionChecksumDigestGroupsTotal uncompressed length
00 + 0 + + + + + + + +
+
+
+ +
+

Checksum table

+ +
+ + + + + + + + + + + + + +
ArchiveChecksum
0 + 0 +
+