Add low-level cache interface

Unit tests to follow - I've been working on these classes for a few days
now, so I wanted to make sure they get backed up in the repository.

Signed-off-by: Graham <gpe@openrs2.dev>
bzip2
Graham 4 years ago
parent cec68723a4
commit cea016d4ef
  1. 1
      cache/build.gradle.kts
  2. 53
      cache/src/main/java/dev/openrs2/cache/BufferedFileChannel.kt
  3. 476
      cache/src/main/java/dev/openrs2/cache/DiskStore.kt
  4. 129
      cache/src/main/java/dev/openrs2/cache/FlatFileStore.kt
  5. 122
      cache/src/main/java/dev/openrs2/cache/Store.kt
  6. 5
      cache/src/main/java/dev/openrs2/cache/StoreCorruptException.kt
  7. 5
      cache/src/main/java/dev/openrs2/cache/StoreFullException.kt

@ -7,6 +7,7 @@ dependencies {
implementation(project(":buffer"))
implementation(project(":compress"))
implementation(project(":crypto"))
implementation(project(":util"))
}
publishing {

@ -0,0 +1,53 @@
package dev.openrs2.cache
import io.netty.buffer.ByteBuf
import java.io.Closeable
import java.io.EOFException
import java.io.Flushable
import java.nio.channels.FileChannel
// TODO(gpe): actually implement buffering
class BufferedFileChannel(
private val channel: FileChannel
) : Flushable, Closeable {
fun read(pos: Long, dest: ByteBuf, len: Int) {
require(len <= dest.writableBytes())
var off = pos
var remaining = len
while (remaining > 0) {
val n = dest.writeBytes(channel, off, remaining)
if (n == -1) {
throw EOFException()
}
off += n
remaining -= n
}
}
fun write(pos: Long, src: ByteBuf, len: Int) {
require(len <= src.readableBytes())
var off = pos
var remaining = len
while (remaining > 0) {
val n = src.readBytes(channel, off, remaining)
off += n
remaining -= n
}
}
fun size(): Long {
return channel.size()
}
override fun flush() {
// empty
}
override fun close() {
channel.close()
}
}

@ -0,0 +1,476 @@
package dev.openrs2.cache
import dev.openrs2.buffer.use
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
import java.io.FileNotFoundException
import java.nio.channels.FileChannel
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.StandardOpenOption.CREATE
import java.nio.file.StandardOpenOption.READ
import java.nio.file.StandardOpenOption.WRITE
import kotlin.math.max
import kotlin.math.min
/**
* A [Store] implementation compatible with the native `main_file_cache.dat2`
* and `main_file_cache.idx*` format used by the client.
*/
class DiskStore private constructor(
private val root: Path,
private val data: BufferedFileChannel,
private val indexes: Array<BufferedFileChannel?>,
private val alloc: ByteBufAllocator
) : Store {
private data class IndexEntry(val size: Int, val block: Int)
init {
require(indexes.size == Store.MAX_ARCHIVE + 1)
}
private fun checkArchive(archive: Int) {
require(archive in 0..Store.MAX_ARCHIVE)
}
private fun checkGroup(archive: Int, group: Int) {
checkArchive(archive)
// no upper bound on the range check here, as newer caches support 4 byte group IDs
require(group >= 0)
}
private fun readIndexEntry(archive: Int, group: Int, tempBuf: ByteBuf): IndexEntry? {
checkGroup(archive, group)
val index = indexes[archive] ?: return null
val pos = group.toLong() * INDEX_ENTRY_SIZE
if ((pos + INDEX_ENTRY_SIZE) > index.size()) {
return null
}
index.read(pos, tempBuf, INDEX_ENTRY_SIZE)
val size = tempBuf.readUnsignedMedium()
val block = tempBuf.readUnsignedMedium()
return IndexEntry(size, block)
}
override fun exists(archive: Int): Boolean {
checkArchive(archive)
return indexes[archive] != null
}
override fun exists(archive: Int, group: Int): Boolean {
alloc.buffer(TEMP_BUFFER_SIZE, TEMP_BUFFER_SIZE).use { tempBuf ->
val entry = readIndexEntry(archive, group, tempBuf) ?: return false
return entry.block != 0
}
}
override fun list(): List<Int> {
return indexes.withIndex()
.filter { it.value != null }
.map { it.index }
.toList()
}
override fun list(archive: Int): List<Int> {
checkArchive(archive)
alloc.buffer(TEMP_BUFFER_SIZE, TEMP_BUFFER_SIZE).use { tempBuf ->
val index = indexes[archive] ?: throw FileNotFoundException()
val groups = mutableListOf<Int>()
val groupCount = min(index.size() / INDEX_ENTRY_SIZE, Int.MAX_VALUE.toLong()).toInt()
var pos = 0L
for (group in 0 until groupCount) {
tempBuf.clear()
index.read(pos, tempBuf, INDEX_ENTRY_SIZE)
tempBuf.skipBytes(3)
val block = tempBuf.readUnsignedMedium()
if (block != 0) {
groups += group
}
pos += INDEX_ENTRY_SIZE
}
return groups
}
}
private fun getOrCreateIndex(archive: Int): BufferedFileChannel {
val index = indexes[archive]
if (index != null) {
return index
}
val newIndex = BufferedFileChannel(FileChannel.open(indexPath(root, archive), CREATE, READ, WRITE))
indexes[archive] = newIndex
return newIndex
}
override fun create(archive: Int) {
checkArchive(archive)
getOrCreateIndex(archive)
}
override fun read(archive: Int, group: Int): ByteBuf {
alloc.buffer(TEMP_BUFFER_SIZE, TEMP_BUFFER_SIZE).use { tempBuf ->
val entry = readIndexEntry(archive, group, tempBuf) ?: throw FileNotFoundException()
if (entry.block == 0) {
throw FileNotFoundException()
}
alloc.buffer(entry.size, entry.size).use { buf ->
val extended = group >= 65536
val headerSize = if (extended) {
EXTENDED_BLOCK_HEADER_SIZE
} else {
BLOCK_HEADER_SIZE
}
val dataSize = if (extended) {
EXTENDED_BLOCK_DATA_SIZE
} else {
BLOCK_DATA_SIZE
}
var block = entry.block
var num = 0
do {
if (block == 0) {
throw StoreCorruptException("Group shorter than expected")
}
val pos = block.toLong() * BLOCK_SIZE
if (pos + headerSize > data.size()) {
throw StoreCorruptException("Next block is outside the data file")
}
// read header
tempBuf.clear()
data.read(pos, tempBuf, headerSize)
val actualGroup = if (extended) {
tempBuf.readInt()
} else {
tempBuf.readUnsignedShort()
}
val actualNum = tempBuf.readUnsignedShort()
val nextBlock = tempBuf.readUnsignedMedium()
val actualArchive = tempBuf.readUnsignedByte().toInt()
// verify header
when {
actualGroup != group -> throw StoreCorruptException("Expecting group $group, was $actualGroup")
actualNum != num -> throw StoreCorruptException("Expecting block number $num, was $actualNum")
actualArchive != archive ->
throw StoreCorruptException("Expecting archive $archive, was $actualArchive")
}
// read data
val len = min(buf.writableBytes(), dataSize)
data.read(pos + headerSize, buf, len)
// advance to next block
block = nextBlock
num++
} while (buf.isWritable)
if (block != 0) {
throw StoreCorruptException("Group longer than expected")
}
return buf.retain()
}
}
}
private fun allocateBlock(): Int {
var block = (data.size() + BLOCK_SIZE - 1) / BLOCK_SIZE
if (block == 0L) {
// 0 is reserved to represent the absence of a file
block = 1
} else if (block < 0 || block > MAX_BLOCK) {
throw StoreFullException()
}
return block.toInt()
}
override fun write(archive: Int, group: Int, buf: ByteBuf) {
/*
* This method is more complicated than both the client's
* implementation and most existing third-party implementations that I
* am aware of.
*
* Unlike the client, it is capable of overwriting a shorter group with
* a longer one in a single pass by switching between overwrite and
* non-overwrite modes when it reaches the end of the original group.
*
* The client performs this in two passes. It wastes space, as it
* doesn't re-use any of the original group's blocks in the second
* pass.
*
* Unlike most existing third-party implementations, this
* implementation is capable of overwriting a corrupt group by
* switching to non-overwrite mode immediately upon detecting
* corruption, even if it hasn't hit the end of the original group yet.
* This requires reading ahead by a block, making the logic more
* complicated.
*
* Most existing third-party implementations throw an exception when
* they attempt to overwrite a corrupt group. The client is capable o
* overwriting corrupt groups, but as above it does so in two passes.
* Again, this two pass approach wastes space.
*
* This class mixes the best features of all implementations at the
* expense of additional complexity: all writes use a single pass, as
* many blocks are re-used as possible (minimising the size of the
* .dat2 file) and it is capable of overwriting corrupt groups.
*/
checkGroup(archive, group)
val newSize = buf.readableBytes()
require(newSize <= Store.MAX_GROUP_SIZE)
val index = getOrCreateIndex(archive)
alloc.buffer(TEMP_BUFFER_SIZE, TEMP_BUFFER_SIZE).use { tempBuf ->
// read existing index entry, if it exists
val indexPos = group.toLong() * INDEX_ENTRY_SIZE
var block = if ((indexPos + INDEX_ENTRY_SIZE) <= index.size()) {
index.read(indexPos, tempBuf, INDEX_ENTRY_SIZE)
tempBuf.skipBytes(3)
tempBuf.readUnsignedMedium()
} else {
0
}
// determine header/data sizes
val extended = group >= 65536
val headerSize = if (extended) {
EXTENDED_BLOCK_HEADER_SIZE
} else {
BLOCK_HEADER_SIZE
}
val dataSize = if (extended) {
EXTENDED_BLOCK_DATA_SIZE
} else {
BLOCK_DATA_SIZE
}
// check that the first block isn't outside the data file
val firstBlockPos = block.toLong() * BLOCK_SIZE
if (firstBlockPos + headerSize > data.size()) {
block = 0
}
// check that the first block is valid
var num = 0
var nextBlock = 0
if (block != 0) {
tempBuf.clear()
data.read(firstBlockPos, tempBuf, headerSize)
val actualGroup = if (extended) {
tempBuf.readInt()
} else {
tempBuf.readUnsignedShort()
}
val actualNum = tempBuf.readUnsignedShort()
nextBlock = tempBuf.readUnsignedMedium()
val actualArchive = tempBuf.readUnsignedByte().toInt()
if (actualGroup != group || actualNum != num || actualArchive != archive) {
block = 0
nextBlock = 0
}
}
// allocate a new block if necessary
var overwrite: Boolean
if (block == 0) {
block = allocateBlock()
overwrite = false
} else {
overwrite = true
}
// write new index entry
tempBuf.clear()
tempBuf.writeMedium(newSize)
tempBuf.writeMedium(block)
index.write(indexPos, tempBuf, INDEX_ENTRY_SIZE)
do {
val nextNum = num + 1
var nextNextBlock = 0
val len: Int
val remaining = buf.readableBytes()
if (remaining <= dataSize) {
// we're in the last block, so the next block is zero
len = remaining
nextBlock = 0
} else {
len = dataSize
if (overwrite) {
// check that the next block isn't outside the data file
val nextBlockPos = nextBlock.toLong() * BLOCK_SIZE
if (nextBlockPos + headerSize > data.size()) {
nextBlock = 0
}
// check that the next block is valid
if (nextBlock != 0) {
tempBuf.clear()
data.read(nextBlockPos, tempBuf, headerSize)
val actualGroup = if (extended) {
tempBuf.readInt()
} else {
tempBuf.readUnsignedShort()
}
val actualNum = tempBuf.readUnsignedShort()
nextNextBlock = tempBuf.readUnsignedMedium()
val actualArchive = tempBuf.readUnsignedByte().toInt()
if (actualGroup != group || actualNum != nextNum || actualArchive != archive) {
nextBlock = 0
nextNextBlock = 0
}
}
// allocate a new block if necessary
if (nextBlock == 0) {
nextBlock = allocateBlock()
overwrite = false
}
} else {
nextBlock = block + 1
if (nextBlock > MAX_BLOCK) {
throw StoreFullException()
}
}
}
// write header
val blockPos = block.toLong() * BLOCK_SIZE
tempBuf.clear()
if (extended) {
tempBuf.writeInt(group)
} else {
tempBuf.writeShort(group)
}
tempBuf.writeShort(num)
tempBuf.writeMedium(nextBlock)
tempBuf.writeByte(archive)
data.write(blockPos, tempBuf, headerSize)
// write data
data.write(blockPos + headerSize, buf, len)
// advance to next block
block = nextBlock
nextBlock = nextNextBlock
num = nextNum
} while (buf.isReadable)
}
}
override fun remove(archive: Int) {
checkArchive(archive)
val index = indexes[archive] ?: return
index.close()
Files.deleteIfExists(indexPath(root, archive))
indexes[archive] = null
}
override fun remove(archive: Int, group: Int) {
checkArchive(archive)
val index = indexes[archive] ?: return
val pos = group.toLong() * INDEX_ENTRY_SIZE
if ((pos + INDEX_ENTRY_SIZE) > index.size()) {
return
}
alloc.buffer(TEMP_BUFFER_SIZE, TEMP_BUFFER_SIZE).use { tempBuf ->
tempBuf.writeZero(INDEX_ENTRY_SIZE)
index.write(pos, tempBuf, INDEX_ENTRY_SIZE)
}
}
override fun flush() {
data.flush()
for (index in indexes) {
index?.flush()
}
}
override fun close() {
data.close()
for (index in indexes) {
index?.close()
}
}
companion object {
private const val INDEX_ENTRY_SIZE = 6
private const val BLOCK_HEADER_SIZE = 8
private const val BLOCK_DATA_SIZE = 512
private const val BLOCK_SIZE = BLOCK_HEADER_SIZE + BLOCK_DATA_SIZE
private const val EXTENDED_BLOCK_HEADER_SIZE = 10
private const val EXTENDED_BLOCK_DATA_SIZE = 510
private const val MAX_BLOCK = (1 shl 24) - 1
private val TEMP_BUFFER_SIZE = max(INDEX_ENTRY_SIZE, max(BLOCK_HEADER_SIZE, EXTENDED_BLOCK_HEADER_SIZE))
private fun dataPath(root: Path): Path {
return root.resolve("main_file_cache.dat2")
}
private fun indexPath(root: Path, archive: Int): Path {
return root.resolve("main_file_cache.idx$archive")
}
fun open(root: Path, alloc: ByteBufAllocator = ByteBufAllocator.DEFAULT): Store {
val data = BufferedFileChannel(FileChannel.open(dataPath(root), READ, WRITE))
val archives = Array(Store.MAX_ARCHIVE + 1) { archive ->
val path = indexPath(root, archive)
if (Files.exists(path)) {
BufferedFileChannel(FileChannel.open(path, READ, WRITE))
} else {
null
}
}
return DiskStore(root, data, archives, alloc)
}
fun create(root: Path, alloc: ByteBufAllocator = ByteBufAllocator.DEFAULT): Store {
Files.createDirectories(root)
val data = BufferedFileChannel(FileChannel.open(dataPath(root), CREATE, READ, WRITE))
val archives = Array<BufferedFileChannel?>(Store.MAX_ARCHIVE + 1) { null }
return DiskStore(root, data, archives, alloc)
}
}
}

@ -0,0 +1,129 @@
package dev.openrs2.cache
import dev.openrs2.buffer.use
import dev.openrs2.util.io.useAtomicOutputStream
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
import java.io.FileNotFoundException
import java.nio.channels.FileChannel
import java.nio.file.Files
import java.nio.file.Path
/**
* A [Store] implementation that represents archives as file system directories
* and groups as file system files. This format is much friendlier to
* content-addressable version control systems, such as Git, than the native
* format used by the client.
*/
class FlatFileStore private constructor(
private val root: Path,
private val alloc: ByteBufAllocator
) : Store {
private fun archivePath(archive: Int): Path {
require(archive in 0..Store.MAX_ARCHIVE)
return root.resolve(archive.toString())
}
private fun groupPath(archive: Int, group: Int): Path {
// no upper bound on the range check here, as newer caches support 4 byte group IDs
require(group >= 0)
return archivePath(archive).resolve("$group$GROUP_EXTENSION")
}
override fun exists(archive: Int): Boolean {
return Files.isDirectory(archivePath(archive))
}
override fun exists(archive: Int, group: Int): Boolean {
return Files.isRegularFile(groupPath(archive, group))
}
override fun list(): List<Int> {
Files.newDirectoryStream(root).use { stream ->
return stream.filter { Files.isDirectory(it) && ARCHIVE_NAME.matches(it.fileName.toString()) }
.map { Integer.parseInt(it.fileName.toString()) }
.sorted()
.toList()
}
}
override fun list(archive: Int): List<Int> {
Files.newDirectoryStream(archivePath(archive)).use { stream ->
return stream.filter { Files.isRegularFile(it) && GROUP_NAME.matches(it.fileName.toString()) }
.map { Integer.parseInt(it.fileName.toString().removeSuffix(GROUP_EXTENSION)) }
.sorted()
.toList()
}
}
override fun create(archive: Int) {
Files.createDirectory(archivePath(archive))
}
override fun read(archive: Int, group: Int): ByteBuf {
FileChannel.open(groupPath(archive, group)).use { channel ->
val size = channel.size()
if (size > Store.MAX_GROUP_SIZE) {
throw StoreCorruptException("Group too large")
}
alloc.buffer(size.toInt(), size.toInt()).use { buf ->
buf.writeBytes(channel, 0, buf.writableBytes())
return buf.retain()
}
}
}
override fun write(archive: Int, group: Int, buf: ByteBuf) {
require(buf.readableBytes() <= Store.MAX_GROUP_SIZE)
val path = groupPath(archive, group)
Files.createDirectories(path.parent)
path.useAtomicOutputStream { output ->
buf.readBytes(output, buf.readableBytes())
}
}
override fun remove(archive: Int) {
val path = archivePath(archive)
Files.newDirectoryStream(path).use { stream ->
stream.filter { Files.isRegularFile(it) && GROUP_NAME.matches(it.fileName.toString()) }
.forEach { Files.deleteIfExists(it) }
}
Files.deleteIfExists(path)
}
override fun remove(archive: Int, group: Int) {
Files.deleteIfExists(groupPath(archive, group))
}
override fun flush() {
// no-op
}
override fun close() {
// no-op
}
companion object {
private val ARCHIVE_NAME = Regex("[1-9][0-9]*")
private val GROUP_NAME = Regex("[1-9][0-9]*[.]dat")
private const val GROUP_EXTENSION = ".dat"
fun open(root: Path, alloc: ByteBufAllocator = ByteBufAllocator.DEFAULT): Store {
if (!Files.isDirectory(root)) {
throw FileNotFoundException()
}
return FlatFileStore(root, alloc)
}
fun create(root: Path, alloc: ByteBufAllocator = ByteBufAllocator.DEFAULT): Store {
Files.createDirectories(root)
return FlatFileStore(root, alloc)
}
}
}

@ -0,0 +1,122 @@
package dev.openrs2.cache
import io.netty.buffer.ByteBuf
import java.io.Closeable
import java.io.FileNotFoundException
import java.io.Flushable
import java.io.IOException
/**
* A low-level interface for reading and writing raw groups directly to and
* from a collection of JS5 archives.
*/
interface Store : Flushable, Closeable {
/**
* Checks whether an archive exists.
* @param archive the archive ID.
* @return `true` if so, `false` otherwise.
* @throws IllegalArgumentException if the archive ID is out of bounds.
* @throws IOException if an underlying I/O error occurs.
*/
fun exists(archive: Int): Boolean
/**
* Checks whether a group exists.
* @param archive the archive ID.
* @param group the group ID.
* @return `true` if both the group and archive exist, `false` otherwise.
* @throws IllegalArgumentException if the archive or group ID is out of
* bounds.
* @throws IOException if an underlying I/O error occurs.
*/
fun exists(archive: Int, group: Int): Boolean
/**
* Lists all archives in the store.
* @return a sorted list of archive IDs.
* @throws IOException if an underlying I/O error occurs.
*/
fun list(): List<Int>
/**
* Lists all groups in an archive.
* @param archive the archive ID.
* @return a sorted list of group IDs.
* @throws IllegalArgumentException if the archive or group ID is out of
* bounds.
* @throws FileNotFoundException if the archive does not exist.
* @throws IOException if an underlying I/O error occurs.
*/
fun list(archive: Int): List<Int>
/**
* Creates an archive. Does nothing if the archive already exists.
* @param archive the archive ID.
* @throws IllegalArgumentException if the archive ID is out of bounds.
* @throws IOException if an underlying I/O error occurs.
*/
fun create(archive: Int)
/**
* Reads a group.
*
* This method allocates and returns a new [ByteBuf]. It is the caller's
* responsibility to release the [ByteBuf].
* @param archive the archive ID.
* @param group the group ID.
* @return the contents of the group.
* @throws IllegalArgumentException if the archive or group ID is out of
* bounds.
* @throws FileNotFoundException if the archive or group does not exist.
* @throws StoreCorruptException if the store is corrupt.
* @throws IOException if an underlying I/O error occurs.
*/
fun read(archive: Int, group: Int): ByteBuf
/**
* Writes a group. If the archive does not exist, it is created first. If a
* group with the same ID already exists, it is overwritten.
*
* This method consumes the readable portion of the given [ByteBuf]. It
* does not modify the [ByteBuf]'s reference count.
* @param archive the archive ID.
* @param group the group ID.
* @param buf the new contents of the group.
* @throws IllegalArgumentException if the archive or group ID is out of
* bounds, or if [buf] is too long (see [MAX_GROUP_SIZE]).
* @throws StoreFullException if the store is full.
* @throws IOException if an underlying I/O error occurs.
*/
fun write(archive: Int, group: Int, buf: ByteBuf)
/**
* Deletes an archive and all groups contained inside it. Does nothing if
* the archive does not exist.
* @param archive the archive ID.
* @throws IllegalArgumentException if the archive ID is out of bounds.
* @throws IOException if an underlying I/O error occurs.
*/
fun remove(archive: Int)
/**
* Deletes a group. Does nothing if the archive or group does not exist.
* @param archive the archive ID.
* @param group the group ID.
* @throws IllegalArgumentException if the archive or group ID is out of
* bounds.
* @throws IOException if an underlying I/O error occurs.
*/
fun remove(archive: Int, group: Int)
companion object {
/**
* The maximum archive ID.
*/
const val MAX_ARCHIVE = 255
/**
* The maximum length of a group's contents in bytes.
*/
const val MAX_GROUP_SIZE = (1 shl 24) - 1
}
}

@ -0,0 +1,5 @@
package dev.openrs2.cache
import java.io.IOException
class StoreCorruptException(message: String) : IOException(message)

@ -0,0 +1,5 @@
package dev.openrs2.cache
import java.io.IOException
class StoreFullException : IOException()
Loading…
Cancel
Save