Hi Heesung

That is a good point.
Assume the producer application's JVM restarts in the middle of chunking and
resends the message chunks from the beginning with the previous sequence ID.

For the previous version, the sequence would be:

Producer send:
1. SequenceID: 0, ChunkID: 0
2. SequenceID: 0, ChunkID: 1
3. SequenceID: 0, ChunkID: 0
4. SequenceID: 0, ChunkID: 1
5. SequenceID: 0, ChunkID: 2

Consumer receive:
1. SequenceID: 0, ChunkID: 0
2. SequenceID: 0, ChunkID: 1
3. SequenceID: 0, ChunkID: 0  // Chunk ID out of order. Release this chunk and recycle its `chunkedMsgCtx`.
4. SequenceID: 0, ChunkID: 1  // chunkedMsgCtx == null. Release it.
5. SequenceID: 0, ChunkID: 2  // chunkedMsgCtx == null. Release it.

Therefore, in the previous version, the consumer cannot receive this
chunked message at all, so this is not an incompatibility issue.
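
To make the trace above concrete, here is a minimal Java sketch of the old
consumer-side handling as described in this email. It is a simplified model,
not the real `ConsumerImpl` code: the class `ChunkReplaySketch`, the `Ctx`
type, and the `assemble` method are hypothetical names, and payload buffers
are left out.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified model of the old consumer-side chunk handling (illustrative only). */
final class ChunkReplaySketch {

    /** Hypothetical stand-in for the per-uuid `chunkedMsgCtx`. */
    static final class Ctx {
        int lastChunkId = -1;
    }

    /**
     * Feeds chunk IDs that all share one uuid (same producer, same sequence ID)
     * and returns how many complete messages the consumer could assemble.
     */
    static int assemble(int[] chunkIds, int totalChunks) {
        Map<String, Ctx> contexts = new HashMap<>();
        String uuid = "producer-0"; // assumed form: producer name + sequence ID
        int assembled = 0;
        for (int chunkId : chunkIds) {
            // A context is only created for chunk 0, and only if none exists yet.
            if (chunkId == 0) {
                contexts.computeIfAbsent(uuid, k -> new Ctx());
            }
            Ctx ctx = contexts.get(uuid);
            if (ctx == null) {
                continue; // chunkedMsgCtx == null: release the chunk
            }
            if (chunkId != ctx.lastChunkId + 1) {
                contexts.remove(uuid); // out of order: release chunk, recycle the context
                continue;
            }
            ctx.lastChunkId = chunkId;
            if (chunkId == totalChunks - 1) {
                contexts.remove(uuid); // last chunk: message is complete
                assembled++;
            }
        }
        return assembled;
    }

    public static void main(String[] args) {
        // Replay after a restart: chunks 0, 1, then 0, 1, 2 with the same sequence ID.
        System.out.println(assemble(new int[] {0, 1, 0, 1, 2}, 3));
        // Normal case without a restart.
        System.out.println(assemble(new int[] {0, 1, 2}, 3));
    }
}
```

In this model the replayed sequence yields no assembled message, while the
clean sequence yields one, which matches the trace above.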

However, the proposed `uuid` optimization is valuable for the new
implementation.
I will make this change in the PR [0]. Thank you very much for the reminder
and for the UUID optimization proposal.
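
As a rough illustration of why the session suffix helps, here is a small Java
sketch. The class `ChunkUuidSketch`, both method names, and the "-" separator
are assumptions made for this example, not the actual `ProducerImpl` code; the
two schemes themselves are the ones written out in Heesung's email.

```java
/** Illustrative comparison of the current and proposed chunking uuid schemes. */
final class ChunkUuidSketch {

    /** Current scheme from this thread: producer + sequence_id. */
    static String currentUuid(String producerName, long sequenceId) {
        return producerName + "-" + sequenceId;
    }

    /** Proposed scheme: producer + sequence_id + chunkingSessionId. */
    static String proposedUuid(String producerName, long sequenceId, long chunkingSessionId) {
        return producerName + "-" + sequenceId + "-" + chunkingSessionId;
    }

    public static void main(String[] args) {
        // Two chunking attempts for the same producer and sequence ID,
        // e.g. before and after a JVM restart. The session IDs stand in
        // for the timestamps at which each chunking attempt started.
        long firstSession = 1000L;
        long secondSession = 2000L;

        // Current scheme: both attempts collide on the same uuid.
        System.out.println(currentUuid("producer", 0).equals(currentUuid("producer", 0)));

        // Proposed scheme: the attempts get distinct uuids.
        System.out.println(proposedUuid("producer", 0, firstSession)
                .equals(proposedUuid("producer", 0, secondSession)));
    }
}
```

With distinct uuids, the consumer can keep a separate context per chunking
attempt instead of mixing chunks from the attempt before the restart with
chunks from the attempt after it.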

BR,
Xiangying

[0] https://github.com/apache/pulsar/pull/20948

On Sat, Aug 26, 2023 at 8:48 AM Heesung Sohn
<heesung.s...@streamnative.io.invalid> wrote:
>
> Hi, I meant
>
> What if the producer application jvm restarts in the middle of chunking and
> resends the message chunks from the beginning with the previous sequence id?
>
>
>
> On Fri, Aug 25, 2023 at 5:15 PM Xiangying Meng <xiangy...@apache.org> wrote:
>
> > Hi Heesung
> >
> > It is a good idea to cover this incompatibility case if the producer
> > splits the chunk message again when retrying.
> >
> > But in fact, the producer only resends the chunks that are already assembled
> > into `OpSendMsg` instead of splitting the chunk message again.
> > So, there is no incompatibility issue caused by resending the chunk message
> > after splitting it again.
> >
> > The logic of sending chunk messages can be found here:
> >
> > https://github.com/apache/pulsar/blob/e0c481e5f8d7fa5534d3327785928a234376789e/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L533
> >
> > The logic of resending the message can be found here:
> >
> > https://github.com/apache/pulsar/blob/e0c481e5f8d7fa5534d3327785928a234376789e/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L1892
> >
> > BR,
> > Xiangying
> >
> >
> >
> >
> >
> >
> >
> > On Sat, Aug 26, 2023 at 2:24 AM Heesung Sohn
> > <heesung.s...@streamnative.io.invalid> wrote:
> > >
> > > >> I think brokers can track the last chunkMaxMessageSize for each producer.
> > >
> > > > Using different chunkMaxMessageSize is just one of the aspects. In
> > > PIP-132 [0], we have included the message metadata size when checking
> > > maxMessageSize. The message metadata can be changed after splitting the
> > > chunks. We are still uncertain about the way the chunked message is split,
> > > even using the same chunkMaxMessageSize.
> > >
> > > This sounds like we need to revisit chunking uuid logic.
> > > Like I commented here,
> > > https://github.com/apache/pulsar/pull/20948/files#r1305997883
> > > Why don't we add a chunk session ID suffix to uniquely identify the
> > > ongoing chunking?
> > >
> > > Currently,
> > >
> > > chunking uuid = producer + sequence_id
> > >
> > > Proposal,
> > >
> > > chunking uuid = producer + sequence_id + chunkingSessionId
> > >
> > > * chunkingSessionId could be a timestamp when the chunking started.
> > >
> > >
> > >
> > > On Fri, Aug 25, 2023 at 6:02 AM Xiangying Meng <xiangy...@apache.org>
> > wrote:
> > >
> > > > Hi Zike,
> > > >
> > > > >How would this happen to get two duplicated and consecutive ChunkID-1
> > > > >messages? The producer should guarantee to retry the whole chunked
> > > > >messages instead of some parts of the chunks.
> > > >
> > > > If the producer guarantees to retry the whole chunked messages instead
> > > > of some parts of the chunks,
> > > > Why doesn't the bug of the producer retry chunk messages in the PR [0]
> > > > appear?
> > > > And why do you need to set `chunkId` in `op.rePopulate`?
> > > > > It would be reset when the chunk messages are split again if the producer
> > > > > guaranteed to retry the whole chunked message.
> > > > ```
> > > > > final MessageMetadata finalMsgMetadata = msgMetadata;
> > > > > op.rePopulate = () -> {
> > > > >     if (msgMetadata.hasChunkId()) {
> > > > >         // The message metadata is shared between all chunks in a large message
> > > > >         // We need to reset the chunk id for each call of this method
> > > > >         // It's safe to do that because there is only 1 thread to manipulate
> > > > >         // this message metadata
> > > > >         finalMsgMetadata.setChunkId(chunkId);
> > > > >     }
> > > > >     op.cmd = sendMessage(producerId, sequenceId, numMessages, messageId,
> > > > >             finalMsgMetadata, encryptedPayload);
> > > > > };
> > > >
> > > > ```
> > > >
> > > > >> But chunks 1, 2, 3, and 4 are still persisted in the topic.
> > > > >
> > > > >I think it's OK to persist them all on the topic. Is there any issue
> > > > >with doing that?
> > > >
> > > > > This is just one scenario. Whether we check only the sequence ID of the
> > > > > first chunk (as I did in PR [1]) or the sequence ID of the last
> > > > > chunk (as you suggested), neither method can
> > > > > deduplicate chunks on the broker side, because the broker cannot know
> > > > > the chunk ID of the previous message.
> > > >
> > > > However, if combined with the optimization of consumer-side logic,
> > > > end-to-end deduplication can be completed.
> > > > > This is the less-than-perfect solution I mentioned in my first
> > > > > email and implemented in PR [1].
> > > >
> > > > > The reason I made this proposal is not to solve the end-to-end
> > > > > deduplication of chunk messages between producers and consumers. That
> > > > > aspect has essentially been addressed in PR [1], which is still under
> > > > > review.
> > > >
> > > > This proposal aims to ensure that no corrupt data exists within the
> > > > topic, as our data might be offloaded and used elsewhere in scenarios
> > > > where consumer logic is not optimized.
> > > >
> > > > BR,
> > > > Xiangying
> > > >
> > > > [0] https://github.com/apache/pulsar/pull/21048
> > > > [1] https://github.com/apache/pulsar/pull/20948
> > > >
> > > > On Fri, Aug 25, 2023 at 5:18 PM Zike Yang <z...@apache.org> wrote:
> > > > >
> > > > > > Hi Xiangying
> > > > >
> > > > > > The rewind operation is seen in the test log.
> > > > >
> > > > > > That seems weird. I'm not sure whether this rewind is related to chunk
> > > > > > consuming.
> > > > >
> > > > > > 1. SequenceID: 0, ChunkID: 0
> > > > > 2. SequenceID: 0, ChunkID: 1
> > > > > 3. SequenceID: 0, ChunkID: 1
> > > > > 4. SequenceID: 0, ChunkID: 2
> > > > > Such four chunks cannot be processed correctly by the consumer.
> > > > >
> > > > > How would this happen to get two duplicated and consecutive ChunkID-1
> > > > > messages? The producer should guarantee to retry the whole chunked
> > > > > messages instead of some parts of the chunks.
> > > > >
> > > > > > But chunks 1, 2, 3, and 4 are still persisted in the topic.
> > > > >
> > > > > I think it's OK to persist them all in the topic. Is there any issue
> > > > > with doing that?
> > > > >
> > > > > > There is another point. The resend of the chunk message has a bug
> > that
> > > > > I shared with you, and you fixed in PR [0]. It will make this case
> > > > > happen in another way.
> > > > >
> > > > > If the user sets the sequence ID manually, the case could be
> > reproduced.
> > > > >
> > > > > BR,
> > > > > Zike Yang
> > > > >
> > > > > On Thu, Aug 24, 2023 at 8:48 PM Xiangying Meng <xiangy...@apache.org
> > >
> > > > wrote:
> > > > > >
> > > > > > > >IIUC, this may change the existing behavior and may introduce
> > > > > > > >inconsistencies.
> > > > > > > >Suppose that we have a large message with 3 chunks. But the producer
> > > > > > > >crashes and resends the message after sending the chunk-1. It will
> > > > > > > >send a total of 5 messages to the Pulsar topic:
> > > > > > >
> > > > > > >1. SequenceID: 0, ChunkID: 0
> > > > > > >2. SequenceID: 0, ChunkID: 1
> > > > > > >3. SequenceID: 0, ChunkID: 0   -> This message will be dropped
> > > > > > >4. SequenceID: 0, ChunkID: 1    -> Will also be dropped
> > > > > > >5. SequenceID: 0, ChunkID: 2    -> The last chunk of the message
> > > > > >
> > > > > > Hi Zike
> > > > > > > There is another point. The resend of the chunk message has a bug that
> > > > > > > I shared with you and that you fixed in PR [0]. It makes this case
> > > > > > > happen in another way.
> > > > > > > A short description of the bug:
> > > > > > > Because all chunks of a chunk message share the same message metadata,
> > > > > > > if a chunk is not sent out immediately, then on resend all chunks of
> > > > > > > the same chunk message use the chunk ID of the last chunk.
> > > > > > > In this case, the sequence would be:
> > > > > > > 1. SequenceID: 0, ChunkID: 0 (Put op1 into `pendingMessages` and send)
> > > > > > > 2. SequenceID: 0, ChunkID: 1 (Put op2 into `pendingMessages` and send)
> > > > > > > 3. SequenceID: 0, ChunkID: 2   -> (Put op3 into `pendingMessages`)
> > > > > > 4. SequenceID: 0, ChunkID: 2   -> (Resend op1)
> > > > > > 5. SequenceID: 0, ChunkID: 2   -> (Resend op2)
> > > > > > 6. SequenceID: 0, ChunkID: 2   -> (Send op3)
> > > > > >
> > > > > >
> > > > > > BR,
> > > > > > Xiangying
> > > > > >
> > > > > > [0] - https://github.com/apache/pulsar/pull/21048
> > > > > >
> > > > > > On Thu, Aug 24, 2023 at 8:09 PM Xiangying Meng <
> > xiangy...@apache.org>
> > > > wrote:
> > > > > > >
> > > > > > > >> This solution also cannot solve the out-of-order messages
> > inside
> > > > the
> > > > > > > >>chunks. For example, the above five messages will still be
> > > > persisted.
> > > > > > > >The consumer already handles this case. The above 5 messages
> > will
> > > > all
> > > > > > > >be persisted but the consumer will skip message 1 and 2.
> > > > > > > >For messages 3, 4, and 5. The producer can guarantee these
> > chunks
> > > > are in order.
> > > > > > >
> > > > > > > > The rewind operation is seen in the test log. Every time an incorrect
> > > > > > > > chunk message is received, it rewinds; I have not yet studied that
> > > > > > > > code in depth.
> > > > > > > > If it does not call rewind, then this case is workable.
> > > > > > > > Let's look at another case.
> > > > > > > 1. SequenceID: 0, ChunkID: 0
> > > > > > > 2. SequenceID: 0, ChunkID: 1
> > > > > > > 3. SequenceID: 0, ChunkID: 1
> > > > > > > 4. SequenceID: 0, ChunkID: 2
> > > > > > > Such four chunks cannot be processed correctly by the consumer.
> > > > > > >
> > > > > > > In fact, this solution is my original idea. The PR I mentioned
> > in the
> > > > > > > first email above uses a similar solution and modifies the logic
> > on
> > > > > > > the consumer side.
> > > > > > > Also, as I mentioned in the first email, this solution can only
> > solve
> > > > > > > the problem of end-to-end duplication. But chunks 1, 2, 3, and 4
> > are
> > > > > > > still persisted in the topic.
> > > > > > >
> > > > > > > On Thu, Aug 24, 2023 at 3:00 PM Zike Yang <z...@apache.org>
> > wrote:
> > > > > > > >
> > > > > > > > Hi Heesung,
> > > > > > > >
> > > > > > > > > I believe in this PIP "similar to the existing "sequence ID
> > map",
> > > > > > > > to facilitate effective filtering" actually means tracking the
> > last
> > > > > > > > chunkId(not all chunk ids) on the broker side.
> > > > > > > >
> > > > > > > > > With this simple solution, I think we don't need to track the
> > > > > > > > > (sequenceID, chunkID) on the broker side at all. The broker just needs
> > > > > > > > > to apply the deduplication logic to the last chunk instead of all
> > > > > > > > > previous chunks. This PIP actually could do that, but it will
> > > > > > > > > introduce a new data format and compatibility issue.
> > > > > > > >
> > > > > > > > > This is still a behavior change(deduping chunk messages on
> > the
> > > > broker),
> > > > > > > > and I believe we need to discuss this addition as a PIP.
> > > > > > > >
> > > > > > > > > Actually, we didn't specifically state the deduping chunk message
> > > > > > > > > behavior before. The de-duplication logic should apply to a chunked
> > > > > > > > > message just as it does to a regular message. Therefore, I think it
> > > > > > > > > should be considered a bug fix. But if this fix is worth discussing
> > > > > > > > > in depth, I have no objection to it being a new PIP.
> > > > > > > >
> > > > > > > > > I think brokers can track the last chunkMaxMessageSize for
> > > > > > > > each producer.
> > > > > > > >
> > > > > > > > > Using different chunkMaxMessageSize is just one of the aspects. In
> > > > > > > > > PIP-132 [0], we have included the message metadata size when checking
> > > > > > > > > maxMessageSize.
> > > > > > > > > The message metadata can be changed after splitting the chunks. We are
> > > > > > > > > still uncertain about the way the chunked message is split, even using
> > > > > > > > > the same chunkMaxMessageSize.
> > > > > > > >
> > > > > > > > > then the brokers can assume that the producer is resending
> > the
> > > > chunks from
> > > > > > > > the beginning with a different scheme(restarted with a
> > different
> > > > > > > > chunkMaxMessageSize) and accept those new chunks from the
> > > > beginning.
> > > > > > > >
> > > > > > > > Regarding this, it seems like we are implementing dynamic
> > > > > > > > configuration for the chunkMaxMessageSize. I'm afraid that this
> > > > would
> > > > > > > > change the expected behavior and introduce more complexity to
> > this
> > > > > > > > configuration.
> > > > > > > >
> > > > > > > >
> > > > > > > > [0] https://github.com/apache/pulsar/pull/14007
> > > > > > > >
> > > > > > > > BR,
> > > > > > > > Zike Yang
> > > > > > > >
> > > > > > > > On Thu, Aug 24, 2023 at 2:21 PM Zike Yang <z...@apache.org>
> > wrote:
> > > > > > > > >
> > > > > > > > > Hi, xiangying
> > > > > > > > >
> > > > > > > > > > it will find that the message
> > > > > > > > > is out of order and rewind the cursor. Loop this operation,
> > and
> > > > > > > > > discard this message after it expires instead of assembling
> > 3,
> > > > 4, 5
> > > > > > > > > into a message.
> > > > > > > > >
> > > > > > > > > > Could you point out where this is implemented? From my
> > > > > > > > > > understanding, there should not be any rewind operation for the
> > > > > > > > > > chunking feature. You can check more detail here:
> > > > > > > > >
> > > > > > > > > > https://streamnative.io/blog/deep-dive-into-message-chunking-in-pulsar#how-message-chunking-is-implemented
> > > > > > > > >
> > > > > > > > > > This solution also cannot solve the out-of-order messages
> > > > inside the
> > > > > > > > > chunks. For example, the above five messages will still be
> > > > persisted.
> > > > > > > > >
> > > > > > > > > The consumer already handles this case. The above 5 messages
> > > > will all
> > > > > > > > > be persisted but the consumer will skip message 1 and 2.
> > > > > > > > > For messages 3, 4, and 5. The producer can guarantee these
> > > > chunks are in order.
> > > > > > > > >
> > > > > > > > > BR,
> > > > > > > > > Zike Yang
> > > > > > > > >
> > > > > > > > > On Thu, Aug 24, 2023 at 11:48 AM Yubiao Feng
> > > > > > > > > <yubiao.f...@streamnative.io.invalid> wrote:
> > > > > > > > > >
> > > > > > > > > > > 1. SequenceID: 0, ChunkID: 0
> > > > > > > > > > > 2. SequenceID: 0, ChunkID: 1
> > > > > > > > > > > 3. SequenceID: 0, ChunkID: 0
> > > > > > > > > > > 4. SequenceID: 0, ChunkID: 1
> > > > > > > > > > > 5. SequenceID: 0, ChunkID: 2
> > > > > > > > > > > For the existing behavior, the consumer assembles
> > > > > > > > > > > messages 3,4,5 into
> > > > > > > > > > > the original large message. But the changes brought
> > > > > > > > > > > about by this PIP
> > > > > > > > > > > will cause the consumer to use messages 1,2,5 for
> > > > > > > > > > > assembly. There is
> > > > > > > > > > > no guarantee that the producer will split the message
> > > > > > > > > > > in the same way
> > > > > > > > > > > twice before and after. For example, the producer's
> > > > > > > > > > > maxMessageSize may
> > > > > > > > > > > be different. This may cause the consumer to
> > > > > > > > > > > receive a corrupt
> > > > > > > > > > > message.
> > > > > > > > > >
> > > > > > > > > > Good point.
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > Thanks
> > > > > > > > > > Yubiao Feng
> > > > > > > > > >
> > > > > > > > > > On Wed, Aug 23, 2023 at 12:34 PM Zike Yang <
> > z...@apache.org>
> > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi, xiangying,
> > > > > > > > > > >
> > > > > > > > > > > Thanks for your PIP.
> > > > > > > > > > >
> > > > > > > > > > > IIUC, this may change the existing behavior and may
> > introduce
> > > > > > > > > > > inconsistencies.
> > > > > > > > > > > Suppose that we have a large message with 3 chunks. But
> > the
> > > > producer
> > > > > > > > > > > crashes and resends the message after sending the
> > chunk-1.
> > > > It will
> > > > > > > > > > > send a total of 5 messages to the Pulsar topic:
> > > > > > > > > > >
> > > > > > > > > > > 1. SequenceID: 0, ChunkID: 0
> > > > > > > > > > > 2. SequenceID: 0, ChunkID: 1
> > > > > > > > > > > 3. SequenceID: 0, ChunkID: 0   -> This message will be
> > > > dropped
> > > > > > > > > > > 4. SequenceID: 0, ChunkID: 1    -> Will also be dropped
> > > > > > > > > > > 5. SequenceID: 0, ChunkID: 2    -> The last chunk of the
> > > > message
> > > > > > > > > > >
> > > > > > > > > > > For the existing behavior, the consumer assembles
> > messages
> > > > 3,4,5 into
> > > > > > > > > > > the original large message. But the changes brought
> > about by
> > > > this PIP
> > > > > > > > > > > will cause the consumer to use messages 1,2,5 for
> > assembly.
> > > > There is
> > > > > > > > > > > no guarantee that the producer will split the message in
> > the
> > > > same way
> > > > > > > > > > > twice before and after. For example, the producer's
> > > > maxMessageSize may
> > > > > > > > > > > be different. This may cause the consumer to receive a
> > > > corrupt
> > > > > > > > > > > message.
> > > > > > > > > > >
> > > > > > > > > > > Also, this PIP increases the complexity of handling
> > chunks
> > > > on the
> > > > > > > > > > > broker side. Brokers should, in general, treat the chunk
> > as
> > > > a normal
> > > > > > > > > > > message.
> > > > > > > > > > >
> > > > > > > > > > > > I think a simpler and better approach is to check deduplication
> > > > > > > > > > > > only for the last chunk of the large message. The consumer only
> > > > > > > > > > > > gets the whole message after receiving the last chunk. We don't
> > > > > > > > > > > > need to check deduplication for the previous chunks. Also, by
> > > > > > > > > > > > doing this we only need a bug fix; we don't need to introduce a
> > > > > > > > > > > > new PIP.
> > > > > > > > > > >
> > > > > > > > > > > BR,
> > > > > > > > > > > Zike Yang
> > > > > > > > > > >
> > > > > > > > > > > On Fri, Aug 18, 2023 at 7:54 PM Xiangying Meng <
> > > > xiangy...@apache.org>
> > > > > > > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > Dear Community,
> > > > > > > > > > > >
> > > > > > > > > > > > I hope this email finds you well. I'd like to address
> > an
> > > > important
> > > > > > > > > > > > issue related to Apache Pulsar and discuss a solution
> > I've
> > > > proposed on
> > > > > > > > > > > > GitHub. The problem pertains to the handling of Chunk
> > > > Messages after
> > > > > > > > > > > > enabling deduplication.
> > > > > > > > > > > >
> > > > > > > > > > > > > In the current version of Apache Pulsar, all chunks of a Chunk Message
> > > > > > > > > > > > > share the same sequence ID. However, enabling the deduplication
> > > > > > > > > > > > > feature results in an inability to send Chunk Messages. To tackle this
> > > > > > > > > > > > > problem, I've proposed a solution [1] that ensures messages are not
> > > > > > > > > > > > > duplicated throughout end-to-end delivery. While this fix addresses
> > > > > > > > > > > > > the duplication issue for end-to-end messages, there remains a
> > > > > > > > > > > > > possibility of duplicate chunks within topics.
> > > > > > > > > > > >
> > > > > > > > > > > > > To address this concern, I believe we should introduce a "Chunk ID
> > > > > > > > > > > > > map" at the Broker level, similar to the existing "sequence ID map",
> > > > > > > > > > > > > to facilitate effective filtering. However, implementing this has led
> > > > > > > > > > > > > to a challenge: a producer requires storage for two Long values
> > > > > > > > > > > > > simultaneously (sequence ID and chunk ID). The snapshot of the
> > > > > > > > > > > > > sequence ID map is stored through the properties of the cursor
> > > > > > > > > > > > > (Map<String, Long>), so to store two Longs (sequence ID, chunk ID)
> > > > > > > > > > > > > for one producer, I've made a separate proposal [2] that introduces
> > > > > > > > > > > > > a "mark DeleteProperties" (Map<String, String>) field to replace the
> > > > > > > > > > > > > current properties (Map<String, Long>) field.
> > > > > > > > > > > >
> > > > > > > > > > > > I'd appreciate it if you carefully review both PRs and
> > > > share your
> > > > > > > > > > > > valuable feedback and insights. Thank you immensely for
> > > > your time and
> > > > > > > > > > > > attention. I eagerly anticipate your valuable opinions
> > and
> > > > > > > > > > > > recommendations.
> > > > > > > > > > > >
> > > > > > > > > > > > Warm regards,
> > > > > > > > > > > > Xiangying
> > > > > > > > > > > >
> > > > > > > > > > > > [1] https://github.com/apache/pulsar/pull/20948
> > > > > > > > > > > > [2] https://github.com/apache/pulsar/pull/21027
> > > > > > > > > > >
