[
https://issues.apache.org/jira/browse/KAFKA-15602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Matthias J. Sax resolved KAFKA-15602.
-------------------------------------
    Fix Version/s: 3.4.2
                   3.5.2
                   3.7.0
                   3.6.1
         Assignee: Matthias J. Sax
       Resolution: Fixed
As discussed, reverted this in all applicable branches.
> Breaking change in 3.4.0 ByteBufferSerializer
> ---------------------------------------------
>
> Key: KAFKA-15602
> URL: https://issues.apache.org/jira/browse/KAFKA-15602
> Project: Kafka
> Issue Type: Bug
> Components: producer
> Affects Versions: 3.4.0, 3.5.0, 3.4.1, 3.6.0, 3.5.1
> Reporter: Luke Kirby
> Assignee: Matthias J. Sax
> Priority: Critical
> Fix For: 3.4.2, 3.5.2, 3.7.0, 3.6.1
>
>
> [This PR|https://github.com/apache/kafka/pull/12683/files] claims to have
> solved the situation described by KAFKA-4852, namely, to have
> ByteBufferSerializer respect ByteBuffers wrapping byte arrays with non-0
> offsets (or, put another way, to honor the buffer's position() as the start
> point to consume bytes from). Unfortunately, it failed to actually do this,
> and instead changed the expectations for how an input ByteBuffer's limit and
> position should be set before being provided to send() on a producer
> configured with ByteBufferSerializer. Code that worked with pre-3.4.0
> releases now produces 0-length messages instead of the intended messages,
> effectively introducing a breaking change for existing users of the
> serializer in the wild.
> Here are a few different inputs and serialized outputs under pre-3.4.0 and
> 3.4.0+ to summarize the breaking change:
> ||buffer argument||3.3.2 serialized output||3.4.0+ serialized output||
> |ByteBuffer.wrap("test".getBytes(UTF_8))|len=4 val=test|len=4 val=test|
> |ByteBuffer.allocate(8).put("test".getBytes(UTF_8)).flip()|len=4 val=test|len=0 val=|
> |ByteBuffer.allocate(8).put("test".getBytes(UTF_8))|len=8 val=test<0><0><0><0>|len=4 val=test|
> |ByteBuffer buff = ByteBuffer.allocate(8).put("test".getBytes(UTF_8)); buff.limit(buff.position());|len=4 val=test|len=4 val=test|
> |ByteBuffer.wrap("test".getBytes(UTF_8), 1, 3)|len=4 val=test|len=1 val=t|
> Notably, plain wrappers of byte arrays continue to work under both versions
> due to the special case in the serializer for them. I suspect that this is
> the dominant use case, which is why this has apparently gone unreported until
> now. The wrapped-with-offset case fails under both versions, for different
> reasons (the expected value would be "est"). As demonstrated in the fourth
> row, you can ensure that a manually assembled ByteBuffer works under both
> versions by ensuring that it has position == limit == message-length (with
> the actual data starting at offset 0). Clearly, though, behavior has
> changed dramatically for the second and third case there, with the 3.3.2
> behavior, in my experience, aligning better with naive expectations.
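> To spell that workaround out as code (a sketch of my own, not an official
> recipe): leave the buffer with position == limit == message length and the
> payload starting at offset 0. Under 3.3.2 only the limit matters, and under
> 3.4.0+ the buffer is treated as if it were still in write mode, so both
> contracts serialize exactly the payload bytes:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class CompatibleBufferPrep {
    // Workaround sketch (not an official Kafka recipe): leave the buffer with
    // position == limit == payload length, payload at offset 0. The 3.3.2
    // serializer rewinds and reads up to limit; the 3.4.0+ serializer treats
    // the buffer as write-mode, so both serialize exactly the payload bytes.
    static ByteBuffer prepare(byte[] payload, int capacity) {
        ByteBuffer bb = ByteBuffer.allocate(capacity);
        bb.put(payload);
        bb.limit(bb.position()); // position == limit == payload.length
        return bb;
    }

    public static void main(String[] args) {
        ByteBuffer bb = prepare("test".getBytes(StandardCharsets.UTF_8), 8);
        System.out.println(bb.position() + " " + bb.limit()); // prints "4 4"
    }
}
```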
> [Previously|https://github.com/apache/kafka/blob/35a0de32ee3823dfb548a1cd5d5faf4f7c99e4e0/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java],
> the serializer would just rewind() the buffer and respect the limit as the
> indicator as to how much data was in the buffer. So, essentially, the
> prevailing contract was that the data from position 0 (always!) up to the
> limit on the buffer would be serialized; so it was really just the limit that
> was honored. So if, per the original issue, you have a byte[] array wrapped
> with, say, ByteBuffer.wrap(bytes, 3, 5), that yields a ByteBuffer with
> position = 3 indicating the desired start point to read from, which is then
> effectively ignored by the serializer due to the rewind().
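> Concretely, the pre-3.4.0 logic can be sketched like this (a condensed
> approximation of the linked 3.3.x source, with its zero-copy backing-array
> fast path omitted):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class LegacyByteBufferSerialize {
    // Approximation of the pre-3.4.0 ByteBufferSerializer.serialize(): the
    // caller's position is discarded by rewind(), so only the limit bounds
    // the serialized output.
    static byte[] serialize(ByteBuffer data) {
        if (data == null) {
            return null;
        }
        data.rewind(); // position := 0, wherever the caller left it
        byte[] ret = new byte[data.remaining()]; // remaining == limit after rewind
        data.get(ret);
        return ret;
    }

    public static void main(String[] args) {
        // A wrapped-with-offset buffer: its position = 1 is silently ignored.
        ByteBuffer bb = ByteBuffer.wrap("test".getBytes(StandardCharsets.UTF_8), 1, 3);
        System.out.println(new String(serialize(bb), StandardCharsets.UTF_8)); // prints "test", not "est"
    }
}
```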
> So while the serializer didn't work when presenting a ByteBuffer view onto a
> sub-view of a backing array, it did however follow expected behavior when
> employing standard patterns to populate ByteBuffers backed by
> larger-than-necessary arrays and using limit() to identify the end of actual
> data, consistent with conventional usage of flip() to switch from writing to
> a buffer to setting it up to be read from (e.g., to be passed into a
> producer.send() call). E.g.,
> {code:java}
> ByteBuffer bb = ByteBuffer.allocate(TOO_MUCH);
> ... // some sequence of
> bb.put(...); // populate buffer with some number of bytes less than TOO_MUCH
> ...
> bb.flip(); /* logically, this says "I am done writing, let's set this up for
> reading"; pragmatically, it sets the limit to the current position so that
> whoever reads the buffer knows when to stop reading, and sets the position to
> zero so it knows where to start reading from */
> producer.send(bb); {code}
> Technically, you wouldn't even need to use flip() there, since position is
> ignored; it would be sufficient to just call {{bb.limit(bb.position())}}.
> Notably, a buffer constructed using any variant of ByteBuffer.wrap() is
> essentially immediately in read-mode with position indicating the start and
> limit the end.
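> A minimal illustration of that point:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class WrapIsReadMode {
    public static void main(String[] args) {
        // ByteBuffer.wrap(array, offset, length) yields a buffer that is
        // immediately readable: position == offset, limit == offset + length.
        byte[] bytes = "test".getBytes(StandardCharsets.UTF_8);
        ByteBuffer bb = ByteBuffer.wrap(bytes, 1, 3);
        System.out.println(bb.position() + " " + bb.limit()); // prints "1 4"
    }
}
```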
> With the change introduced in 3.4.0, however, the contract changes
> dramatically, and the code just presented produces a 0-byte message. As
> indicated above, it also continues to fail if you just passed in an
> offset-specified ByteBuffer.wrap()ped message, too, i.e., the case described
> by KAFKA-4852:
> {code:java}
> @Test
> public void testByteBufferSerializerOnOffsetWrappedBytes() {
>     final byte[] bytes = "Hello".getBytes(UTF_8);
>     try (final ByteBufferSerializer serializer = new ByteBufferSerializer()) {
>         assertArrayEquals("ello".getBytes(UTF_8),
>                 // FAILS: this will yield "H", not "ello"
>                 serializer.serialize(topic, ByteBuffer.wrap(bytes, 1, bytes.length - 1)));
>     }
> }
> {code}
> What happened here?
> The resulting PR, it seems, focused on a flawed proposed test case in the
> first comment of KAFKA-4852 that failed against pre-3.4.0 Kafka. I reproduce
> that here with commented annotations from me:
> {code:java}
> @Test // flawed proposed test case
> public void testByteBufferSerializer() {
>     final byte[] bytes = "Hello".getBytes(UTF_8);
>     final ByteBuffer buffer = ByteBuffer.allocate(7);
>     buffer.put(bytes);
>     // buffer.flip(); <-- would make the test work
>     try (final ByteBufferSerializer serializer = new ByteBufferSerializer()) {
>         assertArrayEquals(bytes, serializer.serialize(topic, buffer));
>     }
> }
> {code}
> In pre-3.4.0, this would yield a 7-byte serialization of "Hello" followed by
> 2 0-value bytes. I contend that this was actually expected and correct
> behavior, as the ByteBuffer had never had its limit adjusted, so the implicit and
> mildly expected contract was never actually abided by. If there was a
> buffer.flip() after the .put(bytes) call, as the calling code _should_ have
> applied, however, then the test would have succeeded. In short, by trying to
> make this test case succeed, I think this PR represented nothing more than a
> misunderstanding of how one should prepare ByteBuffers for reading, but has
> managed to result in a breaking change. The breaking nature of this was
> actually briefly noted in PR comments but kind of shrugged off with some test
> changes and explanatory comments on the class.
> Obviously a correction to restore 3.3.2 behavior would represent another
> breaking change for users that are running on 3.4+, unless they were also
> somewhat surprisingly configuring buffers for position() == limit() before
> passing them to send. Arguably, it would also be a breaking change (though
> possibly not one of great consequence) if either version was changed to
> correctly handle the wrapped-with-offset case as represented in the original
> ticket.
> I do not have much experience contending with a situation like this, but at
> the risk of jumping to a solution here, I wonder if the only way to really
> move forward safely and unambiguously here is to remove ByteBufferSerializer
> as it stands and replace it with a differently named substitute that handles
> both the plain-wrapped special case and just serializes content from
> position() to limit(), forcing an evaluation by users when upgrading as to
> whether the provided byte buffer is correctly configured or not. Of course, a
> change like that would have to be released at an appropriate version level, too,
> so I don't know exactly what the desired interim behavior would be
> (deprecation?). I believe I would be eager to contribute to a fix, but
> obviously I would need guidance from maintainers regarding the correct path
> forward semantically.
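> For concreteness, such a substitute might look something like the sketch
> below; the class name and the exact fast-path condition are purely
> illustrative assumptions on my part, not a concrete API proposal:

```java
import java.nio.ByteBuffer;

// Illustrative sketch only (the name is hypothetical): a serializer that
// honors the conventional position()..limit() readable region unconditionally.
public class PositionHonoringByteBufferSerializer {
    public byte[] serialize(String topic, ByteBuffer data) {
        if (data == null) {
            return null;
        }
        // Zero-copy fast path when the readable region exactly spans the
        // backing array, mirroring the existing plain-wrap special case.
        if (data.hasArray() && data.arrayOffset() == 0
                && data.position() == 0 && data.limit() == data.array().length) {
            return data.array();
        }
        // Otherwise copy exactly [position, limit), without mutating the
        // caller's buffer.
        byte[] ret = new byte[data.remaining()];
        data.duplicate().get(ret);
        return ret;
    }
}
```

> Under that contract, ByteBuffer.wrap(bytes, 1, bytes.length - 1) would
> finally serialize "ello" for the KAFKA-4852 case, and callers using the
> conventional flip()-then-read pattern would get back exactly what they wrote.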
--
This message was sent by Atlassian Jira
(v8.20.10#820010)