From f87d89fe7cbb12995331c5a5e7f29c891032de71 Mon Sep 17 00:00:00 2001 From: Graham Date: Thu, 31 Dec 2020 22:00:25 +0000 Subject: [PATCH] Add initial command for downloading the cache from a JS5 server Signed-off-by: Graham --- archive/build.gradle.kts | 2 + .../org/openrs2/archive/ArchiveModule.kt | 2 + .../org/openrs2/archive/cache/CacheCommand.kt | 1 + .../openrs2/archive/cache/CacheDownloader.kt | 28 +++ .../openrs2/archive/cache/CacheImporter.kt | 162 ++++++++++++++-- .../openrs2/archive/cache/DownloadCommand.kt | 15 ++ .../archive/cache/Js5ChannelHandler.kt | 176 ++++++++++++++++++ .../archive/cache/Js5ChannelInitializer.kt | 22 +++ buildSrc/src/main/kotlin/Versions.kt | 1 + net/build.gradle.kts | 32 ++++ .../dev/openrs2/net/BootstrapFactory.kt | 74 ++++++++ .../dev/openrs2/net/FutureExtensions.kt | 26 +++ .../kotlin/dev/openrs2/net/NetworkModule.kt | 10 + settings.gradle.kts | 1 + 14 files changed, 539 insertions(+), 13 deletions(-) create mode 100644 archive/src/main/kotlin/org/openrs2/archive/cache/CacheDownloader.kt create mode 100644 archive/src/main/kotlin/org/openrs2/archive/cache/DownloadCommand.kt create mode 100644 archive/src/main/kotlin/org/openrs2/archive/cache/Js5ChannelHandler.kt create mode 100644 archive/src/main/kotlin/org/openrs2/archive/cache/Js5ChannelInitializer.kt create mode 100644 net/build.gradle.kts create mode 100644 net/src/main/kotlin/dev/openrs2/net/BootstrapFactory.kt create mode 100644 net/src/main/kotlin/dev/openrs2/net/FutureExtensions.kt create mode 100644 net/src/main/kotlin/dev/openrs2/net/NetworkModule.kt diff --git a/archive/build.gradle.kts b/archive/build.gradle.kts index cb9b0cd3..7718eb2c 100644 --- a/archive/build.gradle.kts +++ b/archive/build.gradle.kts @@ -15,6 +15,8 @@ dependencies { implementation(project(":cache")) implementation(project(":db")) implementation(project(":json")) + implementation(project(":net")) + implementation(project(":protocol")) implementation(project(":util")) implementation("com.google.guava:guava:${Versions.guava}") implementation("org.flywaydb:flyway-core:${Versions.flyway}") diff --git a/archive/src/main/kotlin/org/openrs2/archive/ArchiveModule.kt b/archive/src/main/kotlin/org/openrs2/archive/ArchiveModule.kt index 584293ff..e4f5f67e 100644 --- a/archive/src/main/kotlin/org/openrs2/archive/ArchiveModule.kt +++ b/archive/src/main/kotlin/org/openrs2/archive/ArchiveModule.kt @@ -2,6 +2,7 @@ package org.openrs2.archive import com.google.inject.AbstractModule import com.google.inject.Scopes +import dev.openrs2.net.NetworkModule import org.openrs2.buffer.BufferModule import org.openrs2.db.Database import org.openrs2.json.JsonModule @@ -9,6 +10,7 @@ import org.openrs2.json.JsonModule public object ArchiveModule : AbstractModule() { override fun configure() { install(BufferModule) + install(NetworkModule) install(JsonModule) bind(Database::class.java) diff --git a/archive/src/main/kotlin/org/openrs2/archive/cache/CacheCommand.kt b/archive/src/main/kotlin/org/openrs2/archive/cache/CacheCommand.kt index 1f57945d..2db20266 100644 --- a/archive/src/main/kotlin/org/openrs2/archive/cache/CacheCommand.kt +++ b/archive/src/main/kotlin/org/openrs2/archive/cache/CacheCommand.kt @@ -6,6 +6,7 @@ import com.github.ajalt.clikt.core.subcommands public class CacheCommand : NoOpCliktCommand(name = "cache") { init { subcommands( + DownloadCommand(), ImportCommand(), ImportMasterIndexCommand(), ExportCommand() diff --git a/archive/src/main/kotlin/org/openrs2/archive/cache/CacheDownloader.kt b/archive/src/main/kotlin/org/openrs2/archive/cache/CacheDownloader.kt new file mode 100644 index 00000000..60cf76fb --- /dev/null +++ b/archive/src/main/kotlin/org/openrs2/archive/cache/CacheDownloader.kt @@ -0,0 +1,28 @@ +package org.openrs2.archive.cache + +import dev.openrs2.net.BootstrapFactory +import dev.openrs2.net.suspend +import javax.inject.Inject +import javax.inject.Singleton +import kotlin.coroutines.suspendCoroutine + +@Singleton +public class CacheDownloader @Inject constructor( + private val bootstrapFactory: BootstrapFactory, + private val importer: CacheImporter +) { + public suspend fun download(hostname: String, port: Int, version: Int) { + val group = bootstrapFactory.createEventLoopGroup() + try { + suspendCoroutine { continuation -> + val bootstrap = bootstrapFactory.createBootstrap(group) + val handler = Js5ChannelHandler(bootstrap, hostname, port, version, continuation, importer) + + bootstrap.handler(Js5ChannelInitializer(handler)) + .connect(hostname, port) + } + } finally { + group.shutdownGracefully().suspend() + } + } +} diff --git a/archive/src/main/kotlin/org/openrs2/archive/cache/CacheImporter.kt b/archive/src/main/kotlin/org/openrs2/archive/cache/CacheImporter.kt index d4b77bde..6badb3b6 100644 --- a/archive/src/main/kotlin/org/openrs2/archive/cache/CacheImporter.kt +++ b/archive/src/main/kotlin/org/openrs2/archive/cache/CacheImporter.kt @@ -4,6 +4,7 @@ import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBufAllocator import io.netty.buffer.ByteBufUtil import io.netty.buffer.DefaultByteBufHolder +import io.netty.buffer.Unpooled import org.openrs2.buffer.crc32 import org.openrs2.buffer.use import org.openrs2.cache.Js5Archive @@ -28,13 +29,13 @@ public class CacheImporter @Inject constructor( private val database: Database, private val alloc: ByteBufAllocator ) { - private abstract class Container( + public abstract class Container( data: ByteBuf ) : DefaultByteBufHolder(data) { - val bytes: ByteArray = ByteBufUtil.getBytes(data, data.readerIndex(), data.readableBytes(), false) - val crc32: Int = data.crc32() - val whirlpool: ByteArray = Whirlpool.whirlpool(bytes) - abstract val encrypted: Boolean + public val bytes: ByteArray = ByteBufUtil.getBytes(data, data.readerIndex(), data.readableBytes(), false) + public val crc32: Int = data.crc32() + public val whirlpool: ByteArray = Whirlpool.whirlpool(bytes) + public abstract val encrypted: Boolean } private class MasterIndex( @@ -44,18 +45,18 @@ public class CacheImporter @Inject constructor( override val encrypted: Boolean = false } - private class Index( - val index: Js5Index, + public class Index( + public val index: Js5Index, data: ByteBuf ) : Container(data) { override val encrypted: Boolean = false } - private class Group( - val archive: Int, - val group: Int, + public class Group( + public val archive: Int, + public val group: Int, data: ByteBuf, - val version: Int, + public val version: Int, override val encrypted: Boolean ) : Container(data) @@ -126,6 +127,141 @@ public class CacheImporter @Inject constructor( } } + public suspend fun importMasterIndexAndGetIndexes(masterIndex: Js5MasterIndex, buf: ByteBuf): List { + return database.execute { connection -> + prepare(connection) + addMasterIndex(connection, MasterIndex(masterIndex, buf)) + + connection.prepareStatement( + """ + CREATE TEMPORARY TABLE tmp_indexes ( + archive_id uint1 NOT NULL, + crc32 INTEGER NOT NULL, + version INTEGER NOT NULL + ) ON COMMIT DROP + """.trimIndent() + ).use { stmt -> + stmt.execute() + } + + connection.prepareStatement( + """ + INSERT INTO tmp_indexes (archive_id, crc32, version) + VALUES (?, ?, ?) + """.trimIndent() + ).use { stmt -> + for ((i, entry) in masterIndex.entries.withIndex()) { + stmt.setInt(1, i) + stmt.setInt(2, entry.checksum) + stmt.setInt(3, entry.version) + + stmt.addBatch() + } + + stmt.executeBatch() + } + + connection.prepareStatement( + """ + SELECT c.data + FROM tmp_indexes t + LEFT JOIN containers c ON c.crc32 = t.crc32 + LEFT JOIN indexes i ON i.version = t.version AND i.container_id = c.id + ORDER BY t.archive_id ASC + """.trimIndent() + ).use { stmt -> + stmt.executeQuery().use { rows -> + val indexes = mutableListOf() + try { + while (rows.next()) { + val bytes = rows.getBytes(1) + if (bytes != null) { + indexes += Unpooled.wrappedBuffer(bytes) + } else { + indexes += null + } + } + + indexes.filterNotNull().forEach(ByteBuf::retain) + return@execute indexes + } finally { + indexes.filterNotNull().forEach(ByteBuf::release) + } + } + } + } + } + + public suspend fun importIndexAndGetMissingGroups(archive: Int, index: Js5Index, buf: ByteBuf): List { + return database.execute { connection -> + prepare(connection) + addIndex(connection, Index(index, buf)) + + connection.prepareStatement( + """ + CREATE TEMPORARY TABLE tmp_groups ( + group_id INTEGER NOT NULL, + crc32 INTEGER NOT NULL, + version INTEGER NOT NULL + ) ON COMMIT DROP + """.trimIndent() + ).use { stmt -> + stmt.execute() + } + + connection.prepareStatement( + """ + INSERT INTO tmp_groups (group_id, crc32, version) + VALUES (?, ?, ?) + """.trimIndent() + ).use { stmt -> + for (entry in index) { + stmt.setInt(1, entry.id) + stmt.setInt(2, entry.checksum) + stmt.setInt(3, entry.version) + + stmt.addBatch() + } + + stmt.executeBatch() + } + + connection.prepareStatement( + """ + SELECT t.group_id + FROM tmp_groups t + LEFT JOIN groups g ON g.archive_id = ? AND g.group_id = t.group_id AND g.truncated_version = t.version & 65535 + LEFT JOIN containers c ON c.id = g.container_id AND c.crc32 = t.crc32 + WHERE g.container_id IS NULL + ORDER BY t.group_id ASC + """.trimIndent() + ).use { stmt -> + stmt.setInt(1, archive) + + stmt.executeQuery().use { rows -> + val groups = mutableListOf() + + while (rows.next()) { + groups += rows.getInt(1) + } + + return@execute groups + } + } + } + } + + public suspend fun importGroups(groups: List) { + if (groups.isEmpty()) { + return + } + + database.execute { connection -> + prepare(connection) + addGroups(connection, groups) + } + } + private fun createMasterIndex(store: Store): MasterIndex { val index = Js5MasterIndex.create(store) @@ -386,7 +522,7 @@ public class CacheImporter @Inject constructor( return ids } - private companion object { - private const val BATCH_SIZE = 1024 + public companion object { + public const val BATCH_SIZE: Int = 1024 } } diff --git a/archive/src/main/kotlin/org/openrs2/archive/cache/DownloadCommand.kt b/archive/src/main/kotlin/org/openrs2/archive/cache/DownloadCommand.kt new file mode 100644 index 00000000..984838f0 --- /dev/null +++ b/archive/src/main/kotlin/org/openrs2/archive/cache/DownloadCommand.kt @@ -0,0 +1,15 @@ +package org.openrs2.archive.cache + +import com.github.ajalt.clikt.core.CliktCommand +import com.google.inject.Guice +import kotlinx.coroutines.runBlocking +import org.openrs2.archive.ArchiveModule + +public class DownloadCommand : CliktCommand(name = "download") { + override fun run(): Unit = runBlocking { + val injector = Guice.createInjector(ArchiveModule) + val downloader = injector.getInstance(CacheDownloader::class.java) + // TODO(gpe): make these configurable and/or fetch from the database + downloader.download("oldschool1.runescape.com", 43594, 193) + } +} diff --git a/archive/src/main/kotlin/org/openrs2/archive/cache/Js5ChannelHandler.kt b/archive/src/main/kotlin/org/openrs2/archive/cache/Js5ChannelHandler.kt new file mode 100644 index 00000000..b0d9afa6 --- /dev/null +++ b/archive/src/main/kotlin/org/openrs2/archive/cache/Js5ChannelHandler.kt @@ -0,0 +1,176 @@ +package org.openrs2.archive.cache + +import io.netty.bootstrap.Bootstrap +import io.netty.buffer.ByteBuf +import io.netty.channel.ChannelHandler +import io.netty.channel.ChannelHandlerContext +import io.netty.channel.SimpleChannelInboundHandler +import kotlinx.coroutines.runBlocking +import org.openrs2.buffer.use +import org.openrs2.cache.Js5Archive +import org.openrs2.cache.Js5Compression +import org.openrs2.cache.Js5Index +import org.openrs2.cache.Js5MasterIndex +import org.openrs2.protocol.Rs2Decoder +import org.openrs2.protocol.Rs2Encoder +import org.openrs2.protocol.js5.Js5Request +import org.openrs2.protocol.js5.Js5RequestEncoder +import org.openrs2.protocol.js5.Js5Response +import org.openrs2.protocol.js5.Js5ResponseDecoder +import org.openrs2.protocol.js5.XorDecoder +import org.openrs2.protocol.login.LoginRequest +import org.openrs2.protocol.login.LoginResponse +import kotlin.coroutines.Continuation +import kotlin.coroutines.resume +import kotlin.coroutines.resumeWithException + +@ChannelHandler.Sharable +public class Js5ChannelHandler( + private val bootstrap: Bootstrap, + private val hostname: String, + private val port: Int, + private var version: Int, + private val continuation: Continuation, + private val importer: CacheImporter, + private val maxInFlightRequests: Int = 200, + maxVersionAttempts: Int = 10 +) : SimpleChannelInboundHandler(Object::class.java) { + private val maxVersion = version + maxVersionAttempts + private val inFlightRequests = mutableSetOf() + private val pendingRequests = ArrayDeque() + private lateinit var indexes: Array + private val groups = mutableListOf() + + override fun channelActive(ctx: ChannelHandlerContext) { + ctx.writeAndFlush(LoginRequest.InitJs5RemoteConnection(version), ctx.voidPromise()) + ctx.read() + } + + override fun channelRead0(ctx: ChannelHandlerContext, msg: Any) { + when (msg) { + is LoginResponse.Js5Ok -> handleOk(ctx) + is LoginResponse.ClientOutOfDate -> handleClientOutOfDate(ctx) + is LoginResponse -> throw Exception("Invalid response: $msg") + is Js5Response -> handleResponse(ctx, msg) + else -> throw Exception("Unknown message type: ${msg.javaClass.name}") + } + } + + override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) { + releaseGroups() + ctx.close() + continuation.resumeWithException(cause) + } + + private fun handleOk(ctx: ChannelHandlerContext) { + val pipeline = ctx.pipeline() + + pipeline.remove(Rs2Encoder::class.java) + pipeline.remove(Rs2Decoder::class.java) + pipeline.addFirst( + Js5RequestEncoder, + XorDecoder(), + Js5ResponseDecoder() + ) + + val request = Js5Request.Group(false, Js5Archive.ARCHIVESET, Js5Archive.ARCHIVESET) + pendingRequests += request + flushRequests(ctx) + } + + private fun handleClientOutOfDate(ctx: ChannelHandlerContext) { + if (++version > maxVersion) { + throw Exception("Failed to identify current version") + } + + ctx.close() + bootstrap.connect(hostname, port) + } + + private fun handleResponse(ctx: ChannelHandlerContext, response: Js5Response) { + val request = Js5Request.Group(response.prefetch, response.archive, response.group) + + val removed = inFlightRequests.remove(request) + if (!removed) { + throw Exception("Received response for request not in-flight") + } + + if (response.archive == Js5Archive.ARCHIVESET && response.group == Js5Archive.ARCHIVESET) { + processMasterIndex(response.data) + } else if (response.archive == Js5Archive.ARCHIVESET) { + processIndex(response.group, response.data) + } else { + val version = indexes[response.archive]!![response.group]!!.version + val encrypted = Js5Compression.isEncrypted(response.data.slice()) + groups += CacheImporter.Group(response.archive, response.group, response.data.retain(), version, encrypted) + } + + val complete = pendingRequests.isEmpty() && inFlightRequests.isEmpty() + + if (groups.size >= CacheImporter.BATCH_SIZE || complete) { + runBlocking { + importer.importGroups(groups) + } + + releaseGroups() + } + + if (complete) { + ctx.close() + continuation.resume(Unit) + } else { + flushRequests(ctx) + } + } + + private fun processMasterIndex(buf: ByteBuf) { + val masterIndex = Js5Compression.uncompress(buf.slice()).use { uncompressed -> + Js5MasterIndex.read(uncompressed) + } + + val rawIndexes = runBlocking { importer.importMasterIndexAndGetIndexes(masterIndex, buf) } + try { + indexes = arrayOfNulls(rawIndexes.size) + + for ((archive, index) in rawIndexes.withIndex()) { + if (index != null) { + processIndex(archive, index) + } else { + pendingRequests += Js5Request.Group(false, Js5Archive.ARCHIVESET, archive) + } + } + } finally { + rawIndexes.filterNotNull().forEach(ByteBuf::release) + } + } + + private fun processIndex(archive: Int, buf: ByteBuf) { + val index = Js5Compression.uncompress(buf.slice()).use { uncompressed -> + Js5Index.read(uncompressed) + } + indexes[archive] = index + + val groups = runBlocking { + importer.importIndexAndGetMissingGroups(archive, index, buf) + } + for (group in groups) { + pendingRequests += Js5Request.Group(false, archive, group) + } + } + + private fun flushRequests(ctx: ChannelHandlerContext) { + while (inFlightRequests.size < maxInFlightRequests) { + val request = pendingRequests.removeFirstOrNull() ?: break + inFlightRequests += request + ctx.write(request, ctx.voidPromise()) + } + + ctx.flush() + ctx.read() + } + + private fun releaseGroups() { + groups.forEach(CacheImporter.Group::release) + groups.clear() + } +} diff --git a/archive/src/main/kotlin/org/openrs2/archive/cache/Js5ChannelInitializer.kt b/archive/src/main/kotlin/org/openrs2/archive/cache/Js5ChannelInitializer.kt new file mode 100644 index 00000000..e484cfc3 --- /dev/null +++ b/archive/src/main/kotlin/org/openrs2/archive/cache/Js5ChannelInitializer.kt @@ -0,0 +1,22 @@ +package org.openrs2.archive.cache + +import io.netty.channel.Channel +import io.netty.channel.ChannelInitializer +import org.openrs2.protocol.Protocol +import org.openrs2.protocol.Rs2Decoder +import org.openrs2.protocol.Rs2Encoder +import org.openrs2.protocol.login.ClientOutOfDateCodec +import org.openrs2.protocol.login.InitJs5RemoteConnectionCodec +import org.openrs2.protocol.login.IpLimitCodec +import org.openrs2.protocol.login.Js5OkCodec +import org.openrs2.protocol.login.ServerFullCodec + +public class Js5ChannelInitializer(private val handler: Js5ChannelHandler) : ChannelInitializer() { + override fun initChannel(ch: Channel) { + ch.pipeline().addLast( + Rs2Encoder(Protocol(InitJs5RemoteConnectionCodec)), + Rs2Decoder(Protocol(Js5OkCodec, ClientOutOfDateCodec, IpLimitCodec, ServerFullCodec)), + handler + ) + } +} diff --git a/buildSrc/src/main/kotlin/Versions.kt b/buildSrc/src/main/kotlin/Versions.kt index 8d8ce09b..a53464c9 100644 --- a/buildSrc/src/main/kotlin/Versions.kt +++ b/buildSrc/src/main/kotlin/Versions.kt @@ -24,6 +24,7 @@ object Versions { const val kotlinter = "3.3.0" const val logback = "1.2.3" const val netty = "4.1.56.Final" + const val nettyIoUring = "0.0.2.Final" const val openrs2Natives = "3.0.0" const val postgres = "42.2.18" const val shadowPlugin = "6.1.0" diff --git a/net/build.gradle.kts b/net/build.gradle.kts new file mode 100644 index 00000000..d63401e0 --- /dev/null +++ b/net/build.gradle.kts @@ -0,0 +1,32 @@ +plugins { + `maven-publish` + kotlin("jvm") +} + +dependencies { + api("com.google.inject:guice:${Versions.guice}") + api("io.netty:netty-transport:${Versions.netty}") + api("org.jetbrains.kotlinx:kotlinx-coroutines-core:${Versions.kotlinCoroutines}") + + implementation(project(":buffer")) + implementation("io.netty:netty-transport-native-epoll:${Versions.netty}:linux-aarch_64") + implementation("io.netty:netty-transport-native-epoll:${Versions.netty}:linux-x86_64") + implementation("io.netty:netty-transport-native-kqueue:${Versions.netty}:osx-x86_64") + implementation("io.netty.incubator:netty-incubator-transport-native-io_uring:${Versions.nettyIoUring}:linux-x86_64") +} + +publishing { + publications.create("maven") { + from(components["java"]) + + pom { + packaging = "jar" + name.set("OpenRS2 Network") + description.set( + """ + Common Netty utility code. + """.trimIndent() + ) + } + } +} diff --git a/net/src/main/kotlin/dev/openrs2/net/BootstrapFactory.kt b/net/src/main/kotlin/dev/openrs2/net/BootstrapFactory.kt new file mode 100644 index 00000000..d8d225af --- /dev/null +++ b/net/src/main/kotlin/dev/openrs2/net/BootstrapFactory.kt @@ -0,0 +1,74 @@ +package dev.openrs2.net + +import io.netty.bootstrap.Bootstrap +import io.netty.bootstrap.ServerBootstrap +import io.netty.buffer.ByteBufAllocator +import io.netty.channel.ChannelOption +import io.netty.channel.EventLoopGroup +import io.netty.channel.epoll.Epoll +import io.netty.channel.epoll.EpollEventLoopGroup +import io.netty.channel.epoll.EpollServerSocketChannel +import io.netty.channel.epoll.EpollSocketChannel +import io.netty.channel.kqueue.KQueue +import io.netty.channel.kqueue.KQueueEventLoopGroup +import io.netty.channel.kqueue.KQueueServerSocketChannel +import io.netty.channel.kqueue.KQueueSocketChannel +import io.netty.channel.nio.NioEventLoopGroup +import io.netty.channel.socket.nio.NioServerSocketChannel +import io.netty.channel.socket.nio.NioSocketChannel +import io.netty.incubator.channel.uring.IOUring +import io.netty.incubator.channel.uring.IOUringEventLoopGroup +import io.netty.incubator.channel.uring.IOUringServerSocketChannel +import io.netty.incubator.channel.uring.IOUringSocketChannel +import javax.inject.Inject +import javax.inject.Singleton + +@Singleton +public class BootstrapFactory @Inject constructor( + private val alloc: ByteBufAllocator +) { + public fun createEventLoopGroup(): EventLoopGroup { + return when { + IOUring.isAvailable() -> IOUringEventLoopGroup() + Epoll.isAvailable() -> EpollEventLoopGroup() + KQueue.isAvailable() -> KQueueEventLoopGroup() + else -> NioEventLoopGroup() + } + } + + public fun createBootstrap(group: EventLoopGroup): Bootstrap { + val channel = when (group) { + is IOUringEventLoopGroup -> IOUringSocketChannel::class.java + is EpollEventLoopGroup -> EpollSocketChannel::class.java + is KQueueEventLoopGroup -> KQueueSocketChannel::class.java + is NioEventLoopGroup -> NioSocketChannel::class.java + else -> throw IllegalArgumentException("Unknown EventLoopGroup type") + } + + return Bootstrap() + .group(group) + .channel(channel) + .option(ChannelOption.ALLOCATOR, alloc) + .option(ChannelOption.AUTO_READ, false) + .option(ChannelOption.TCP_NODELAY, true) + } + + public fun createServerBootstrap(group: EventLoopGroup): ServerBootstrap { + val channel = when (group) { + is IOUringEventLoopGroup -> IOUringServerSocketChannel::class.java + is EpollEventLoopGroup -> EpollServerSocketChannel::class.java + is KQueueEventLoopGroup -> KQueueServerSocketChannel::class.java + is NioEventLoopGroup -> NioServerSocketChannel::class.java + else -> throw IllegalArgumentException("Unknown EventLoopGroup type") + } + + return ServerBootstrap() + .group(group) + .channel(channel) + .option(ChannelOption.ALLOCATOR, alloc) + .option(ChannelOption.AUTO_READ, false) + .childOption(ChannelOption.ALLOCATOR, alloc) + .childOption(ChannelOption.AUTO_READ, false) + .childOption(ChannelOption.TCP_NODELAY, true) + } +} diff --git a/net/src/main/kotlin/dev/openrs2/net/FutureExtensions.kt b/net/src/main/kotlin/dev/openrs2/net/FutureExtensions.kt new file mode 100644 index 00000000..1fc3c9ef --- /dev/null +++ b/net/src/main/kotlin/dev/openrs2/net/FutureExtensions.kt @@ -0,0 +1,26 @@ +package dev.openrs2.net + +import io.netty.util.concurrent.Future +import kotlin.coroutines.resume +import kotlin.coroutines.resumeWithException +import kotlin.coroutines.suspendCoroutine + +public suspend fun Future.suspend(): V { + if (isDone) { + if (isSuccess) { + return now + } else { + throw cause() + } + } + + return suspendCoroutine { continuation -> + addListener { + if (isSuccess) { + continuation.resume(now) + } else { + continuation.resumeWithException(cause()) + } + } + } +} diff --git a/net/src/main/kotlin/dev/openrs2/net/NetworkModule.kt b/net/src/main/kotlin/dev/openrs2/net/NetworkModule.kt new file mode 100644 index 00000000..7b7e78e8 --- /dev/null +++ b/net/src/main/kotlin/dev/openrs2/net/NetworkModule.kt @@ -0,0 +1,10 @@ +package dev.openrs2.net + +import com.google.inject.AbstractModule +import org.openrs2.buffer.BufferModule + +public object NetworkModule : AbstractModule() { + override fun configure() { + install(BufferModule) + } +} diff --git a/settings.gradle.kts b/settings.gradle.kts index 88abfe12..4fdd037a 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -20,6 +20,7 @@ include( "deob-util", "game", "json", + "net", "nonfree", "nonfree:client", "nonfree:gl",