mdedetrich commented on PR #2409:
URL: https://github.com/apache/pekko/pull/2409#issuecomment-3469014680

   > You can ignore the copy cost, compression is not cheap anyway. Also, the 
copying is likely happening anyways as you otherwise have to globally lock the 
heap array and interfere with GC...
   
   Yes, definitely true. My main point here is that even though the copying is already happening behind the scenes and, in the context of compression, the copying is cheap, doing this
   
   ```scala
   ByteString.fromArrayUnsafe(zstdCompressCtx.compress(input.toArrayUnsafe()))
   ```
   
   is far simpler and easier to maintain then
   
   ```scala
   import java.nio.ByteBuffer

   // compressingStream and targetBuffer are assumed to be set up elsewhere,
   // e.g. a ZstdDirectBufferCompressingStream writing into the direct targetBuffer
   val inputBB = ByteBuffer.allocateDirect(input.size)
   inputBB.put(input.toArray)
   inputBB.flip()
   compressingStream.compress(inputBB)
   compressingStream.flush()
   targetBuffer.flip()

   // copy the compressed bytes back out of the direct buffer onto the heap
   val arr = new Array[Byte](targetBuffer.limit())
   targetBuffer.get(arr)
   targetBuffer.clear()
   ByteString.fromArrayUnsafe(arr)
   ```
   
   And that version isn't even the ideal one, since we are allocating a direct `ByteBuffer` on every element (which we shouldn't be doing, as direct `ByteBuffer` allocation is extremely expensive). Instead we should get a direct `ByteBuffer` from a `DirectByteBufferPool`, which complicates the logic even further since `DirectByteBufferPool` only supports direct `ByteBuffer`s of a static size (while in our case we have a dynamic `input.size` length). A sketch of that constraint follows.
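   To illustrate the problem, here is a rough sketch of what the pooled variant would have to deal with. The pool interface below is modelled on the shape of Pekko's internal `DirectByteBufferPool` (`acquire()`/`release()`), but the `bufferSize` member and the helper are my own additions for illustration, not an existing API:

   ```scala
   import java.nio.ByteBuffer

   // Hypothetical fixed-size pool interface, modelled on Pekko's internal
   // DirectByteBufferPool: direct buffers of one static capacity only.
   trait BufferPool {
     def bufferSize: Int // static capacity of pooled buffers (illustrative addition)
     def acquire(): ByteBuffer
     def release(buf: ByteBuffer): Unit
   }

   // Because pooled buffers have a static capacity, a dynamic input.size
   // forces a fallback allocation whenever the input doesn't fit.
   def withInputBuffer[T](pool: BufferPool, size: Int)(f: ByteBuffer => T): T =
     if (size <= pool.bufferSize) {
       val buf = pool.acquire()
       try f(buf)
       finally pool.release(buf)
     } else {
       // input larger than the pooled size: pay for an expensive one-off allocation
       f(ByteBuffer.allocateDirect(size))
     }
   ```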
   
   > It's a complex topic how and when a (streaming) compressor should outputs 
compressed data. Roughly speaking, for best compression, you leave the decision 
completely to the compressor. For more interactive usage you want to send any 
data as early as possible (kind of a nagle situation but for compression). The 
compressor cannot know what you want and we cannot know what the user wants. 
Also note, that streaming does not equal interactive usage, you might want to 
stream data to run with less memory, so for the (native) compressor API you 
don't want to make the size of the chunks part of the decision of when to flush.
   
   So my thinking behind https://github.com/apache/pekko/pull/2409#issuecomment-3462846239 is that, all else being equal, it's always better to compress larger chunks of data than small ones. The critical part here is "all else being equal": what @raboof said earlier is entirely correct, in that even if we have stream elements as small as 1 byte (or even less), for some kinds of applications you want to compress and send as fast as possible for latency reasons.
   
   > For our streaming API, the problem is that we cannot easily provide the 
signal when to flush using a raw ByteString based stream. For that reason, in 
the deflate compressor we allow to set a flush mode and flush after each 
ByteString.
   
   Unless I am missing something, I don't think the flush mode is configurable as of now; in fact you even left a comment and an issue saying it should be configurable (see https://github.com/apache/pekko/blob/085bb4275a70d8db4c2c53946712ee6b54e74688/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Compression.scala#L29-L34)
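   For reference, here is a minimal sketch of what "flush after each ByteString" means at the `java.util.zip` level, using the standard `Deflater.SYNC_FLUSH` mode (the helper name and buffer size are illustrative, not the actual Pekko internals):

   ```scala
   import java.io.ByteArrayOutputStream
   import java.util.zip.Deflater
   import org.apache.pekko.util.ByteString

   // Compress one ByteString and force the deflater to emit everything it
   // has buffered so far (SYNC_FLUSH), so downstream can decompress immediately.
   def deflateChunk(deflater: Deflater, input: ByteString): ByteString = {
     deflater.setInput(input.toArrayUnsafe())
     val out = new ByteArrayOutputStream()
     val buf = new Array[Byte](8192)
     var n = deflater.deflate(buf, 0, buf.length, Deflater.SYNC_FLUSH)
     while (n > 0) {
       out.write(buf, 0, n)
       n = deflater.deflate(buf, 0, buf.length, Deflater.SYNC_FLUSH)
     }
     ByteString.fromArrayUnsafe(out.toByteArray)
   }
   ```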
   
   Also on the same point, are you saying that @raboof's suggestion, i.e.
   
   > It's possible to avoid this problem: we could 'keep asking for elements to 
buffer' only when there's currently no demand from downstream, but compress and 
send 'everything we have' as soon as demand is signaled. That might be 
significantly more complex, though.
   
   is not possible, or is it just overcomplicated/not necessary? To me, assuming we can reliably figure out whether downstream is demanding data, this should be able to get the best of both worlds, no? I.e. if downstream is asking for data we just compress the input element as-is, and if downstream is not asking for data then we buffer elements (up to a sane limit) and send the compressed data as soon as that limit is hit and/or downstream signals demand again.
   
   Also, if this suggested logic ends up being implemented it would be configurable, so users could disable it and simply have each element compressed as-is.
   
   > That way the user still has full control over when the flushing does 
happen (by buffering data in the stream if required). We should not add any 
inner logic on top of that. Let's not overthink the interface between our 
compression component and the native component. In general, if we can provide 
all of incoming data in one go to the compressor that's ok because it's already 
on the heap anyway and owned by us. But we should never hold anything back.
   
   If I am understanding you correctly, then essentially you are suggesting that the implementation of the compressor would be a trivial `SimpleLinearGraphStage[ByteString]` that on each incoming element just does `ByteString.fromArrayUnsafe(zstdCompressCtx.compress(input.toArrayUnsafe()))` and we can call it a day?
   
   We don't have to worry about flushing or buffering or anything like that, and if a user wants buffering they can just use the standard streaming operators to implement it on their own, for example:
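   (A rough sketch of such user-side buffering; the chunk size and time window are illustrative, not recommendations.)

   ```scala
   import scala.concurrent.duration._
   import org.apache.pekko.NotUsed
   import org.apache.pekko.stream.scaladsl.Flow
   import org.apache.pekko.util.ByteString

   // Pre-aggregate small elements into larger chunks before compressing:
   // emit once 128 KiB is buffered or 10 ms has elapsed, whichever comes first.
   val rechunk: Flow[ByteString, ByteString, NotUsed] =
     Flow[ByteString]
       .groupedWeightedWithin(128 * 1024, 10.millis)(_.length.toLong)
       .map(_.foldLeft(ByteString.empty)(_ ++ _))
   ```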
   
   > Btw. zstd is somewhat different from deflate in that regard, you can only 
ever decompress full blocks while deflate can at least in theory start 
streaming while still receiving a block (see 
[facebook/zstd#1359](https://github.com/facebook/zstd/issues/1359) and 
https://fastcompression.blogspot.com/2016/04/working-with-streaming.html, the 
tldr; is that a compressed block is max 128kb anyway and that usually dwarfs 
any LZ77 context buffers you might have to keep around anyways).
   
   Thanks for the reminder; Zstd actually provides a constant, `Zstd.maxBlockSize`, which should be the default value for `maxBytesPerChunk`.

