Hi Xiangying,

> The rewind operation is seen in the test log.

That seems odd. I'm not sure whether this rewind is related to chunk consumption.

> 1. SequenceID: 0, ChunkID: 0
> 2. SequenceID: 0, ChunkID: 1
> 3. SequenceID: 0, ChunkID: 1
> 4. SequenceID: 0, ChunkID: 2
> Such four chunks cannot be processed correctly by the consumer.

How could two consecutive duplicate ChunkID-1 messages occur? The
producer should guarantee that it retries the whole chunked message
rather than only some of its chunks.
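To make the ordering question concrete, here is a simplified model of consumer-side chunk assembly. It is not Pulsar's consumer implementation: `ChunkAssembler`, `Chunk`, and the rules below are hypothetical. The model assumes the consumer starts over whenever it sees ChunkID 0 and drops the in-progress message whenever a chunk's ID is not exactly the previous ID plus one. Under those assumptions, the resend sequence 0, 1, 0, 1, 2 still yields one complete message built from the last three chunks, while 0, 1, 1, 2 yields nothing.

```java
import java.util.ArrayList;
import java.util.List;

public class ChunkAssembler {
    // Hypothetical view of one persisted chunk: (sequenceId, chunkId, payload).
    record Chunk(long sequenceId, int chunkId, String payload) {}

    private final int totalChunks;
    private final List<String> buffer = new ArrayList<>();
    private int lastChunkId = -1;

    ChunkAssembler(int totalChunks) {
        this.totalChunks = totalChunks;
    }

    /** Returns the assembled payload when the last chunk completes a message, otherwise null. */
    String onChunk(Chunk c) {
        if (c.chunkId() == 0) {
            // A new first chunk (e.g. after a producer restart) restarts assembly.
            buffer.clear();
        } else if (c.chunkId() != lastChunkId + 1) {
            // Gap or duplicate chunk ID: discard the in-progress message.
            buffer.clear();
            lastChunkId = -1;
            return null;
        }
        buffer.add(c.payload());
        lastChunkId = c.chunkId();
        if (lastChunkId == totalChunks - 1) {
            String whole = String.join("", buffer);
            buffer.clear();
            lastChunkId = -1;
            return whole;
        }
        return null;
    }

    /** Feeds all chunks in order and collects every fully assembled message. */
    static List<String> run(List<Chunk> chunks, int totalChunks) {
        ChunkAssembler assembler = new ChunkAssembler(totalChunks);
        List<String> out = new ArrayList<>();
        for (Chunk c : chunks) {
            String msg = assembler.onChunk(c);
            if (msg != null) {
                out.add(msg);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Producer restarted after chunk 1: the last three chunks form the message.
        List<Chunk> resent = List.of(
                new Chunk(0, 0, "a"), new Chunk(0, 1, "b"),
                new Chunk(0, 0, "A"), new Chunk(0, 1, "B"), new Chunk(0, 2, "C"));
        System.out.println(run(resent, 3)); // [ABC]

        // Duplicated ChunkID 1: the duplicate breaks the +1 rule, so nothing is assembled.
        List<Chunk> dup = List.of(
                new Chunk(0, 0, "a"), new Chunk(0, 1, "b"),
                new Chunk(0, 1, "b"), new Chunk(0, 2, "c"));
        System.out.println(run(dup, 3)); // []
    }
}
```

Whether 0, 1, 1, 2 is recoverable depends entirely on the duplicate-handling rule; a consumer that skipped a repeated chunk ID instead of resetting would assemble it.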

> But chunks 1, 2, 3, and 4 are still persisted in the topic.

I think it's OK to persist them all in the topic. Is there any issue
with doing that?
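The "deduplicate only the last chunk" idea discussed in this thread can be sketched as follows. This is a hypothetical, simplified model rather than Pulsar's broker code: the class `LastChunkDedup`, the method `shouldPersist`, and the in-memory per-producer map are illustrative assumptions. It shows that duplicated earlier chunks are still persisted, while a repeated last chunk is rejected by the ordinary sequence-ID check.

```java
import java.util.HashMap;
import java.util.Map;

public class LastChunkDedup {
    // Hypothetical stand-in for the broker's per-producer "highest sequence ID" map.
    private final Map<String, Long> highestSequenceId = new HashMap<>();

    /**
     * Returns true if the entry should be persisted.
     * numChunks <= 1 means a regular (non-chunked) message.
     */
    boolean shouldPersist(String producer, long sequenceId, int chunkId, int numChunks) {
        boolean isLastChunk = numChunks <= 1 || chunkId == numChunks - 1;
        if (!isLastChunk) {
            // Earlier chunks bypass the dedup check and are persisted as-is.
            return true;
        }
        long highest = highestSequenceId.getOrDefault(producer, -1L);
        if (sequenceId <= highest) {
            // This message was already completed once: drop the repeated last chunk.
            return false;
        }
        highestSequenceId.put(producer, sequenceId);
        return true;
    }

    public static void main(String[] args) {
        LastChunkDedup dedup = new LastChunkDedup();
        // A 3-chunk message (sequence ID 0): restart after chunk 1, then a full resend.
        int[] chunkIds = {0, 1, 0, 1, 2, 0, 1, 2};
        StringBuilder sb = new StringBuilder();
        for (int id : chunkIds) {
            sb.append(dedup.shouldPersist("p1", 0, id, 3) ? 'P' : 'D');
        }
        System.out.println(sb); // PPPPPPPD
    }
}
```

In this model the final resend leaves chunks 0 and 1 persisted without a matching last chunk, so the consumer would have to treat them as an incomplete message and discard them.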

> There is another point. The resend of the chunk message has a bug that
> I shared with you, and you fixed in PR [0]. It will make this case
> happen in another way.

If the user sets the sequence ID manually, the case could be reproduced.
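The resend bug mentioned above can be reduced to plain object aliasing. The sketch below is a hypothetical model, not the producer code changed in PR 21048: `Metadata` and `Op` are illustrative stand-ins, and it assumes each pending op only keeps a reference to a metadata object that the producer keeps mutating for every chunk. Copying the metadata per chunk keeps the chunk IDs intact on resend.

```java
import java.util.ArrayList;
import java.util.List;

public class SharedMetadataResend {
    // Hypothetical mutable metadata, standing in for the producer's message metadata.
    static class Metadata {
        long sequenceId;
        int chunkId;

        Metadata copy() {
            Metadata m = new Metadata();
            m.sequenceId = sequenceId;
            m.chunkId = chunkId;
            return m;
        }
    }

    // Hypothetical pending op that only holds a reference to its metadata.
    record Op(Metadata metadata) {}

    /** Returns the chunk IDs that would go out if all pending ops were resent. */
    static List<Integer> resendChunkIds(boolean copyPerChunk) {
        Metadata shared = new Metadata();
        List<Op> pending = new ArrayList<>();
        for (int chunk = 0; chunk < 3; chunk++) {
            shared.chunkId = chunk; // the same object is updated for every chunk
            pending.add(new Op(copyPerChunk ? shared.copy() : shared));
        }
        // Reconnect: every pending op is resent using whatever its metadata holds now.
        List<Integer> resent = new ArrayList<>();
        for (Op op : pending) {
            resent.add(op.metadata().chunkId);
        }
        return resent;
    }

    public static void main(String[] args) {
        System.out.println(resendChunkIds(false)); // [2, 2, 2]
        System.out.println(resendChunkIds(true));  // [0, 1, 2]
    }
}
```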

BR,
Zike Yang

On Thu, Aug 24, 2023 at 8:48 PM Xiangying Meng <xiangy...@apache.org> wrote:
>
> >IIUC, this may change the existing behavior and may introduce 
> >inconsistencies.
> >Suppose that we have a large message with 3 chunks. But the producer
> >crashes and resends the message after sending the chunk-1. It will
> >send a total of 5 messages to the Pulsar topic:
> >
> >1. SequenceID: 0, ChunkID: 0
> >2. SequenceID: 0, ChunkID: 1
> >3. SequenceID: 0, ChunkID: 0   -> This message will be dropped
> >4. SequenceID: 0, ChunkID: 1    -> Will also be dropped
> >5. SequenceID: 0, ChunkID: 2    -> The last chunk of the message
>
> Hi Zike
> There is another point. The resend of the chunk message has a bug that
> I shared with you, and you fixed in PR [0]. It will make this case
> happen in another way.
> A brief description of the bug:
> Because all chunks of a chunked message share the same message
> metadata, if the chunks are not sent out immediately, then on resend
> all chunks of that message use the chunk ID of the last chunk.
> In this case, the sequence looks like this:
> 1. SequenceID: 0, ChunkID: 0 (Put op1 into `pendingMessages` and send)
> 2. SequenceID: 0, ChunkID: 1 (Put op2 into `pendingMessages` and send)
> 3. SequenceID: 0, ChunkID: 2   -> (Put op3 into `pendingMessages`)
> 4. SequenceID: 0, ChunkID: 2   -> (Resend op1)
> 5. SequenceID: 0, ChunkID: 2   -> (Resend op2)
> 6. SequenceID: 0, ChunkID: 2   -> (Send op3)
>
>
> BR,
> Xiangying
>
> [0] - https://github.com/apache/pulsar/pull/21048
>
> On Thu, Aug 24, 2023 at 8:09 PM Xiangying Meng <xiangy...@apache.org> wrote:
> >
> > >> This solution also cannot solve the out-of-order messages inside the
> > >>chunks. For example, the above five messages will still be persisted.
> > >The consumer already handles this case. The above 5 messages will all
> > >be persisted but the consumer will skip message 1 and 2.
> > >For messages 3, 4, and 5, the producer can guarantee these chunks are
> > >in order.
> >
> > The rewind operation is seen in the test log. Every time an incorrect
> > chunk message is received, it rewinds; I have not yet studied that
> > code in depth.
> > If it does not call rewind, then this case can be considered workable.
> > Let's look at another case.
> > 1. SequenceID: 0, ChunkID: 0
> > 2. SequenceID: 0, ChunkID: 1
> > 3. SequenceID: 0, ChunkID: 1
> > 4. SequenceID: 0, ChunkID: 2
> > Such four chunks cannot be processed correctly by the consumer.
> >
> > In fact, this solution is my original idea. The PR I mentioned in the
> > first email above uses a similar solution and modifies the logic on
> > the consumer side.
> > Also, as I mentioned in the first email, this solution can only solve
> > the problem of end-to-end duplication. But chunks 1, 2, 3, and 4 are
> > still persisted in the topic.
> >
> > On Thu, Aug 24, 2023 at 3:00 PM Zike Yang <z...@apache.org> wrote:
> > >
> > > Hi Heesung,
> > >
> > > > I believe in this PIP "similar to the existing "sequence ID map",
> > > > to facilitate effective filtering" actually means tracking the last
> > > > chunkId (not all chunk ids) on the broker side.
> > >
> > > With this simple solution, I think we don't need to track the
> > > (sequenceID, chunkID) on the broker side at all. The broker just needs
> > > to apply the deduplication logic to the last chunk instead of all
> > > previous chunks. This PIP actually could do that, but it will
> > > introduce a new data format and compatibility issue.
> > >
> > > > This is still a behavior change (deduping chunk messages on the
> > > > broker), and I believe we need to discuss this addition as a PIP.
> > >
> > > We never explicitly specified the deduplication behavior for chunked
> > > messages before. Deduplication should apply to a chunked message just
> > > as it does to a regular message, so I think this should be considered
> > > a bug fix. But if this fix is worth discussing in depth, I have no
> > > objection to making it a new PIP.
> > >
> > > > I think brokers can track the last chunkMaxMessageSize for
> > > > each producer.
> > >
> > > Using a different chunkMaxMessageSize is just one aspect. In
> > > PIP-132 [0], we included the message metadata size when checking
> > > maxMessageSize.
> > > The message metadata can change after the message is split into
> > > chunks, so we cannot be certain how a chunked message will be split,
> > > even with the same chunkMaxMessageSize.
> > >
> > > > then the brokers can assume that the producer is resending the
> > > > chunks from the beginning with a different scheme (restarted with a
> > > > different chunkMaxMessageSize) and accept those new chunks from the
> > > > beginning.
> > >
> > > Regarding this, it seems like we are implementing dynamic
> > > configuration for the chunkMaxMessageSize. I'm afraid that this would
> > > change the expected behavior and introduce more complexity to this
> > > configuration.
> > >
> > >
> > > [0] https://github.com/apache/pulsar/pull/14007
> > >
> > > BR,
> > > Zike Yang
> > >
> > > On Thu, Aug 24, 2023 at 2:21 PM Zike Yang <z...@apache.org> wrote:
> > > >
> > > > Hi Xiangying,
> > > >
> > > > > it will find that the message is out of order and rewind the
> > > > > cursor. Loop this operation, and discard this message after it
> > > > > expires instead of assembling 3, 4, 5 into a message.
> > > >
> > > > Could you point out where this is implemented? From my
> > > > understanding, there should not be any rewind operation for the
> > > > chunking feature. You can find more details here:
> > > > https://streamnative.io/blog/deep-dive-into-message-chunking-in-pulsar#how-message-chunking-is-implemented
> > > >
> > > > > This solution also cannot solve the out-of-order messages inside the
> > > > > chunks. For example, the above five messages will still be persisted.
> > > >
> > > > The consumer already handles this case. The above 5 messages will all
> > > > be persisted, but the consumer will skip messages 1 and 2.
> > > > For messages 3, 4, and 5, the producer can guarantee these chunks are
> > > > in order.
> > > >
> > > > BR,
> > > > Zike Yang
> > > >
> > > > On Thu, Aug 24, 2023 at 11:48 AM Yubiao Feng
> > > > <yubiao.f...@streamnative.io.invalid> wrote:
> > > > >
> > > > > > 1. SequenceID: 0, ChunkID: 0
> > > > > > 2. SequenceID: 0, ChunkID: 1
> > > > > > 3. SequenceID: 0, ChunkID: 0
> > > > > > 4. SequenceID: 0, ChunkID: 1
> > > > > > 5. SequenceID: 0, ChunkID: 2
> > > > > > For the existing behavior, the consumer assembles messages 3,4,5
> > > > > > into the original large message. But the changes brought about by
> > > > > > this PIP will cause the consumer to use messages 1,2,5 for assembly.
> > > > > > There is no guarantee that the producer will split the message in
> > > > > > the same way twice before and after. For example, the producer's
> > > > > > maxMessageSize may be different. This may cause the consumer to
> > > > > > receive a corrupt message.
> > > > >
> > > > > Good point.
> > > > >
> > > > >
> > > > > Thanks
> > > > > Yubiao Feng
> > > > >
> > > > > On Wed, Aug 23, 2023 at 12:34 PM Zike Yang <z...@apache.org> wrote:
> > > > >
> > > > > > Hi Xiangying,
> > > > > >
> > > > > > Thanks for your PIP.
> > > > > >
> > > > > > IIUC, this may change the existing behavior and may introduce
> > > > > > inconsistencies.
> > > > > > Suppose that we have a large message with 3 chunks. But the producer
> > > > > > crashes and resends the message after sending the chunk-1. It will
> > > > > > send a total of 5 messages to the Pulsar topic:
> > > > > >
> > > > > > 1. SequenceID: 0, ChunkID: 0
> > > > > > 2. SequenceID: 0, ChunkID: 1
> > > > > > 3. SequenceID: 0, ChunkID: 0   -> This message will be dropped
> > > > > > 4. SequenceID: 0, ChunkID: 1    -> Will also be dropped
> > > > > > 5. SequenceID: 0, ChunkID: 2    -> The last chunk of the message
> > > > > >
> > > > > > For the existing behavior, the consumer assembles messages 3,4,5
> > > > > > into the original large message. But the changes brought about by
> > > > > > this PIP will cause the consumer to use messages 1,2,5 for assembly.
> > > > > > There is no guarantee that the producer will split the message in
> > > > > > the same way twice before and after. For example, the producer's
> > > > > > maxMessageSize may be different. This may cause the consumer to
> > > > > > receive a corrupt message.
> > > > > >
> > > > > > Also, this PIP increases the complexity of handling chunks on the
> > > > > > broker side. Brokers should, in general, treat the chunk as a normal
> > > > > > message.
> > > > > >
> > > > > > I think a simpler and better approach is to check deduplication
> > > > > > only for the last chunk of the large message. The consumer only
> > > > > > gets the whole message after receiving the last chunk, so we don't
> > > > > > need to check deduplication for all previous chunks. Also, with this
> > > > > > approach we only need a bug fix; we don't need to introduce a new PIP.
> > > > > >
> > > > > > BR,
> > > > > > Zike Yang
> > > > > >
> > > > > > On Fri, Aug 18, 2023 at 7:54 PM Xiangying Meng <xiangy...@apache.org> wrote:
> > > > > > >
> > > > > > > Dear Community,
> > > > > > >
> > > > > > > I hope this email finds you well. I'd like to address an important
> > > > > > > issue related to Apache Pulsar and discuss a solution I've proposed
> > > > > > > on GitHub. The problem pertains to the handling of Chunk Messages
> > > > > > > after enabling deduplication.
> > > > > > >
> > > > > > > In the current version of Apache Pulsar, all chunks of a Chunk
> > > > > > > Message share the same sequence ID. However, enabling the
> > > > > > > deduplication feature results in an inability to send Chunk
> > > > > > > Messages. To tackle this problem, I've proposed a solution [1] that
> > > > > > > ensures messages are not duplicated throughout end-to-end delivery.
> > > > > > > While this fix addresses the duplication issue for end-to-end
> > > > > > > messages, there remains a possibility of duplicate chunks within
> > > > > > > topics.
> > > > > > >
> > > > > > > To address this concern, I believe we should introduce a "Chunk ID
> > > > > > > map" at the Broker level, similar to the existing "sequence ID map",
> > > > > > > to facilitate effective filtering. However, implementing this has led
> > > > > > > to a challenge: a producer requires storage for two Long values
> > > > > > > simultaneously (sequence ID and chunk ID). Because the snapshot of
> > > > > > > the sequence ID map is stored in the properties of the cursor
> > > > > > > (Map<String, Long>), storing two Longs (sequence ID, chunk ID) for
> > > > > > > one producer requires a different format. To resolve this, I've
> > > > > > > proposed an alternative [2] that introduces a "mark
> > > > > > > DeleteProperties" (Map<String, String>) field to replace the current
> > > > > > > properties (Map<String, Long>) field.
> > > > > > >
> > > > > > > I'd appreciate it if you could carefully review both PRs and share
> > > > > > > your feedback and insights. Thank you immensely for your time and
> > > > > > > attention. I eagerly anticipate your opinions and recommendations.
> > > > > > >
> > > > > > > Warm regards,
> > > > > > > Xiangying
> > > > > > >
> > > > > > > [1] https://github.com/apache/pulsar/pull/20948
> > > > > > > [2] https://github.com/apache/pulsar/pull/21027
> > > > > >
