From 1101f6e885711b39719f265446700bd2012fda66 Mon Sep 17 00:00:00 2001 From: Graham Date: Sat, 19 Jun 2021 15:49:19 +0100 Subject: [PATCH] Add ByteBufBodyHandler Allows a response from HttpClient to be efficiently read into a ByteBuf. Signed-off-by: Graham --- .../org/openrs2/buffer/ByteBufBodyHandler.kt | 47 +++++++++++++++++++ 1 file changed, 47 insertions(+) create mode 100644 buffer/src/main/kotlin/org/openrs2/buffer/ByteBufBodyHandler.kt diff --git a/buffer/src/main/kotlin/org/openrs2/buffer/ByteBufBodyHandler.kt b/buffer/src/main/kotlin/org/openrs2/buffer/ByteBufBodyHandler.kt new file mode 100644 index 0000000000..de77a22e88 --- /dev/null +++ b/buffer/src/main/kotlin/org/openrs2/buffer/ByteBufBodyHandler.kt @@ -0,0 +1,47 @@ +package org.openrs2.buffer + +import io.netty.buffer.ByteBuf +import io.netty.buffer.ByteBufAllocator +import io.netty.buffer.Unpooled +import java.net.http.HttpResponse +import java.nio.ByteBuffer +import java.util.concurrent.CompletableFuture +import java.util.concurrent.CompletionStage +import java.util.concurrent.Flow +import javax.inject.Inject +import javax.inject.Singleton + +@Singleton +public class ByteBufBodyHandler @Inject constructor( + private val alloc: ByteBufAllocator = ByteBufAllocator.DEFAULT +) : HttpResponse.BodyHandler { + override fun apply(responseInfo: HttpResponse.ResponseInfo): HttpResponse.BodySubscriber { + return object : HttpResponse.BodySubscriber { + private val buf = alloc.compositeBuffer() + private val future = CompletableFuture() + + override fun onSubscribe(subscription: Flow.Subscription) { + subscription.request(Long.MAX_VALUE) + } + + override fun onNext(item: MutableList) { + for (b in item) { + buf.addComponent(Unpooled.wrappedBuffer(b)) + } + } + + override fun onError(throwable: Throwable) { + future.completeExceptionally(throwable) + buf.release() + } + + override fun onComplete() { + future.complete(buf) + } + + override fun getBody(): CompletionStage { + return future + } + } + } +}