|
|
@ -19,6 +19,7 @@ public class ByteBufBodyHandler @Inject constructor( |
|
|
|
return object : HttpResponse.BodySubscriber<ByteBuf> { |
|
|
|
return object : HttpResponse.BodySubscriber<ByteBuf> { |
|
|
|
private val buf = alloc.compositeBuffer() |
|
|
|
private val buf = alloc.compositeBuffer() |
|
|
|
private val future = CompletableFuture<ByteBuf>() |
|
|
|
private val future = CompletableFuture<ByteBuf>() |
|
|
|
|
|
|
|
private var len = 0 |
|
|
|
|
|
|
|
|
|
|
|
override fun onSubscribe(subscription: Flow.Subscription) { |
|
|
|
override fun onSubscribe(subscription: Flow.Subscription) { |
|
|
|
subscription.request(Long.MAX_VALUE) |
|
|
|
subscription.request(Long.MAX_VALUE) |
|
|
@ -26,7 +27,11 @@ public class ByteBufBodyHandler @Inject constructor( |
|
|
|
|
|
|
|
|
|
|
|
override fun onNext(item: List<ByteBuffer>) { |
|
|
|
override fun onNext(item: List<ByteBuffer>) { |
|
|
|
for (b in item) { |
|
|
|
for (b in item) { |
|
|
|
buf.addComponent(Unpooled.wrappedBuffer(b)) |
|
|
|
val component = Unpooled.wrappedBuffer(b) |
|
|
|
|
|
|
|
buf.addComponent(component) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
len += component.readableBytes() |
|
|
|
|
|
|
|
check(len >= 0) |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
@ -36,6 +41,7 @@ public class ByteBufBodyHandler @Inject constructor( |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
override fun onComplete() { |
|
|
|
override fun onComplete() { |
|
|
|
|
|
|
|
buf.writerIndex(len) |
|
|
|
future.complete(buf) |
|
|
|
future.complete(buf) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|