Add DiskStoreZipWriter

Signed-off-by: Graham <gpe@openrs2.org>
Graham 4 years ago
parent b61992a20f
commit d5d76f9301
Changed files:
  1. cache/src/main/kotlin/org/openrs2/cache/DiskStore.kt (12 lines changed)
  2. cache/src/main/kotlin/org/openrs2/cache/DiskStoreZipWriter.kt (193 lines added)
  3. cache/src/test/kotlin/org/openrs2/cache/DiskStoreZipWriterTest.kt (121 lines added)
  4. cache/src/test/resources/org/openrs2/cache/zip/cache.zip (binary)

@@ -447,16 +447,16 @@ public class DiskStore private constructor(
     }

     public companion object {
-        private const val INDEX_ENTRY_SIZE = 6
+        internal const val INDEX_ENTRY_SIZE = 6
         private const val BLOCK_HEADER_SIZE = 8
-        private const val BLOCK_DATA_SIZE = 512
-        private const val BLOCK_SIZE = BLOCK_HEADER_SIZE + BLOCK_DATA_SIZE
-        private const val EXTENDED_BLOCK_HEADER_SIZE = 10
-        private const val EXTENDED_BLOCK_DATA_SIZE = 510
-        private const val MAX_BLOCK = (1 shl 24) - 1
+        internal const val BLOCK_DATA_SIZE = 512
+        internal const val BLOCK_SIZE = BLOCK_HEADER_SIZE + BLOCK_DATA_SIZE
+        internal const val EXTENDED_BLOCK_HEADER_SIZE = 10
+        internal const val EXTENDED_BLOCK_DATA_SIZE = 510
+        internal const val MAX_BLOCK = (1 shl 24) - 1
         private val TEMP_BUFFER_SIZE = max(INDEX_ENTRY_SIZE, max(BLOCK_HEADER_SIZE, EXTENDED_BLOCK_HEADER_SIZE))
         private const val INDEX_BUFFER_SIZE = INDEX_ENTRY_SIZE * 1000

@@ -0,0 +1,193 @@
package org.openrs2.cache

import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
import it.unimi.dsi.fastutil.ints.Int2ObjectAVLTreeMap
import it.unimi.dsi.fastutil.ints.Int2ObjectSortedMap
import org.openrs2.buffer.use
import java.nio.file.attribute.FileTime
import java.time.Instant
import java.util.zip.Deflater
import java.util.zip.ZipEntry
import java.util.zip.ZipOutputStream
import kotlin.math.min

/**
 * A specialised [Store] implementation that writes a cache in the native
 * `main_file_cache.dat2`/`main_file_cache.idx*` format to a [ZipOutputStream].
 *
 * The cache is not buffered to disk, though the index entries must be
 * buffered in memory. Memory usage is therefore proportional to the number of
 * groups, but, crucially, not to the size of the data file.
 *
 * This implementation only supports the [create] and [write] methods. All
 * other methods throw [UnsupportedOperationException].
 *
 * It is only intended for use by the cache archiving service's web interface.
 */
public class DiskStoreZipWriter(
    private val out: ZipOutputStream,
    private val prefix: String = "cache/",
    level: Int = Deflater.BEST_COMPRESSION,
    timestamp: Instant = Instant.EPOCH,
    private val alloc: ByteBufAllocator = ByteBufAllocator.DEFAULT
) : Store {
    private data class IndexEntry(val len: Int, val block: Int)

    private val timestamp = FileTime.from(timestamp)
    private val indexes = arrayOfNulls<Int2ObjectSortedMap<IndexEntry>>(Store.MAX_ARCHIVE + 1)
    private val zeroBlock = ByteArray(DiskStore.BLOCK_SIZE)
    private var block = 0
    private var padding = 0

    init {
        out.setLevel(level)
        out.putNextEntry(createZipEntry("main_file_cache.dat2"))
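        // a next block pointer of zero marks the end of a chain, so block 0
        // can never be referenced; writing a zero block first keeps real data
        // out of it and makes allocateBlock()'s numbering line up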
        out.write(zeroBlock)
    }

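    // block pointers are written with writeMedium(), i.e. they are 24 bits
    // wide, which is where the MAX_BLOCK limit comes from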
    private fun allocateBlock(): Int {
        if (++block > DiskStore.MAX_BLOCK) {
            throw StoreFullException()
        }
        return block
    }

    private fun createZipEntry(name: String): ZipEntry {
        val entry = ZipEntry(prefix + name)
        entry.creationTime = timestamp
        entry.lastAccessTime = timestamp
        entry.lastModifiedTime = timestamp
        return entry
    }

    override fun exists(archive: Int): Boolean {
        throw UnsupportedOperationException()
    }

    override fun exists(archive: Int, group: Int): Boolean {
        throw UnsupportedOperationException()
    }

    override fun list(): List<Int> {
        throw UnsupportedOperationException()
    }

    override fun list(archive: Int): List<Int> {
        throw UnsupportedOperationException()
    }

    override fun create(archive: Int) {
        require(archive in 0..Store.MAX_ARCHIVE)

        if (indexes[archive] == null) {
            indexes[archive] = Int2ObjectAVLTreeMap()
        }
    }

    override fun read(archive: Int, group: Int): ByteBuf {
        throw UnsupportedOperationException()
    }

    override fun write(archive: Int, group: Int, buf: ByteBuf) {
        require(archive in 0..Store.MAX_ARCHIVE)
        require(group >= 0)
        require(buf.readableBytes() <= Store.MAX_GROUP_SIZE)

        val entry = IndexEntry(buf.readableBytes(), allocateBlock())

        var index = indexes[archive]
        if (index == null) {
            index = Int2ObjectAVLTreeMap()
            indexes[archive] = index
        }
        index[group] = entry

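        // group IDs above 65535 do not fit in the 2 byte group field of a
        // standard block header, so such groups use extended blocks, which
        // widen the group field to 4 bytes at the cost of 2 data bytes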
        val extended = group >= 65536
        val dataSize = if (extended) {
            DiskStore.EXTENDED_BLOCK_DATA_SIZE
        } else {
            DiskStore.BLOCK_DATA_SIZE
        }

        var num = 0

        alloc.buffer(DiskStore.EXTENDED_BLOCK_HEADER_SIZE, DiskStore.EXTENDED_BLOCK_HEADER_SIZE).use { tempBuf ->
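            // each block starts with a header: a group ID (2 bytes, or 4 in
            // an extended block), a 2 byte sequence number, a 3 byte pointer
            // to the next block (0 = end of chain) and a 1 byte archive ID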
            do {
                // add padding to ensure we're at a multiple of the block size
                out.write(zeroBlock, 0, padding)

                // write header
                tempBuf.clear()

                if (extended) {
                    tempBuf.writeInt(group)
                } else {
                    tempBuf.writeShort(group)
                }

                tempBuf.writeShort(num++)

                if (buf.readableBytes() > dataSize) {
                    tempBuf.writeMedium(allocateBlock())
                } else {
                    tempBuf.writeMedium(0)
                }

                tempBuf.writeByte(archive)
                tempBuf.readBytes(out, tempBuf.readableBytes())

                // write data
                val len = min(buf.readableBytes(), dataSize)
                buf.readBytes(out, len)

                // remember how much padding we need to insert before the next block
                padding = dataSize - len
            } while (buf.isReadable)
        }
    }

    override fun remove(archive: Int) {
        throw UnsupportedOperationException()
    }

    override fun remove(archive: Int, group: Int) {
        throw UnsupportedOperationException()
    }

    override fun flush() {
        out.flush()
    }

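    // writes one main_file_cache.idx* zip entry per created archive: a 6 byte
    // entry (3 byte length, 3 byte block pointer) per group, with zeroed
    // entries filling any gaps so offsets stay aligned with group IDs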
    private fun finish() {
        alloc.buffer(DiskStore.INDEX_ENTRY_SIZE, DiskStore.INDEX_ENTRY_SIZE).use { tempBuf ->
            for ((archive, index) in indexes.withIndex()) {
                if (index == null) {
                    continue
                }

                out.putNextEntry(createZipEntry("main_file_cache.idx$archive"))

                if (index.isEmpty()) {
                    continue
                }

                for (group in 0..index.lastIntKey()) {
                    val entry = index[group]
                    if (entry != null) {
                        tempBuf.clear()
                        tempBuf.writeMedium(entry.len)
                        tempBuf.writeMedium(entry.block)
                        tempBuf.readBytes(out, tempBuf.readableBytes())
                    } else {
                        out.write(zeroBlock, 0, DiskStore.INDEX_ENTRY_SIZE)
                    }
                }
            }
        }
    }

    public override fun close() {
        finish()
        out.close()
    }
}
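
For context, a minimal usage sketch of the new class follows. It is not part
of the commit, and the output path, archive ID and group ID are illustrative
only:

import org.openrs2.buffer.copiedBuffer
import org.openrs2.buffer.use
import java.nio.file.Files
import java.nio.file.Path
import java.util.zip.ZipOutputStream

fun main() {
    Files.newOutputStream(Path.of("cache.zip")).use { out ->
        DiskStoreZipWriter(ZipOutputStream(out)).use { store ->
            store.create(255)
            copiedBuffer("OpenRS2").use { buf ->
                store.write(255, 0, buf)
            }
        } // close() emits the buffered main_file_cache.idx* entries
    }
}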

@@ -0,0 +1,121 @@
package org.openrs2.cache

import com.google.common.jimfs.Configuration
import com.google.common.jimfs.Jimfs
import io.netty.buffer.Unpooled
import org.junit.jupiter.api.assertThrows
import org.openrs2.buffer.copiedBuffer
import org.openrs2.buffer.use
import org.openrs2.util.io.recursiveEquals
import java.io.OutputStream
import java.nio.file.Files
import java.nio.file.Path
import java.util.zip.ZipOutputStream
import kotlin.test.Test
import kotlin.test.assertTrue

object DiskStoreZipWriterTest {
    private val ROOT = Path.of(DiskStoreZipWriterTest::class.java.getResource("zip").toURI())

    @Test
    fun testBounds() {
        DiskStoreZipWriter(ZipOutputStream(OutputStream.nullOutputStream())).use { store ->
            // create
            assertThrows<IllegalArgumentException> {
                store.create(-1)
            }

            store.create(0)
            store.create(1)
            store.create(254)
            store.create(255)

            assertThrows<IllegalArgumentException> {
                store.create(256)
            }

            // write archive
            assertThrows<IllegalArgumentException> {
                store.write(-1, 0, Unpooled.EMPTY_BUFFER)
            }

            store.write(0, 0, Unpooled.EMPTY_BUFFER)
            store.write(1, 0, Unpooled.EMPTY_BUFFER)
            store.write(254, 0, Unpooled.EMPTY_BUFFER)
            store.write(255, 0, Unpooled.EMPTY_BUFFER)

            assertThrows<IllegalArgumentException> {
                store.write(256, 0, Unpooled.EMPTY_BUFFER)
            }

            // write group
            assertThrows<IllegalArgumentException> {
                store.write(0, -1, Unpooled.EMPTY_BUFFER)
            }

            store.write(0, 0, Unpooled.EMPTY_BUFFER)
            store.write(0, 1, Unpooled.EMPTY_BUFFER)
        }
    }

    @Test
    fun testUnsupported() {
        DiskStoreZipWriter(ZipOutputStream(OutputStream.nullOutputStream())).use { store ->
            assertThrows<UnsupportedOperationException> {
                store.exists(0)
            }

            assertThrows<UnsupportedOperationException> {
                store.exists(0, 0)
            }

            assertThrows<UnsupportedOperationException> {
                store.list()
            }

            assertThrows<UnsupportedOperationException> {
                store.list(0)
            }

            assertThrows<UnsupportedOperationException> {
                store.read(0, 0)
            }

            assertThrows<UnsupportedOperationException> {
                store.remove(0)
            }

            assertThrows<UnsupportedOperationException> {
                store.remove(0, 0)
            }
        }
    }

    @Test
    fun testWrite() {
        Jimfs.newFileSystem(Configuration.forCurrentPlatform()).use { fs ->
            val actual = fs.rootDirectories.first().resolve("zip")
            Files.createDirectories(actual)

            Files.newOutputStream(actual.resolve("cache.zip")).use { out ->
                DiskStoreZipWriter(ZipOutputStream(out)).use { store ->
                    store.create(0)

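                    // covers a standard single block group, a standard multi
                    // block group at the top of the 2 byte group ID range, and
                    // an extended multi block group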
copiedBuffer("OpenRS2").use { buf ->
store.write(2, 0, buf)
}
copiedBuffer("OpenRS2".repeat(100)).use { buf ->
store.write(2, 65535, buf)
}
copiedBuffer("OpenRS2".repeat(100)).use { buf ->
store.write(2, 65536, buf)
}
}
}
assertTrue(ROOT.recursiveEquals(actual))
}
}
}