mdedetrich commented on PR #2409: URL: https://github.com/apache/pekko/pull/2409#issuecomment-3469014680
> You can ignore the copy cost, compression is not cheap anyway. Also, the copying is likely happening anyways as you otherwise have to globally lock the heap array and interfere with GC...

Yes, definitely true. My main point is that even though the copying already happens behind the scenes and is cheap in the context of compression, doing this

```scala
ByteString.fromArrayUnsafe(zstdCompressCtx.compress(input.toArrayUnsafe()))
```

is far simpler and easier to maintain than

```scala
val inputBB = ByteBuffer.allocateDirect(input.size)
inputBB.put(input.toArray)
inputBB.flip()
compressingStream.compress(inputBB)
compressingStream.flush()
targetBuffer.flip()
val arr = new Array[Byte](targetBuffer.limit())
targetBuffer.get(arr)
targetBuffer.clear()
ByteString.fromArrayUnsafe(arr)
```

And that version isn't even the ideal one, since it allocates a direct `ByteBuffer` for every element (which we shouldn't be doing, because direct `ByteBuffer` allocation is extremely expensive). Instead we should take a direct `ByteBuffer` from a `DirectByteBufferPool`, which complicates the logic even further since `DirectByteBufferPool` only supports direct `ByteBuffer`s of a static size, while in our case the `input.size` length is dynamic.

> It's a complex topic how and when a (streaming) compressor should outputs compressed data. Roughly speaking, for best compression, you leave the decision completely to the compressor. For more interactive usage you want to send any data as early as possible (kind of a nagle situation but for compression). The compressor cannot know what you want and we cannot know what the user wants. Also note, that streaming does not equal interactive usage, you might want to stream data to run with less memory, so for the (native) compressor API you don't want to make the size of the chunks part of the decision of when to flush.

So my reasoning behind https://github.com/apache/pekko/pull/2409#issuecomment-3462846239 is that, all else being equal, it is always better to compress larger chunks of data than smaller ones. The critical part is "all else being equal": what @raboof said earlier is entirely correct in that even if stream elements are as small as 1 byte (or even less), for some kinds of applications you want to compress and send as fast as possible for latency reasons.

> For our streaming API, the problem is that we cannot easily provide the signal when to flush using a raw ByteString based stream. For that reason, in the deflate compressor we allow to set a flush mode and flush after each ByteString.

Unless I am missing something, I don't think the flush mode is configurable as of now; in fact you even left a comment and an issue saying it should be configurable (see https://github.com/apache/pekko/blob/085bb4275a70d8db4c2c53946712ee6b54e74688/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Compression.scala#L29-L34).

Also on the same point, are you saying that @raboof's suggestion, i.e.

> It's possible to avoid this problem: we could 'keep asking for elements to buffer' only when there's currently no demand from downstream, but compress and send 'everything we have' as soon as demand is signaled. That might be significantly more complex, though.

is not possible, or is it just overcomplicated/not necessary? To me, assuming we can reliably figure out whether downstream is demanding data, this should give us the best of both worlds: if downstream is asking for data we compress the input element as is, and if downstream is not asking for data we buffer elements (up to a sane limit) and send the compressed data as soon as that limit is hit and/or downstream signals demand again. If this suggested logic ends up being implemented, it would also be configurable so users can disable it, in which case each element is just compressed as it is.
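To make that concrete, here is a rough, completely untested sketch of the demand-aware behaviour I am describing. It uses a plain `GraphStage`; the stage name `DemandAwareCompress`, the `maxBuffered` limit and the `compress` function parameter are placeholders for illustration, not a proposal for the final API:

```scala
import org.apache.pekko.stream.{ Attributes, FlowShape, Inlet, Outlet }
import org.apache.pekko.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }
import org.apache.pekko.util.ByteString

// Sketch: compress and push immediately while downstream has demand,
// buffer (up to maxBuffered bytes) while it does not.
final class DemandAwareCompress(maxBuffered: Int, compress: ByteString => ByteString)
    extends GraphStage[FlowShape[ByteString, ByteString]] {
  val in = Inlet[ByteString]("DemandAwareCompress.in")
  val out = Outlet[ByteString]("DemandAwareCompress.out")
  override val shape = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler {
      private var buffer = ByteString.empty

      override def preStart(): Unit = pull(in) // start buffering eagerly

      override def onPush(): Unit = {
        buffer ++= grab(in)
        if (isAvailable(out)) {
          // downstream is waiting: compress and send everything we have right away
          push(out, compress(buffer))
          buffer = ByteString.empty
        }
        // keep asking for elements while we are under the buffer limit
        if (buffer.size < maxBuffered) pull(in)
      }

      override def onPull(): Unit = {
        if (buffer.nonEmpty) {
          push(out, compress(buffer))
          buffer = ByteString.empty
        }
        if (!hasBeenPulled(in) && !isClosed(in)) pull(in)
      }

      override def onUpstreamFinish(): Unit = {
        if (buffer.nonEmpty) emit(out, compress(buffer))
        completeStage()
      }

      setHandlers(in, out, this)
    }
}
```

The `compress` parameter would just wrap the zstd call, and as said above the whole buffering behaviour could sit behind a configuration flag.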
> That way the user still has full control over when the flushing does happen (by buffering data in the stream if required). We should not add any inner logic on top of that. Let's not overthink the interface between our compression component and the native component. In general, if we can provide all of incoming data in one go to the compressor that's ok because it's already on the heap anyway and owned by us. But we should never hold anything back.

If I am understanding you correctly, you are essentially suggesting that the implementation of the compressor would be a trivial `SimpleLinearGraphStage[ByteString]` that, for each incoming element, just does `ByteString.fromArrayUnsafe(zstdCompressCtx.compress(input.toArrayUnsafe()))` and we can call it a day? We wouldn't have to worry about flushing or buffering at all, and if a user wants buffering they can implement it themselves with the standard streaming operators (rough sketch at the end of this comment).

> Btw. zstd is somewhat different from deflate in that regard, you can only ever decompress full blocks while deflate can at least in theory start streaming while still receiving a block (see [facebook/zstd#1359](https://github.com/facebook/zstd/issues/1359) and https://fastcompression.blogspot.com/2016/04/working-with-streaming.html, the tldr; is that a compressed block is max 128kb anyway and that usually dwarfs any LZ77 context buffers you might have to keep around anyways).

Thanks for the reminder; Zstd actually provides a constant called `Zstd.maxBlockSize` which should be the default value for `maxBytesPerChunk`.
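For clarity, the trivial version I have in mind would look roughly like this (untested sketch; I am using a public `GraphStage` here instead of the internal `SimpleLinearGraphStage`, assuming zstd-jni's `ZstdCompressCtx`, and the name `ZstdCompressFlow` is just a placeholder):

```scala
import com.github.luben.zstd.ZstdCompressCtx
import org.apache.pekko.stream.{ Attributes, FlowShape, Inlet, Outlet }
import org.apache.pekko.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }
import org.apache.pekko.util.ByteString

// Sketch: every incoming element becomes one self-contained zstd frame,
// with no flushing or buffering logic in the stage itself.
final class ZstdCompressFlow(level: Int) extends GraphStage[FlowShape[ByteString, ByteString]] {
  val in = Inlet[ByteString]("ZstdCompressFlow.in")
  val out = Outlet[ByteString]("ZstdCompressFlow.out")
  override val shape = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler {
      private val ctx = new ZstdCompressCtx()
      ctx.setLevel(level)

      override def onPush(): Unit =
        push(out, ByteString.fromArrayUnsafe(ctx.compress(grab(in).toArrayUnsafe())))

      override def onPull(): Unit = pull(in)

      override def postStop(): Unit = ctx.close()

      setHandlers(in, out, this)
    }
}
```

And a user who wants bigger chunks could do the batching themselves with existing operators, for example something along these lines (again just an illustration, with the weight limit and timeout picked arbitrarily):

```scala
import scala.concurrent.duration._
import org.apache.pekko.stream.scaladsl.Source
import org.apache.pekko.util.ByteString

def batchedCompress(source: Source[ByteString, Any]): Source[ByteString, Any] =
  source
    .groupedWeightedWithin(128L * 1024, 20.millis)(_.size.toLong) // up to ~one zstd block or 20ms
    .map(_.fold(ByteString.empty)(_ ++ _))
    .via(new ZstdCompressFlow(3))
```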
