diff --git a/buffer/src/main/kotlin/org/openrs2/buffer/ByteBufBodyHandler.kt b/buffer/src/main/kotlin/org/openrs2/buffer/ByteBufBodyHandler.kt new file mode 100644 index 00000000..de77a22e --- /dev/null +++ b/buffer/src/main/kotlin/org/openrs2/buffer/ByteBufBodyHandler.kt @@ -0,0 +1,47 @@ +package org.openrs2.buffer + +import io.netty.buffer.ByteBuf +import io.netty.buffer.ByteBufAllocator +import io.netty.buffer.Unpooled +import java.net.http.HttpResponse +import java.nio.ByteBuffer +import java.util.concurrent.CompletableFuture +import java.util.concurrent.CompletionStage +import java.util.concurrent.Flow +import javax.inject.Inject +import javax.inject.Singleton + +@Singleton +public class ByteBufBodyHandler @Inject constructor( + private val alloc: ByteBufAllocator = ByteBufAllocator.DEFAULT +) : HttpResponse.BodyHandler { + override fun apply(responseInfo: HttpResponse.ResponseInfo): HttpResponse.BodySubscriber { + return object : HttpResponse.BodySubscriber { + private val buf = alloc.compositeBuffer() + private val future = CompletableFuture() + + override fun onSubscribe(subscription: Flow.Subscription) { + subscription.request(Long.MAX_VALUE) + } + + override fun onNext(item: MutableList) { + for (b in item) { + buf.addComponent(Unpooled.wrappedBuffer(b)) + } + } + + override fun onError(throwable: Throwable) { + future.completeExceptionally(throwable) + buf.release() + } + + override fun onComplete() { + future.complete(buf) + } + + override fun getBody(): CompletionStage { + return future + } + } + } +}