raboof commented on PR #2409: URL: https://github.com/apache/pekko/pull/2409#issuecomment-3467244878
> So I have been thinking about this, and I am coming to the conclusion that I should just reimplement the zstd compressor from scratch using the low-level zstd-jni API (i.e. `public int compress(byte[] dst, byte[] src)` within the `ZstdCompressCtx` class). The reasoning is as follows:
>
> * `ZstdDirectBufferCompressingStreamNoFinalizer` and related classes were designed to work with sources that provide direct `ByteBuffer`s, i.e. Netty/Java NIO. In our case, however, we are dealing with `ByteString`, where we don't have direct `ByteBuffer`s (`ByteString.asByteBuffer` is not a direct byte buffer), so we always end up having to copy into an array that then gets pushed into a direct `ByteBuffer` before going into the compress function. Using `ZstdCompressCtx` we can just feed/get the byte arrays directly.

This seems like a convincing argument (see the first sketch at the end of this comment).

> * Now let's get onto buffering:
>   * When it comes to compression, whether we want to buffer or not actually depends on the size of the `ByteString` element that's incoming on the stream.
>   * If the element happens to be really "big", it's actually ideal to compress that element as-is without feeding it into any buffer, since the larger the element, the better the compression ratio. In this case we can just use `input.toByteArrayUnsafe` and feed it directly into `ZstdCompressCtx.compress`; even better, we don't need to worry about any copying in the happy-path case for small `ByteString`s.

This sounds like the resulting bytes will be different depending on how the input was split, which could be nondeterministic (e.g. depending on network conditions). I guess it's fine since this will most likely be used for 'ephemeral' streams: we can 'blame' the nondeterminism on the component that does the splitting, and someone who cares about determinism could insert a component that converts the input into fixed-size chunks. (The second sketch at the end of this comment shows the effect.)

> * If the incoming `ByteString`s are "small", then we can keep on asking for elements to buffer, and once the buffer hits a constant size limit we can feed it into `ZstdCompressCtx.compress`.

This is a trade-off between latency and compression performance: in a naive implementation, if we for instance 'keep asking for elements to buffer' on the boundary of 2 elements of a gRPC stream, it would delay sending+processing the first element while waiting for the next element to show up - which could take a long time on 'event streams', for example. It's possible to avoid this problem: we could 'keep asking for elements to buffer' only when there's currently no demand from downstream, but compress and send 'everything we have' as soon as demand is signaled. That might be significantly more complex, though. (The third sketch at the end of this comment outlines the naive size-threshold variant.)

> * When it comes to decompression, the logic can remain as-is. We don't need to use a `DirectByteBufferPool`, because we only create 2 buffers per stream instantiation (**not** per element of the stream) and those `ByteBuffer`s are reused for each incoming element. It should be updated, though, so that those `ByteBuffer`s get cleaned up with `if (ByteBufferCleaner.isSupported) ByteBufferCleaner.clean(byteBuffer)`. Using `ZstdDecompressCtx` isn't going to gain us much net benefit, as we need to use the native streaming capabilities of zstd-jni to accept incoming data until we get to a point where we can decompress.

Sounds good to me.
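To make the `ZstdCompressCtx` argument concrete, here is a minimal sketch (the helper object and method names are mine, not part of the PR): one `ByteString` element goes through the array-based zstd-jni API with no direct `ByteBuffer` in sight.

```scala
import com.github.luben.zstd.ZstdCompressCtx
import org.apache.pekko.util.ByteString

// Hypothetical helper showing the array-based path through zstd-jni:
// no direct ByteBuffers and no copy into intermediate NIO buffers.
object ZstdByteStringCompressor {
  def compress(ctx: ZstdCompressCtx, input: ByteString): ByteString = {
    // toArray copies defensively here; the real stage would use the
    // zero-copy accessor discussed above where that is safe.
    val src: Array[Byte] = input.toArray
    // ZstdCompressCtx#compress(byte[]) returns a complete zstd frame.
    val compressed: Array[Byte] = ctx.compress(src)
    ByteString(compressed)
  }
}
```

The context would be created once per stream materialization (e.g. in `preStart`, with `ctx.setLevel(...)`) and closed in `postStop`, since a single context can be reused across compress calls.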
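On the chunking/nondeterminism point, a small self-contained demo (payloads and object name are just for illustration): the same logical payload split differently compresses to different bytes, while decompression recovers the identical payload.

```scala
import com.github.luben.zstd.{ Zstd, ZstdCompressCtx }

// Illustrative demo: one frame for "hello world" vs two frames for
// "hello " + "world" yield different compressed bytes, but the
// decompressed payload is identical either way.
object ChunkingDemo extends App {
  val ctx = new ZstdCompressCtx()
  val whole = ctx.compress("hello world".getBytes("UTF-8"))
  val part1 = ctx.compress("hello ".getBytes("UTF-8"))
  val part2 = ctx.compress("world".getBytes("UTF-8"))
  ctx.close()

  // Two frames carry two frame headers, so the byte streams differ.
  println(java.util.Arrays.equals(whole, part1 ++ part2)) // false

  // Decompressing recovers the same logical payload regardless.
  val a = new String(Zstd.decompress(whole, "hello world".length), "UTF-8")
  val b = new String(Zstd.decompress(part1, "hello ".length), "UTF-8") +
    new String(Zstd.decompress(part2, "world".length), "UTF-8")
  println(a == b) // true
}
```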
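And for the buffering discussion, a rough `GraphStage` sketch of the naive size-threshold variant (stage name and `minBlockSize` are assumptions, not the final design); the demand-aware variant discussed above would pull eagerly and flush the buffer as soon as downstream demand arrives, which needs more state than shown here.

```scala
import com.github.luben.zstd.ZstdCompressCtx
import org.apache.pekko.stream.{ Attributes, FlowShape, Inlet, Outlet }
import org.apache.pekko.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }
import org.apache.pekko.util.ByteString

// Naive size-threshold buffering: accumulate small elements until
// minBlockSize bytes are available, then emit one compressed frame.
final class ZstdCompressStage(level: Int, minBlockSize: Int)
    extends GraphStage[FlowShape[ByteString, ByteString]] {
  private val in = Inlet[ByteString]("ZstdCompress.in")
  private val out = Outlet[ByteString]("ZstdCompress.out")
  override val shape: FlowShape[ByteString, ByteString] = FlowShape(in, out)

  override def createLogic(attrs: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler {
      private var ctx: ZstdCompressCtx = _
      private var buffer = ByteString.empty

      override def preStart(): Unit = {
        // One context per materialization, reused for every frame.
        ctx = new ZstdCompressCtx()
        ctx.setLevel(level)
      }

      override def postStop(): Unit = if (ctx ne null) ctx.close()

      override def onPush(): Unit = {
        buffer ++= grab(in)
        if (buffer.length >= minBlockSize) {
          // Threshold reached: emit one frame for everything buffered.
          push(out, ByteString(ctx.compress(buffer.toArray)))
          buffer = ByteString.empty
        } else {
          // Below threshold: keep asking for more input. This is the
          // naive variant; the demand-aware one would flush instead.
          pull(in)
        }
      }

      override def onPull(): Unit = pull(in)

      override def onUpstreamFinish(): Unit = {
        // Flush whatever is left as a final frame before completing.
        if (buffer.nonEmpty) emit(out, ByteString(ctx.compress(buffer.toArray)))
        completeStage()
      }

      setHandlers(in, out, this)
    }
}
```

Usage would be something like `Flow.fromGraph(new ZstdCompressStage(level = 3, minBlockSize = 64 * 1024))`. The 'big element' case falls out naturally: an element at or above the threshold is compressed and emitted immediately without waiting for more input.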
