raboof commented on PR #2409:
URL: https://github.com/apache/pekko/pull/2409#issuecomment-3467244878

   > So I have been thinking about this, and I am coming to the conclusion that I should just reimplement the zstd compressor from scratch using the low-level zstd-jni API (i.e. `public int compress(byte[] dst, byte[] src)` within the `ZstdCompressCtx` class). The reasoning is as follows:
   > 
   > * `ZstdDirectBufferCompressingStreamNoFinalizer` and related classes were designed to work with sources that provide direct `ByteBuffer`s (i.e. Netty/Java NIO); however, in our case we are dealing with `ByteString`, where we don't have direct `ByteBuffer`s (`ByteString.asByteBuffer` is not a direct byte buffer), so we always end up having to copy into an array that then gets pushed into a direct `ByteBuffer` before going into the compress function. Using `ZstdCompressCtx` we can feed/get the byte arrays directly.
   
   This seems like a convincing argument.
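   
   For concreteness, the array-to-array path could look roughly like the sketch below (untested; it assumes zstd-jni's `ZstdCompressCtx`/`Zstd.compressBound` and Pekko's `ByteString.toArrayUnsafe`/`fromArrayUnsafe`):
   
   ```scala
   import com.github.luben.zstd.{ Zstd, ZstdCompressCtx }
   import org.apache.pekko.util.ByteString
   
   // Compress one element array-to-array: no direct ByteBuffer involved, and
   // toArrayUnsafe avoids a copy when the ByteString is a single compact chunk.
   def compressOne(ctx: ZstdCompressCtx, input: ByteString): ByteString = {
     val src = input.toArrayUnsafe()
     // compressBound is zstd's worst-case compressed size for `src`
     val dst = new Array[Byte](Zstd.compressBound(src.length.toLong).toInt)
     val written = ctx.compress(dst, src) // the low-level API quoted above
     ByteString.fromArrayUnsafe(dst, 0, written)
   }
   ```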
   
   > * Now let's get onto buffering
   >       
   >   * When it comes to compression, whether we want to buffer or not actually depends on the size of the `ByteString` element that's coming in on the stream.
   >   * If the element happens to be really "big", it's actually ideal to compress that element as-is without feeding it into any buffer, as the larger the element, the better the compression ratio. In this case we can just use `input.toArrayUnsafe` and feed it directly into `ZstdCompressCtx.compress`; even better, we don't need to worry about any copying in the happy path case for small `ByteString`s.
   
   This sounds like the resulting bytes will be different depending on how the input was split, which could be nondeterministic (e.g. depending on network conditions). I guess that's fine: since this will most likely be used for 'ephemeral' streams, we can 'blame' the nondeterminism on the component that does the splitting, and someone who cares about determinism could insert a component that converts the input into fixed-size chunks.
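   
   Such a re-chunking component could be a small stage in front of the compressor, along these lines (hypothetical sketch using `statefulMapConcat`; note it never flushes a trailing partial chunk, which a real implementation would have to handle on completion):
   
   ```scala
   import org.apache.pekko.stream.scaladsl.Flow
   import org.apache.pekko.util.ByteString
   
   // Re-chunk incoming ByteStrings into fixed-size chunks so that the
   // compressed output no longer depends on how the input happened to be split.
   def fixedSizeChunks(chunkSize: Int): Flow[ByteString, ByteString, Any] =
     Flow[ByteString].statefulMapConcat { () =>
       var buffer = ByteString.empty
       bytes => {
         buffer ++= bytes
         val out = Vector.newBuilder[ByteString]
         while (buffer.length >= chunkSize) {
           out += buffer.take(chunkSize)
           buffer = buffer.drop(chunkSize)
         }
         // NOTE: a final partial chunk stays in `buffer` forever here; a real
         // implementation would flush it when upstream completes.
         out.result()
       }
     }
   ```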
   
   >   * If the input `ByteString` is "small", then we can keep on asking for elements to buffer, and once it hits a constant size limit we can then feed it into `ZstdCompressCtx.compress`
   
   This is a trade-off between latency and compression performance: in a naive implementation, if we 'keep asking for elements to buffer' across the boundary of 2 elements of a gRPC stream, we would delay sending+processing the first element while waiting for the next element to show up - which could take a long time on 'event streams', for example. It's possible to avoid this problem: we could 'keep asking for elements to buffer' only when there's currently no demand from downstream, but compress and send 'everything we have' as soon as demand is signaled. That might be significantly more complex, though; a sketch of the idea follows.
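   
   Roughly (hypothetical names throughout; `compress` stands in for the eventual `ZstdCompressCtx` call, `limit` for the buffer threshold):
   
   ```scala
   import org.apache.pekko.stream.{ Attributes, FlowShape, Inlet, Outlet }
   import org.apache.pekko.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }
   import org.apache.pekko.util.ByteString
   
   // Buffer while downstream has no demand, but compress and emit everything
   // we have as soon as demand is signaled.
   final class CompressBuffer(limit: Int, compress: ByteString => ByteString)
       extends GraphStage[FlowShape[ByteString, ByteString]] {
     val in: Inlet[ByteString] = Inlet("CompressBuffer.in")
     val out: Outlet[ByteString] = Outlet("CompressBuffer.out")
     override val shape: FlowShape[ByteString, ByteString] = FlowShape(in, out)
   
     override def createLogic(attrs: Attributes): GraphStageLogic =
       new GraphStageLogic(shape) with InHandler with OutHandler {
         private var buffer = ByteString.empty
   
         override def preStart(): Unit = pull(in)
   
         override def onPush(): Unit = {
           buffer ++= grab(in)
           if (isAvailable(out)) flush()            // downstream is waiting: emit now
           else if (buffer.length < limit) pull(in) // no demand yet: keep buffering
           // else: buffer full and no demand - backpressure upstream until pulled
         }
   
         override def onPull(): Unit =
           if (buffer.nonEmpty) flush()
           else if (!hasBeenPulled(in)) pull(in)
   
         override def onUpstreamFinish(): Unit = {
           if (buffer.nonEmpty) emit(out, compress(buffer))
           completeStage()
         }
   
         private def flush(): Unit = {
           push(out, compress(buffer))
           buffer = ByteString.empty
           if (!hasBeenPulled(in) && !isClosed(in)) pull(in)
         }
   
         setHandlers(in, out, this)
       }
   }
   ```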
   
   > * When it comes to decompression, the logic can remain as-is. We don't need to use a `DirectByteBufferPool`, because we only create 2 buffers per stream instantiation (**not** per element of the stream) and those `ByteBuffer`s are reused for each incoming element; it should however be updated so that those `ByteBuffer`s get cleaned up with `if (ByteBufferCleaner.isSupported) ByteBufferCleaner.clean(byteBuffer)`. Using `ZstdDecompressCtx` isn't going to gain us much net benefit, as we need to use the native streaming capabilities of zstd-jni to accept incoming data until we get to a point where we can decompress.
   
   Sounds good to me.
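   
   For illustration, the buffer lifecycle could look like this (a self-contained sketch; in the real stage this would live inside the `GraphStageLogic`, and the import for the `ByteBufferCleaner` mentioned above is omitted since I'm not sure where it will end up):
   
   ```scala
   import java.nio.ByteBuffer
   
   // Two direct buffers per stream materialization (not per element), reused
   // for every incoming element and cleaned explicitly when the stream stops.
   final class DecompressionBuffers(bufferSize: Int) {
     val source: ByteBuffer = ByteBuffer.allocateDirect(bufferSize)
     val target: ByteBuffer = ByteBuffer.allocateDirect(bufferSize)
   
     // Call from the stage's postStop: releases the native memory eagerly
     // instead of waiting for the GC to collect the direct buffers.
     def close(): Unit =
       if (ByteBufferCleaner.isSupported) {
         ByteBufferCleaner.clean(source)
         ByteBufferCleaner.clean(target)
       }
   }
   ```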

