Add initial support for archiving legacy caches

Signed-off-by: Graham <gpe@openrs2.org>
bzip2
Graham 3 years ago
parent ebfe01e4c4
commit f079c415f5
  1. 223
      archive/src/main/kotlin/org/openrs2/archive/cache/CacheExporter.kt
  2. 584
      archive/src/main/kotlin/org/openrs2/archive/cache/CacheImporter.kt
  3. 4
      archive/src/main/kotlin/org/openrs2/archive/cache/ExportCommand.kt
  4. 9
      archive/src/main/kotlin/org/openrs2/archive/web/CachesController.kt
  5. 215
      archive/src/main/resources/org/openrs2/archive/migrations/V9__legacy.sql
  6. 111
      archive/src/main/resources/org/openrs2/archive/templates/caches/show.html

@ -4,6 +4,7 @@ import com.fasterxml.jackson.annotation.JsonProperty
import io.netty.buffer.ByteBufAllocator
import io.netty.buffer.Unpooled
import org.openrs2.buffer.use
import org.openrs2.cache.ChecksumTable
import org.openrs2.cache.DiskStore
import org.openrs2.cache.Js5Archive
import org.openrs2.cache.Js5Compression
@ -13,6 +14,7 @@ import org.openrs2.cache.Store
import org.openrs2.crypto.XteaKey
import org.openrs2.db.Database
import org.postgresql.util.PGobject
import java.sql.Connection
import java.time.Instant
import java.util.SortedSet
import javax.inject.Inject
@ -108,7 +110,8 @@ public class CacheExporter @Inject constructor(
val sources: List<Source>,
val updates: List<String>,
val stats: Stats?,
val masterIndex: Js5MasterIndex
val masterIndex: Js5MasterIndex?,
val checksumTable: ChecksumTable?
)
public data class Source(
@ -136,25 +139,25 @@ public class CacheExporter @Inject constructor(
SELECT *
FROM (
SELECT
m.id,
c.id,
g.name,
array_remove(array_agg(DISTINCT ROW(s.build_major, s.build_minor)::build ORDER BY ROW(s.build_major, s.build_minor)::build ASC), NULL) builds,
MIN(s.timestamp) AS timestamp,
array_remove(array_agg(DISTINCT s.name ORDER BY s.name ASC), NULL) sources,
ms.valid_indexes,
ms.indexes,
ms.valid_groups,
ms.groups,
ms.valid_keys,
ms.keys,
ms.size,
ms.blocks
FROM master_indexes m
JOIN sources s ON s.master_index_id = m.id
cs.valid_indexes,
cs.indexes,
cs.valid_groups,
cs.groups,
cs.valid_keys,
cs.keys,
cs.size,
cs.blocks
FROM caches c
JOIN sources s ON s.cache_id = c.id
JOIN games g ON g.id = s.game_id
LEFT JOIN master_index_stats ms ON ms.master_index_id = m.id
GROUP BY m.id, g.name, ms.valid_indexes, ms.indexes, ms.valid_groups, ms.groups, ms.valid_keys, ms.keys,
ms.size, ms.blocks
LEFT JOIN cache_stats cs ON cs.cache_id = c.id
GROUP BY c.id, g.name, cs.valid_indexes, cs.indexes, cs.valid_groups, cs.groups, cs.valid_keys, cs.keys,
cs.size, cs.blocks
) t
ORDER BY t.name ASC, t.builds[1] ASC, t.timestamp ASC
""".trimIndent()
@ -201,26 +204,31 @@ public class CacheExporter @Inject constructor(
public suspend fun get(id: Int): Cache? {
return database.execute { connection ->
val masterIndex: Js5MasterIndex
val masterIndex: Js5MasterIndex?
val checksumTable: ChecksumTable?
val stats: Stats?
connection.prepareStatement(
"""
SELECT
m.format,
c.data,
ms.valid_indexes,
ms.indexes,
ms.valid_groups,
ms.groups,
ms.valid_keys,
ms.keys,
ms.size,
ms.blocks
FROM master_indexes m
JOIN containers c ON c.id = m.container_id
LEFT JOIN master_index_stats ms ON ms.master_index_id = m.id
WHERE m.id = ?
mc.data,
b.data,
cs.valid_indexes,
cs.indexes,
cs.valid_groups,
cs.groups,
cs.valid_keys,
cs.keys,
cs.size,
cs.blocks
FROM caches c
LEFT JOIN master_indexes m ON m.id = c.id
LEFT JOIN containers mc ON mc.id = m.container_id
LEFT JOIN crc_tables t ON t.id = c.id
LEFT JOIN blobs b ON b.id = t.blob_id
LEFT JOIN cache_stats cs ON cs.cache_id = c.id
WHERE c.id = ?
""".trimIndent()
).use { stmt ->
stmt.setInt(1, id)
@ -230,25 +238,38 @@ public class CacheExporter @Inject constructor(
return@execute null
}
val format = MasterIndexFormat.valueOf(rows.getString(1).uppercase())
val formatString = rows.getString(1)
masterIndex = if (formatString != null) {
Unpooled.wrappedBuffer(rows.getBytes(2)).use { compressed ->
Js5Compression.uncompress(compressed).use { uncompressed ->
val format = MasterIndexFormat.valueOf(formatString.uppercase())
Js5MasterIndex.readUnverified(uncompressed, format)
}
}
} else {
null
}
masterIndex = Unpooled.wrappedBuffer(rows.getBytes(2)).use { compressed ->
Js5Compression.uncompress(compressed).use { uncompressed ->
Js5MasterIndex.readUnverified(uncompressed, format)
val blob = rows.getBytes(3)
checksumTable = if (blob != null) {
Unpooled.wrappedBuffer(blob).use { buf ->
ChecksumTable.read(buf)
}
} else {
null
}
val validIndexes = rows.getLong(3)
val validIndexes = rows.getLong(4)
stats = if (rows.wasNull()) {
null
} else {
val indexes = rows.getLong(4)
val validGroups = rows.getLong(5)
val groups = rows.getLong(6)
val validKeys = rows.getLong(7)
val keys = rows.getLong(8)
val size = rows.getLong(9)
val blocks = rows.getLong(10)
val indexes = rows.getLong(5)
val validGroups = rows.getLong(6)
val groups = rows.getLong(7)
val validKeys = rows.getLong(8)
val keys = rows.getLong(9)
val size = rows.getLong(10)
val blocks = rows.getLong(11)
Stats(validIndexes, indexes, validGroups, groups, validKeys, keys, size, blocks)
}
}
@ -261,7 +282,7 @@ public class CacheExporter @Inject constructor(
SELECT g.name, s.build_major, s.build_minor, s.timestamp, s.name, s.description, s.url
FROM sources s
JOIN games g ON g.id = s.game_id
WHERE s.master_index_id = ?
WHERE s.cache_id = ?
ORDER BY s.name ASC
""".trimIndent()
).use { stmt ->
@ -303,7 +324,7 @@ public class CacheExporter @Inject constructor(
"""
SELECT url
FROM updates
WHERE master_index_id = ?
WHERE cache_id = ?
""".trimIndent()
).use { stmt ->
stmt.setInt(1, id)
@ -315,45 +336,69 @@ public class CacheExporter @Inject constructor(
}
}
Cache(id, sources, updates, stats, masterIndex)
Cache(id, sources, updates, stats, masterIndex, checksumTable)
}
}
public suspend fun export(id: Int, store: Store) {
database.execute { connection ->
connection.prepareStatement(
public fun export(id: Int, storeFactory: (Boolean) -> Store) {
database.executeOnce { connection ->
val legacy = connection.prepareStatement(
"""
SELECT archive_id, group_id, data, version
FROM resolved_groups
WHERE master_index_id = ?
SELECT id
FROM crc_tables
WHERE id = ?
""".trimIndent()
).use { stmt ->
stmt.fetchSize = BATCH_SIZE
stmt.setInt(1, id)
stmt.executeQuery().use { rows ->
alloc.buffer(2, 2).use { versionBuf ->
store.create(Js5Archive.ARCHIVESET)
while (rows.next()) {
val archive = rows.getInt(1)
val group = rows.getInt(2)
val bytes = rows.getBytes(3)
val version = rows.getInt(4)
val versionNull = rows.wasNull()
versionBuf.clear()
if (!versionNull) {
versionBuf.writeShort(version)
}
rows.next()
}
}
storeFactory(legacy).use { store ->
if (legacy) {
exportLegacy(connection, id, store)
} else {
export(connection, id, store)
}
}
}
}
private fun export(connection: Connection, id: Int, store: Store) {
connection.prepareStatement(
"""
SELECT archive_id, group_id, data, version
FROM resolved_groups
WHERE master_index_id = ?
""".trimIndent()
).use { stmt ->
stmt.fetchSize = BATCH_SIZE
stmt.setInt(1, id)
stmt.executeQuery().use { rows ->
alloc.buffer(2, 2).use { versionBuf ->
store.create(Js5Archive.ARCHIVESET)
Unpooled.wrappedBuffer(Unpooled.wrappedBuffer(bytes), versionBuf.retain()).use { buf ->
store.write(archive, group, buf)
while (rows.next()) {
val archive = rows.getInt(1)
val group = rows.getInt(2)
val bytes = rows.getBytes(3)
val version = rows.getInt(4)
val versionNull = rows.wasNull()
versionBuf.clear()
if (!versionNull) {
versionBuf.writeShort(version)
}
// ensure the .idx file exists even if it is empty
if (archive == Js5Archive.ARCHIVESET) {
store.create(group)
}
Unpooled.wrappedBuffer(Unpooled.wrappedBuffer(bytes), versionBuf.retain()).use { buf ->
store.write(archive, group, buf)
// ensure the .idx file exists even if it is empty
if (archive == Js5Archive.ARCHIVESET) {
store.create(group)
}
}
}
@ -362,6 +407,42 @@ public class CacheExporter @Inject constructor(
}
}
private fun exportLegacy(connection: Connection, id: Int, store: Store) {
connection.prepareStatement(
"""
SELECT index_id, file_id, data, version
FROM resolved_files
WHERE crc_table_id = ?
""".trimIndent()
).use { stmt ->
stmt.fetchSize = BATCH_SIZE
stmt.setInt(1, id)
stmt.executeQuery().use { rows ->
alloc.buffer(2, 2).use { versionBuf ->
store.create(0)
while (rows.next()) {
val index = rows.getInt(1)
val file = rows.getInt(2)
val bytes = rows.getBytes(3)
val version = rows.getInt(4)
val versionNull = rows.wasNull()
versionBuf.clear()
if (!versionNull) {
versionBuf.writeShort(version)
}
Unpooled.wrappedBuffer(Unpooled.wrappedBuffer(bytes), versionBuf.retain()).use { buf ->
store.write(index, file, buf)
}
}
}
}
}
}
public suspend fun exportKeys(id: Int): List<Key> {
return database.execute { connection ->
connection.prepareStatement(

@ -4,9 +4,13 @@ import com.github.michaelbull.logging.InlineLogger
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
import io.netty.buffer.ByteBufUtil
import io.netty.buffer.DefaultByteBufHolder
import io.netty.buffer.Unpooled
import org.openrs2.buffer.crc32
import org.openrs2.buffer.use
import org.openrs2.cache.ChecksumTable
import org.openrs2.cache.DiskStore
import org.openrs2.cache.JagArchive
import org.openrs2.cache.Js5Archive
import org.openrs2.cache.Js5Compression
import org.openrs2.cache.Js5CompressionType
@ -15,6 +19,7 @@ import org.openrs2.cache.Js5MasterIndex
import org.openrs2.cache.MasterIndexFormat
import org.openrs2.cache.Store
import org.openrs2.cache.StoreCorruptException
import org.openrs2.cache.VersionList
import org.openrs2.cache.VersionTrailer
import org.openrs2.crypto.Whirlpool
import org.openrs2.db.Database
@ -74,6 +79,32 @@ public class CacheImporter @Inject constructor(
public val versionTruncated: Boolean
) : Container(compressed, uncompressed)
public abstract class Blob(
buf: ByteBuf
) : DefaultByteBufHolder(buf) {
public val bytes: ByteArray = ByteBufUtil.getBytes(buf, buf.readerIndex(), buf.readableBytes(), false)
public val crc32: Int = buf.crc32()
public val whirlpool: ByteArray = Whirlpool.whirlpool(bytes)
}
public class ChecksumTableBlob(
buf: ByteBuf,
public val table: ChecksumTable
) : Blob(buf)
public class Archive(
public val id: Int,
buf: ByteBuf,
public val versionList: VersionList?
) : Blob(buf)
public class File(
public val index: Int,
public val file: Int,
buf: ByteBuf,
public val version: Int
) : Blob(buf)
private enum class SourceType {
DISK,
JS5REMOTE
@ -100,83 +131,101 @@ public class CacheImporter @Inject constructor(
val gameId = getGameId(connection, game)
// import master index
val masterIndex = createMasterIndex(store)
val masterIndexId = try {
if (masterIndex.index.entries.isEmpty()) {
throw IOException("Master index empty, cache probably corrupt")
}
if (store is DiskStore && store.legacy) {
importLegacy(connection, store, gameId, buildMajor, buildMinor, timestamp, name, description, url)
} else {
importJs5(connection, store, gameId, buildMajor, buildMinor, timestamp, name, description, url)
}
}
}
addMasterIndex(connection, masterIndex)
} finally {
masterIndex.release()
private fun importJs5(
connection: Connection,
store: Store,
gameId: Int,
buildMajor: Int?,
buildMinor: Int?,
timestamp: Instant?,
name: String?,
description: String?,
url: String?
) {
// import master index
val masterIndex = createMasterIndex(store)
val masterIndexId = try {
if (masterIndex.index.entries.isEmpty()) {
throw IOException("Master index empty, cache probably corrupt")
}
// create source
val sourceId = addSource(
connection,
SourceType.DISK,
masterIndexId,
gameId,
buildMajor,
buildMinor,
timestamp,
name,
description,
url
)
addMasterIndex(connection, masterIndex)
} finally {
masterIndex.release()
}
// import indexes
val indexes = arrayOfNulls<Js5Index>(Js5Archive.ARCHIVESET)
val indexGroups = mutableListOf<Index>()
try {
for (archive in store.list(Js5Archive.ARCHIVESET)) {
try {
val indexGroup = readIndex(store, archive)
indexes[archive] = indexGroup.index
indexGroups += indexGroup
} catch (ex: StoreCorruptException) {
// see the comment in Js5MasterIndex::create
logger.warn(ex) { "Skipping corrupt index (archive $archive)" }
}
// create source
val sourceId = addSource(
connection,
SourceType.DISK,
masterIndexId,
gameId,
buildMajor,
buildMinor,
timestamp,
name,
description,
url
)
// import indexes
val indexes = arrayOfNulls<Js5Index>(Js5Archive.ARCHIVESET)
val indexGroups = mutableListOf<Index>()
try {
for (archive in store.list(Js5Archive.ARCHIVESET)) {
try {
val indexGroup = readIndex(store, archive)
indexes[archive] = indexGroup.index
indexGroups += indexGroup
} catch (ex: StoreCorruptException) {
// see the comment in Js5MasterIndex::create
logger.warn(ex) { "Skipping corrupt index (archive $archive)" }
}
}
for (index in indexGroups) {
addIndex(connection, sourceId, index)
}
} finally {
indexGroups.forEach(Index::release)
for (index in indexGroups) {
addIndex(connection, sourceId, index)
}
} finally {
indexGroups.forEach(Index::release)
}
// import groups
val groups = mutableListOf<Group>()
try {
for (archive in store.list()) {
if (archive == Js5Archive.ARCHIVESET) {
continue
}
// import groups
val groups = mutableListOf<Group>()
try {
for (archive in store.list()) {
if (archive == Js5Archive.ARCHIVESET) {
continue
}
val index = indexes[archive]
val index = indexes[archive]
for (id in store.list(archive)) {
val group = readGroup(store, archive, index, id) ?: continue
groups += group
for (id in store.list(archive)) {
val group = readGroup(store, archive, index, id) ?: continue
groups += group
if (groups.size >= BATCH_SIZE) {
addGroups(connection, sourceId, groups)
if (groups.size >= BATCH_SIZE) {
addGroups(connection, sourceId, groups)
groups.forEach(Group::release)
groups.clear()
}
groups.forEach(Group::release)
groups.clear()
}
}
}
if (groups.isNotEmpty()) {
addGroups(connection, sourceId, groups)
}
} finally {
groups.forEach(Group::release)
if (groups.isNotEmpty()) {
addGroups(connection, sourceId, groups)
}
} finally {
groups.forEach(Group::release)
}
}
@ -229,7 +278,7 @@ public class CacheImporter @Inject constructor(
buildMinor: Int?,
lastId: Int?,
timestamp: Instant
): MasterIndexResult {
): CacheImporter.MasterIndexResult {
return database.execute { connection ->
prepare(connection)
@ -471,7 +520,7 @@ public class CacheImporter @Inject constructor(
private fun addSource(
connection: Connection,
type: SourceType,
masterIndexId: Int,
cacheId: Int,
gameId: Int,
buildMajor: Int?,
buildMinor: Int?,
@ -485,10 +534,10 @@ public class CacheImporter @Inject constructor(
"""
SELECT id
FROM sources
WHERE type = 'js5remote' AND master_index_id = ? AND game_id = ? AND build_major = ? AND build_minor IS NOT DISTINCT FROM ?
WHERE type = 'js5remote' AND cache_id = ? AND game_id = ? AND build_major = ? AND build_minor IS NOT DISTINCT FROM ?
""".trimIndent()
).use { stmt ->
stmt.setInt(1, masterIndexId)
stmt.setInt(1, cacheId)
stmt.setInt(2, gameId)
stmt.setInt(3, buildMajor)
stmt.setObject(4, buildMinor, Types.INTEGER)
@ -503,13 +552,13 @@ public class CacheImporter @Inject constructor(
connection.prepareStatement(
"""
INSERT INTO sources (type, master_index_id, game_id, build_major, build_minor, timestamp, name, description, url)
INSERT INTO sources (type, cache_id, game_id, build_major, build_minor, timestamp, name, description, url)
VALUES (?::source_type, ?, ?, ?, ?, ?, ?, ?, ?)
RETURNING id
""".trimIndent()
).use { stmt ->
stmt.setString(1, type.toString().lowercase())
stmt.setInt(2, masterIndexId)
stmt.setInt(2, cacheId)
stmt.setInt(3, gameId)
stmt.setObject(4, buildMajor, Types.INTEGER)
stmt.setObject(5, buildMinor, Types.INTEGER)
@ -747,6 +796,19 @@ public class CacheImporter @Inject constructor(
).use { stmt ->
stmt.execute()
}
connection.prepareStatement(
"""
CREATE TEMPORARY TABLE tmp_blobs (
index INTEGER NOT NULL,
crc32 INTEGER NOT NULL,
whirlpool BYTEA NOT NULL,
data BYTEA NOT NULL
) ON COMMIT DROP
""".trimIndent()
).use { stmt ->
stmt.execute()
}
}
private fun addContainer(connection: Connection, container: Container): Long {
@ -823,6 +885,71 @@ public class CacheImporter @Inject constructor(
return ids
}
private fun addBlob(connection: Connection, blob: Blob): Long {
return addBlobs(connection, listOf(blob)).single()
}
private fun addBlobs(connection: Connection, blobs: List<Blob>): List<Long> {
connection.prepareStatement(
"""
TRUNCATE TABLE tmp_blobs
""".trimIndent()
).use { stmt ->
stmt.execute()
}
connection.prepareStatement(
"""
INSERT INTO tmp_blobs (index, crc32, whirlpool, data)
VALUES (?, ?, ?, ?)
""".trimIndent()
).use { stmt ->
for ((i, blob) in blobs.withIndex()) {
stmt.setInt(1, i)
stmt.setInt(2, blob.crc32)
stmt.setBytes(3, blob.whirlpool)
stmt.setBytes(4, blob.bytes)
stmt.addBatch()
}
stmt.executeBatch()
}
connection.prepareStatement(
"""
INSERT INTO blobs (crc32, whirlpool, data)
SELECT t.crc32, t.whirlpool, t.data
FROM tmp_blobs t
LEFT JOIN blobs b ON b.whirlpool = t.whirlpool
WHERE b.whirlpool IS NULL
ON CONFLICT DO NOTHING
""".trimIndent()
).use { stmt ->
stmt.execute()
}
val ids = mutableListOf<Long>()
connection.prepareStatement(
"""
SELECT b.id
FROM tmp_blobs t
JOIN blobs b ON b.whirlpool = t.whirlpool
ORDER BY t.index ASC
""".trimIndent()
).use { stmt ->
stmt.executeQuery().use { rows ->
while (rows.next()) {
ids += rows.getLong(1)
}
}
}
check(ids.size == blobs.size)
return ids
}
private fun getGameId(connection: Connection, name: String): Int {
connection.prepareStatement(
"""
@ -858,6 +985,313 @@ public class CacheImporter @Inject constructor(
}
}
private fun importLegacy(
connection: Connection,
store: Store,
gameId: Int,
buildMajor: Int?,
buildMinor: Int?,
timestamp: Instant?,
name: String?,
description: String?,
url: String?
) {
// import checksum table
val checksumTable = createChecksumTable(store)
val checksumTableId = try {
if (checksumTable.table.entries.isEmpty()) {
throw IOException("Checksum table empty, cache probably corrupt")
}
addChecksumTable(connection, checksumTable)
} finally {
checksumTable.release()
}
// add source
val sourceId = addSource(
connection,
SourceType.DISK,
checksumTableId,
gameId,
buildMajor,
buildMinor,
timestamp,
name,
description,
url
)
// import archives and version list
for (id in store.list(0)) {
readArchive(store, id).use { archive ->
addArchive(connection, sourceId, archive)
}
}
// import files
val files = mutableListOf<File>()
try {
for (index in store.list()) {
if (index == 0) {
continue
}
for (id in store.list(index)) {
val file = readFile(store, index, id) ?: continue
files += file
if (files.size >= BATCH_SIZE) {
addFiles(connection, sourceId, files)
files.forEach(File::release)
files.clear()
}
}
}
if (files.isNotEmpty()) {
addFiles(connection, sourceId, files)
}
} finally {
files.forEach(File::release)
}
}
private fun createChecksumTable(store: Store): ChecksumTableBlob {
alloc.buffer().use { buf ->
val table = ChecksumTable.create(store)
table.write(buf)
return ChecksumTableBlob(buf.retain(), table)
}
}
private fun addChecksumTable(
connection: Connection,
checksumTable: ChecksumTableBlob
): Int {
val blobId = addBlob(connection, checksumTable)
connection.prepareStatement(
"""
SELECT id
FROM crc_tables
WHERE blob_id = ?
""".trimIndent()
).use { stmt ->
stmt.setLong(1, blobId)
stmt.executeQuery().use { rows ->
if (rows.next()) {
return rows.getInt(1)
}
}
}
val checksumTableId: Int
connection.prepareStatement(
"""
INSERT INTO caches (id)
VALUES (DEFAULT)
RETURNING id
""".trimIndent()
).use { stmt ->
stmt.executeQuery().use { rows ->
check(rows.next())
checksumTableId = rows.getInt(1)
}
}
connection.prepareStatement(
"""
INSERT INTO crc_tables (id, blob_id)
VALUES (?, ?)
""".trimIndent()
).use { stmt ->
stmt.setInt(1, checksumTableId)
stmt.setLong(2, blobId)
stmt.execute()
}
connection.prepareStatement(
"""
INSERT INTO crc_table_archives (crc_table_id, archive_id, crc32)
VALUES (?, ?, ?)
""".trimIndent()
).use { stmt ->
for ((i, entry) in checksumTable.table.entries.withIndex()) {
stmt.setInt(1, checksumTableId)
stmt.setInt(2, i)
stmt.setInt(3, entry)
stmt.addBatch()
}
stmt.executeBatch()
}
return checksumTableId
}
private fun readArchive(store: Store, id: Int): Archive {
store.read(0, id).use { buf ->
val versionList = if (id == 5) {
JagArchive.unpack(buf.slice()).use { archive ->
VersionList.read(archive)
}
} else {
null
}
return Archive(id, buf.retain(), versionList)
}
}
private fun addArchive(connection: Connection, sourceId: Int, archive: Archive) {
val blobId = addBlob(connection, archive)
connection.prepareStatement(
"""
INSERT INTO archives (archive_id, blob_id)
VALUES (?, ?)
ON CONFLICT DO NOTHING
""".trimIndent()
).use { stmt ->
stmt.setInt(1, archive.id)
stmt.setLong(2, blobId)
stmt.execute()
}
connection.prepareStatement(
"""
INSERT INTO source_archives (source_id, archive_id, blob_id)
VALUES (?, ?, ?)
ON CONFLICT DO NOTHING
""".trimIndent()
).use { stmt ->
stmt.setInt(1, sourceId)
stmt.setInt(2, archive.id)
stmt.setLong(3, blobId)
stmt.execute()
}
val versionList = archive.versionList ?: return
val savepoint = connection.setSavepoint()
connection.prepareStatement(
"""
INSERT INTO version_lists (blob_id)
VALUES (?)
""".trimIndent()
).use { stmt ->
try {
stmt.setLong(1, blobId)
stmt.execute()
} catch (ex: SQLException) {
if (ex.sqlState == PSQLState.UNIQUE_VIOLATION.state) {
connection.rollback(savepoint)
return
}
throw ex
}
}
connection.prepareStatement(
"""
INSERT INTO version_list_files (blob_id, index_id, file_id, version, crc32)
VALUES (?, ?, ?, ?, ?)
""".trimIndent()
).use { stmt ->
for ((indexId, files) in versionList.files.withIndex()) {
for ((fileId, file) in files.withIndex()) {
stmt.setLong(1, blobId)
stmt.setInt(2, indexId + 1)
stmt.setInt(3, fileId)
stmt.setInt(4, file.version)
stmt.setInt(5, file.checksum)
stmt.addBatch()
}
}
stmt.executeBatch()
}
connection.prepareStatement(
"""
INSERT INTO version_list_maps (blob_id, map_square, map_file_id, loc_file_id, free_to_play)
VALUES (?, ?, ?, ?, ?)
""".trimIndent()
).use { stmt ->
for ((mapSquare, map) in versionList.maps) {
stmt.setLong(1, blobId)
stmt.setInt(2, mapSquare)
stmt.setInt(3, map.mapFile)
stmt.setInt(4, map.locFile)
stmt.setBoolean(5, map.freeToPlay)
stmt.addBatch()
}
stmt.executeBatch()
}
}
private fun readFile(store: Store, index: Int, file: Int): File? {
store.read(index, file).use { buf ->
val version = VersionTrailer.strip(buf) ?: return null
return File(index, file, buf.retain(), version)
}
}
private fun addFiles(connection: Connection, sourceId: Int, files: List<File>) {
val blobIds = addBlobs(connection, files)
connection.prepareStatement(
"""
INSERT INTO files (index_id, file_id, version, blob_id)
VALUES (?, ?, ?, ?)
ON CONFLICT DO NOTHING
""".trimIndent()
).use { stmt ->
for ((i, file) in files.withIndex()) {
stmt.setInt(1, file.index)
stmt.setInt(2, file.file)
stmt.setInt(3, file.version)
stmt.setLong(4, blobIds[i])
stmt.addBatch()
}
stmt.executeBatch()
}
connection.prepareStatement(
"""
INSERT INTO source_files (source_id, index_id, file_id, version, blob_id)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT DO NOTHING
""".trimIndent()
).use { stmt ->
for ((i, file) in files.withIndex()) {
stmt.setInt(1, sourceId)
stmt.setInt(2, file.index)
stmt.setInt(3, file.file)
stmt.setInt(4, file.version)
stmt.setLong(5, blobIds[i])
stmt.addBatch()
}
stmt.executeBatch()
}
}
public suspend fun refreshViews() {
database.execute { connection ->
connection.prepareStatement(
@ -875,6 +1309,22 @@ public class CacheImporter @Inject constructor(
).use { stmt ->
stmt.execute()
}
connection.prepareStatement(
"""
REFRESH MATERIALIZED VIEW CONCURRENTLY version_list_stats
""".trimIndent()
).use { stmt ->
stmt.execute()
}
connection.prepareStatement(
"""
REFRESH MATERIALIZED VIEW CONCURRENTLY crc_table_stats
""".trimIndent()
).use { stmt ->
stmt.execute()
}
}
}

@ -23,8 +23,8 @@ public class ExportCommand : CliktCommand(name = "export") {
CloseableInjector(Guice.createInjector(ArchiveModule)).use { injector ->
val exporter = injector.getInstance(CacheExporter::class.java)
DiskStore.create(output).use { store ->
exporter.export(id, store)
exporter.export(id) { legacy ->
DiskStore.create(output, legacy = legacy)
}
}
}

@ -71,8 +71,8 @@ public class CachesController @Inject constructor(
)
call.respondOutputStream(contentType = ContentType.Application.Zip) {
DiskStoreZipWriter(ZipOutputStream(this), alloc = alloc).use { store ->
exporter.export(id, store)
exporter.export(id) { legacy ->
DiskStoreZipWriter(ZipOutputStream(this), alloc = alloc, legacy = legacy)
}
}
}
@ -92,9 +92,8 @@ public class CachesController @Inject constructor(
)
call.respondOutputStream(contentType = ContentType.Application.GZip) {
val output = TarArchiveOutputStream(GzipLevelOutputStream(this, Deflater.BEST_COMPRESSION))
FlatFileStoreTarWriter(output).use { store ->
exporter.export(id, store)
exporter.export(id) {
FlatFileStoreTarWriter(TarArchiveOutputStream(GzipLevelOutputStream(this, Deflater.BEST_COMPRESSION)))
}
}
}

@ -0,0 +1,215 @@
-- @formatter:off
CREATE TABLE blobs (
id BIGSERIAL PRIMARY KEY NOT NULL,
crc32 INTEGER NOT NULL,
whirlpool BYTEA UNIQUE NOT NULL,
data BYTEA NOT NULL
);
CREATE INDEX ON blobs USING HASH (crc32);
CREATE TABLE caches (
id SERIAL PRIMARY KEY NOT NULL
);
INSERT INTO caches (id)
SELECT id FROM master_indexes;
SELECT setval('caches_id_seq', MAX(id)) FROM caches;
ALTER TABLE master_indexes
ADD FOREIGN KEY (id) REFERENCES caches (id),
ALTER COLUMN id DROP DEFAULT;
DROP SEQUENCE master_indexes_id_seq;
ALTER TABLE updates RENAME master_index_id TO cache_id;
ALTER TABLE updates
DROP CONSTRAINT updates_master_index_id_fkey,
ADD FOREIGN KEY (cache_id) REFERENCES caches (id);
ALTER TABLE sources RENAME master_index_id TO cache_id;
ALTER TABLE sources
DROP CONSTRAINT sources_master_index_id_fkey,
ADD FOREIGN KEY (cache_id) REFERENCES caches (id);
CREATE TABLE crc_tables (
id INTEGER PRIMARY KEY NOT NULL REFERENCES caches (id),
blob_id BIGINT UNIQUE NOT NULL REFERENCES blobs (id)
);
CREATE TABLE crc_table_archives (
crc_table_id INTEGER NOT NULL REFERENCES crc_tables (id),
archive_id uint1 NOT NULL,
crc32 INTEGER NOT NULL,
PRIMARY KEY (crc_table_id, archive_id)
);
CREATE TABLE archives (
archive_id uint1 NOT NULL,
blob_id BIGINT NOT NULL REFERENCES blobs (id),
PRIMARY KEY (archive_id, blob_id)
);
CREATE TABLE version_lists (
blob_id BIGINT PRIMARY KEY NOT NULL REFERENCES blobs (id)
);
CREATE TABLE version_list_files (
blob_id BIGINT NOT NULL REFERENCES version_lists (blob_id),
index_id uint1 NOT NULL,
file_id uint2 NOT NULL,
version uint2 NOT NULL,
crc32 INTEGER NOT NULL,
PRIMARY KEY (blob_id, index_id, file_id)
);
CREATE TABLE version_list_maps (
blob_id BIGINT NOT NULL REFERENCES version_lists (blob_id),
map_square uint2 NOT NULL,
map_file_id uint2 NOT NULL,
loc_file_id uint2 NOT NULL,
free_to_play BOOLEAN NOT NULL,
PRIMARY KEY (blob_id, map_square)
);
CREATE TABLE files (
index_id uint1 NOT NULL,
file_id uint2 NOT NULL,
version uint2 NOT NULL,
blob_id BIGINT NOT NULL REFERENCES blobs (id),
PRIMARY KEY (index_id, file_id, version, blob_id)
);
CREATE TABLE source_archives (
source_id INTEGER NOT NULL REFERENCES sources (id),
archive_id uint1 NOT NULL,
blob_id BIGINT NOT NULL REFERENCES blobs (id),
PRIMARY KEY (source_id, archive_id, blob_id),
FOREIGN KEY (archive_id, blob_id) REFERENCES archives (archive_id, blob_id)
);
CREATE INDEX ON source_archives (archive_id, blob_id);
CREATE TABLE source_files (
source_id INTEGER NOT NULL REFERENCES sources (id),
index_id uint1 NOT NULL,
file_id uint2 NOT NULL,
version uint2 NOT NULL,
blob_id BIGINT NOT NULL REFERENCES blobs (id),
PRIMARY KEY (source_id, index_id, file_id, version, blob_id),
FOREIGN KEY (index_id, file_id, version, blob_id) REFERENCES files (index_id, file_id, version, blob_id)
);
CREATE INDEX ON source_files (index_id, file_id, version, blob_id);
CREATE FUNCTION resolve_archive(_archive_id uint1, _crc32 INTEGER) RETURNS SETOF blobs AS $$
SELECT b.*
FROM archives a
JOIN blobs b ON b.id = a.blob_id
WHERE a.archive_id = _archive_id AND b.crc32 = _crc32
ORDER BY b.id ASC
LIMIT 1;
$$ LANGUAGE SQL STABLE PARALLEL SAFE ROWS 1;
CREATE FUNCTION resolve_file(_index_id uint1, _file_id uint2, _version uint2, _crc32 INTEGER) RETURNS SETOF blobs AS $$
SELECT b.*
FROM files f
JOIN blobs b on b.id = f.blob_id
WHERE f.index_id = _index_id AND f.file_id = _file_id AND f.version = _version AND b.crc32 = _crc32
ORDER BY b.id ASC
LIMIT 1;
$$ LANGUAGE SQL STABLE PARALLEL SAFE ROWS 1;
CREATE VIEW resolved_archives AS
SELECT c.id AS crc_table_id, a.archive_id, b.data, b.id AS blob_id
FROM crc_tables c
JOIN crc_table_archives a ON a.crc_table_id = c.id
JOIN resolve_archive(a.archive_id, a.crc32) b ON TRUE;
CREATE VIEW resolved_files (crc_table_id, index_id, file_id, version, data) AS
WITH a AS NOT MATERIALIZED (
SELECT crc_table_id, archive_id, data, blob_id
FROM resolved_archives
)
SELECT a.crc_table_id, 0::uint1, a.archive_id, NULL, a.data
FROM a
UNION ALL
SELECT a.crc_table_id, vf.index_id, vf.file_id, vf.version, f.data
FROM a
JOIN version_lists v ON v.blob_id = a.blob_id
JOIN version_list_files vf ON vf.blob_id = v.blob_id
JOIN resolve_file(vf.index_id, vf.file_id, vf.version, vf.crc32) f ON TRUE
WHERE a.archive_id = 5;
ALTER VIEW collisions RENAME TO colliding_groups;
CREATE VIEW colliding_archives (archive_id, crc32, blobs) AS
SELECT
a.archive_id,
b.crc32,
array_agg(DISTINCT b.id ORDER BY b.id ASC)
FROM archives a
JOIN blobs b ON b.id = a.blob_id
GROUP BY a.archive_id, b.crc32
HAVING COUNT(DISTINCT b.id) > 1;
CREATE VIEW colliding_files (index_id, file_id, version, crc32, blobs) AS
SELECT
f.index_id,
f.file_id,
f.version,
b.crc32,
array_agg(DISTINCT b.id ORDER BY b.id ASC)
FROM files f
JOIN blobs b ON b.id = f.blob_id
GROUP BY f.index_id, f.file_id, f.version, b.crc32
HAVING COUNT(DISTINCT b.id) > 1;
CREATE MATERIALIZED VIEW version_list_stats AS
SELECT
v.blob_id,
COUNT(*) FILTER (WHERE b.id IS NOT NULL) AS valid_files,
COUNT(*) AS files,
SUM(length(b.data) + 2) FILTER (WHERE b.id IS NOT NULL) AS size,
SUM(group_blocks(vf.file_id, length(b.data) + 2)) AS blocks
FROM version_lists v
JOIN version_list_files vf ON vf.blob_id = v.blob_id
LEFT JOIN resolve_file(vf.index_id, vf.file_id, vf.version, vf.crc32) b ON TRUE
GROUP BY v.blob_id;
CREATE UNIQUE INDEX ON version_list_stats (blob_id);
CREATE MATERIALIZED VIEW crc_table_stats AS
SELECT
c.id AS crc_table_id,
COUNT(*) FILTER (WHERE b.id IS NOT NULL AND a.crc32 <> 0) AS valid_archives,
COUNT(*) FILTER (WHERE a.crc32 <> 0) AS archives,
SUM(COALESCE(s.valid_files, 0)) AS valid_files,
SUM(COALESCE(s.files, 0)) AS files,
SUM(COALESCE(s.size, 0)) + SUM(COALESCE(length(b.data), 0)) AS size,
SUM(COALESCE(s.blocks, 0)) + SUM(COALESCE(group_blocks(a.archive_id, length(b.data)), 0)) AS blocks
FROM crc_tables c
LEFT JOIN crc_table_archives a ON a.crc_table_id = c.id
LEFT JOIN resolve_archive(a.archive_id, a.crc32) b ON TRUE
LEFT JOIN version_list_stats s ON s.blob_id = b.id
GROUP BY c.id;
CREATE UNIQUE INDEX ON crc_table_stats (crc_table_id);
CREATE VIEW cache_stats AS
SELECT
c.id AS cache_id,
COALESCE(ms.valid_indexes, cs.valid_archives) AS valid_indexes,
COALESCE(ms.indexes, cs.archives) AS indexes,
COALESCE(ms.valid_groups, cs.valid_files) AS valid_groups,
COALESCE(ms.groups, cs.files) AS groups,
COALESCE(ms.valid_keys, 0) AS valid_keys,
COALESCE(ms.keys, 0) AS keys,
COALESCE(ms.size, cs.size) AS size,
COALESCE(ms.blocks, cs.blocks) AS blocks
FROM caches c
LEFT JOIN master_index_stats ms ON ms.master_index_id = c.id
LEFT JOIN crc_table_stats cs ON cs.crc_table_id = c.id;

@ -16,7 +16,7 @@
<table class="table table-striped table-bordered table-hover">
<tr>
<th class="table-dark">Format</th>
<td th:text="${cache.masterIndex.format}">VERSIONED</td>
<td th:text="${cache.masterIndex}? ${cache.masterIndex.format} : 'LEGACY'">VERSIONED</td>
</tr>
<tr>
<th class="table-dark">Indexes</th>
@ -110,49 +110,74 @@
</table>
</div>
<h2>Master index</h2>
<div th:if="${cache.masterIndex}" th:remove="tag">
<h2>Master index</h2>
<div class="table-responsive">
<table class="table table-striped table-bordered table-hover">
<thead class="table-dark">
<tr>
<th>Archive</th>
<th>Version</th>
<th>Checksum</th>
<th>Digest</th>
<th>Groups</th>
<th>Total uncompressed length</th>
</tr>
</thead>
<tbody>
<tr th:each="entry, it : ${cache.masterIndex.entries}">
<td th:text="${it.index}" class="text-right">0</td>
<td th:text="${#numbers.formatInteger(entry.version, 1, 'COMMA')}" class="text-right">0</td>
<td class="text-right">
<code th:text="${entry.checksum}">0</code>
</td>
<td>
<code
th:if="${cache.masterIndex.format >= @org.openrs2.cache.MasterIndexFormat@DIGESTS}"><span
th:remove="tag"
th:text="${@io.netty.buffer.ByteBufUtil@hexDump(entry.digest).substring(0, 64)}"></span>&ZeroWidthSpace;<span
th:remove="tag"
th:text="${@io.netty.buffer.ByteBufUtil@hexDump(entry.digest).substring(64)}"></span></code>
</td>
<td class="text-right">
<span
th:if="${cache.masterIndex.format >= @org.openrs2.cache.MasterIndexFormat@LENGTHS}"
th:text="${#numbers.formatInteger(entry.groups, 1, 'COMMA')}"></span>
</td>
<td class="text-right">
<!--/*@thymesVar id="#byteunits" type="org.openrs2.archive.web.ByteUnits"*/-->
<span
th:if="${cache.masterIndex.format >= @org.openrs2.cache.MasterIndexFormat@LENGTHS}"
th:text="${#byteunits.format(@java.lang.Integer@toUnsignedLong(entry.totalUncompressedLength))}"></span>
</td>
</tr>
</tbody>
</table>
<div class="table-responsive">
<table class="table table-striped table-bordered table-hover">
<thead class="table-dark">
<tr>
<th>Archive</th>
<th>Version</th>
<th>Checksum</th>
<th>Digest</th>
<th>Groups</th>
<th>Total uncompressed length</th>
</tr>
</thead>
<tbody>
<tr th:each="entry, it : ${cache.masterIndex.entries}">
<td th:text="${it.index}" class="text-right">0</td>
<td th:text="${#numbers.formatInteger(entry.version, 1, 'COMMA')}" class="text-right">0</td>
<td class="text-right">
<code th:text="${entry.checksum}">0</code>
</td>
<td>
<code
th:if="${cache.masterIndex.format >= @org.openrs2.cache.MasterIndexFormat@DIGESTS}"><span
th:remove="tag"
th:text="${@io.netty.buffer.ByteBufUtil@hexDump(entry.digest).substring(0, 64)}"></span>&ZeroWidthSpace;<span
th:remove="tag"
th:text="${@io.netty.buffer.ByteBufUtil@hexDump(entry.digest).substring(64)}"></span></code>
</td>
<td class="text-right">
<span
th:if="${cache.masterIndex.format >= @org.openrs2.cache.MasterIndexFormat@LENGTHS}"
th:text="${#numbers.formatInteger(entry.groups, 1, 'COMMA')}"></span>
</td>
<td class="text-right">
<!--/*@thymesVar id="#byteunits" type="org.openrs2.archive.web.ByteUnits"*/-->
<span
th:if="${cache.masterIndex.format >= @org.openrs2.cache.MasterIndexFormat@LENGTHS}"
th:text="${#byteunits.format(@java.lang.Integer@toUnsignedLong(entry.totalUncompressedLength))}"></span>
</td>
</tr>
</tbody>
</table>
</div>
</div>
<div th:if="${cache.checksumTable}" th:remove="tag">
<h2>Checksum table</h2>
<div class="table-responsive">
<table class="table table-striped table-bordered table-hover">
<thead class="table-dark">
<tr>
<th>Archive</th>
<th>Checksum</th>
</tr>
</thead>
<tbody>
<tr th:each="entry, it : ${cache.checksumTable.entries}">
<td th:text="${it.index}" class="text-right">0</td>
<td class="text-right">
<code th:text="${entry}">0</code>
</td>
</tr>
</tbody>
</table>
</div>
</div>
</main>
</body>

Loading…
Cancel
Save