Hi Xiangying,

> The rewind operation is seen in the test log.

That seems weird. I'm not sure whether this rewind is related to chunk
consumption.

> 1. SequenceID: 0, ChunkID: 0
> 2. SequenceID: 0, ChunkID: 1
> 3. SequenceID: 0, ChunkID: 1
> 4. SequenceID: 0, ChunkID: 2
> Such four chunks cannot be processed correctly by the consumer.

How could two duplicated, consecutive ChunkID-1 messages occur? The
producer should guarantee that it retries the whole chunked message
rather than only some of its chunks.

> But chunks 1, 2, 3, and 4 are still persisted in the topic.

I think it's OK to persist them all in the topic. Is there any issue
with doing that?

> There is another point. The resend of the chunk message has a bug that
> I shared with you, and you fixed in PR [0]. It will make this case
> happen in another way.

If the user sets the sequence ID manually, the case could be reproduced.

BR,
Zike Yang

On Thu, Aug 24, 2023 at 8:48 PM Xiangying Meng <xiangy...@apache.org> wrote:
>
> > IIUC, this may change the existing behavior and may introduce
> > inconsistencies.
> > Suppose that we have a large message with 3 chunks. But the producer
> > crashes and resends the message after sending the chunk-1. It will
> > send a total of 5 messages to the Pulsar topic:
> >
> > 1. SequenceID: 0, ChunkID: 0
> > 2. SequenceID: 0, ChunkID: 1
> > 3. SequenceID: 0, ChunkID: 0 -> This message will be dropped
> > 4. SequenceID: 0, ChunkID: 1 -> Will also be dropped
> > 5. SequenceID: 0, ChunkID: 2 -> The last chunk of the message
>
> Hi Zike,
> There is another point. The resend of the chunk message has a bug that
> I shared with you, and you fixed in PR [0]. It will make this case
> happen in another way.
> Sample description of the bug:
> Because all chunks of a chunk message use the same message metadata,
> if a chunk is not sent out immediately, then on resend all chunks of
> that chunk message use the chunk ID of the last chunk.
> In this case, it would happen as:
> 1. SequenceID: 0, ChunkID: 0 (Put op1 into `pendingMessages` and send)
> 2. SequenceID: 0, ChunkID: 1 (Put op2 into `pendingMessages` and send)
> 3. SequenceID: 0, ChunkID: 2 -> (Put op3 into `pendingMessages`)
> 4. SequenceID: 0, ChunkID: 2 -> (Resend op1)
> 5. SequenceID: 0, ChunkID: 2 -> (Resend op2)
> 6. SequenceID: 0, ChunkID: 2 -> (Send op3)
>
> BR,
> Xiangying
>
> [0] - https://github.com/apache/pulsar/pull/21048
>
> On Thu, Aug 24, 2023 at 8:09 PM Xiangying Meng <xiangy...@apache.org> wrote:
> >
> > > > This solution also cannot solve the out-of-order messages inside
> > > > the chunks. For example, the above five messages will still be
> > > > persisted.
> > > The consumer already handles this case. The above 5 messages will
> > > all be persisted, but the consumer will skip messages 1 and 2.
> > > For messages 3, 4, and 5, the producer can guarantee these chunks
> > > are in order.
> >
> > The rewind operation is seen in the test log. Every time an incorrect
> > chunk message is received, it will rewind, and the code has yet to be
> > studied in depth.
> > If it does not call rewind, then this case is considered a workable
> > case. Let's look at another case.
> > 1. SequenceID: 0, ChunkID: 0
> > 2. SequenceID: 0, ChunkID: 1
> > 3. SequenceID: 0, ChunkID: 1
> > 4. SequenceID: 0, ChunkID: 2
> > Such four chunks cannot be processed correctly by the consumer.
> >
> > In fact, this solution is my original idea. The PR I mentioned in the
> > first email above uses a similar solution and modifies the logic on
> > the consumer side.
> > Also, as I mentioned in the first email, this solution can only solve
> > the problem of end-to-end duplication. But chunks 1, 2, 3, and 4 are
> > still persisted in the topic.
> >
> > On Thu, Aug 24, 2023 at 3:00 PM Zike Yang <z...@apache.org> wrote:
> > >
> > > Hi Heesung,
> > >
> > > > I believe in this PIP "similar to the existing "sequence ID map",
> > > > to facilitate effective filtering" actually means tracking the
> > > > last chunkId (not all chunk ids) on the broker side.
> > >
> > > With this simple solution, I think we don't need to track the
> > > (sequenceID, chunkID) on the broker side at all. The broker just needs
> > > to apply the deduplication logic to the last chunk instead of all
> > > previous chunks. This PIP actually could do that, but it will
> > > introduce a new data format and a compatibility issue.
> > >
> > > > This is still a behavior change (deduping chunk messages on the
> > > > broker), and I believe we need to discuss this addition as a PIP.
> > >
> > > Actually, we didn't specifically state the deduping behavior for
> > > chunk messages before. The de-duplication logic should apply to a
> > > chunked message just as it does to a regular message. Therefore, I
> > > think it should be considered a bug fix. But if this fix is worth
> > > discussing in depth, I have no objection to it being a new PIP.
> > >
> > > > I think brokers can track the last chunkMaxMessageSize for
> > > > each producer.
> > >
> > > Using a different chunkMaxMessageSize is just one of the aspects. In
> > > PIP-132 [0], we have included the message metadata size when checking
> > > maxMessageSize.
> > > The message metadata can be changed after splitting the chunks. We are
> > > still uncertain about the way the chunked message is split, even when
> > > using the same chunkMaxMessageSize.
> > >
> > > > then the brokers can assume that the producer is resending the
> > > > chunks from the beginning with a different scheme (restarted with
> > > > a different chunkMaxMessageSize) and accept those new chunks from
> > > > the beginning.
> > >
> > > Regarding this, it seems like we are implementing dynamic
> > > configuration for the chunkMaxMessageSize. I'm afraid that this would
> > > change the expected behavior and introduce more complexity to this
> > > configuration.
> > >
> > >
> > > [0] https://github.com/apache/pulsar/pull/14007
> > >
> > > BR,
> > > Zike Yang
> > >
> > > On Thu, Aug 24, 2023 at 2:21 PM Zike Yang <z...@apache.org> wrote:
> > > >
> > > > Hi, Xiangying
> > > >
> > > > > it will find that the message
> > > > > is out of order and rewind the cursor. Loop this operation, and
> > > > > discard this message after it expires instead of assembling
> > > > > 3, 4, 5 into a message.
> > > >
> > > > Could you point out where the implementation for this is? From my
> > > > understanding, there should not be any rewind operation for the
> > > > chunking feature. You can check more detail here:
> > > > https://streamnative.io/blog/deep-dive-into-message-chunking-in-pulsar#how-message-chunking-is-implemented
> > > >
> > > > > This solution also cannot solve the out-of-order messages inside
> > > > > the chunks. For example, the above five messages will still be
> > > > > persisted.
> > > >
> > > > The consumer already handles this case. The above 5 messages will
> > > > all be persisted, but the consumer will skip messages 1 and 2.
> > > > For messages 3, 4, and 5, the producer can guarantee these chunks
> > > > are in order.
> > > >
> > > > BR,
> > > > Zike Yang
> > > >
> > > > On Thu, Aug 24, 2023 at 11:48 AM Yubiao Feng
> > > > <yubiao.f...@streamnative.io.invalid> wrote:
> > > > >
> > > > > > 1. SequenceID: 0, ChunkID: 0
> > > > > > 2. SequenceID: 0, ChunkID: 1
> > > > > > 3. SequenceID: 0, ChunkID: 0
> > > > > > 4. SequenceID: 0, ChunkID: 1
> > > > > > 5. SequenceID: 0, ChunkID: 2
> > > > > > For the existing behavior, the consumer assembles messages
> > > > > > 3,4,5 into the original large message. But the changes brought
> > > > > > about by this PIP will cause the consumer to use messages 1,2,5
> > > > > > for assembly. There is no guarantee that the producer will
> > > > > > split the message in the same way twice before and after. For
> > > > > > example, the producer's maxMessageSize may be different. This
> > > > > > may cause the consumer to receive a corrupt message.
> > > > >
> > > > > Good point.
> > > > >
> > > > >
> > > > > Thanks
> > > > > Yubiao Feng
> > > > >
> > > > > On Wed, Aug 23, 2023 at 12:34 PM Zike Yang <z...@apache.org> wrote:
> > > > > >
> > > > > > Hi, Xiangying,
> > > > > >
> > > > > > Thanks for your PIP.
> > > > > >
> > > > > > IIUC, this may change the existing behavior and may introduce
> > > > > > inconsistencies.
> > > > > > Suppose that we have a large message with 3 chunks. But the
> > > > > > producer crashes and resends the message after sending the
> > > > > > chunk-1. It will send a total of 5 messages to the Pulsar topic:
> > > > > >
> > > > > > 1. SequenceID: 0, ChunkID: 0
> > > > > > 2. SequenceID: 0, ChunkID: 1
> > > > > > 3. SequenceID: 0, ChunkID: 0 -> This message will be dropped
> > > > > > 4. SequenceID: 0, ChunkID: 1 -> Will also be dropped
> > > > > > 5. SequenceID: 0, ChunkID: 2 -> The last chunk of the message
> > > > > >
> > > > > > For the existing behavior, the consumer assembles messages
> > > > > > 3,4,5 into the original large message. But the changes brought
> > > > > > about by this PIP will cause the consumer to use messages 1,2,5
> > > > > > for assembly. There is no guarantee that the producer will
> > > > > > split the message in the same way twice before and after. For
> > > > > > example, the producer's maxMessageSize may be different. This
> > > > > > may cause the consumer to receive a corrupt message.
> > > > > >
> > > > > > Also, this PIP increases the complexity of handling chunks on
> > > > > > the broker side. Brokers should, in general, treat the chunk as
> > > > > > a normal message.
> > > > > >
> > > > > > I think a simpler, better approach is to only check the
> > > > > > deduplication for the last chunk of the large message. The
> > > > > > consumer only gets the whole message after receiving the last
> > > > > > chunk. We don't need to check the deduplication for all
> > > > > > previous chunks. Also, by doing this we only need a bug fix;
> > > > > > we don't need to introduce a new PIP.
> > > > > >
> > > > > > BR,
> > > > > > Zike Yang
> > > > > >
> > > > > > On Fri, Aug 18, 2023 at 7:54 PM Xiangying Meng
> > > > > > <xiangy...@apache.org> wrote:
> > > > > > >
> > > > > > > Dear Community,
> > > > > > >
> > > > > > > I hope this email finds you well. I'd like to address an
> > > > > > > important issue related to Apache Pulsar and discuss a
> > > > > > > solution I've proposed on GitHub. The problem pertains to the
> > > > > > > handling of Chunk Messages after enabling deduplication.
> > > > > > >
> > > > > > > In the current version of Apache Pulsar, all chunks of a
> > > > > > > Chunk Message share the same sequence ID. However, enabling
> > > > > > > the deduplication feature results in an inability to send
> > > > > > > Chunk Messages. To tackle this problem, I've proposed a
> > > > > > > solution [1] that ensures messages are not duplicated
> > > > > > > throughout end-to-end delivery. While this fix addresses the
> > > > > > > duplication issue for end-to-end messages, there remains a
> > > > > > > possibility of duplicate chunks within topics.
> > > > > > >
> > > > > > > To address this concern, I believe we should introduce a
> > > > > > > "Chunk ID map" at the Broker level, similar to the existing
> > > > > > > "sequence ID map", to facilitate effective filtering.
> > > > > > > However, implementing this has led to a challenge: a producer
> > > > > > > requires storage for two Long values simultaneously (sequence
> > > > > > > ID and chunk ID). Because the snapshot of the sequence ID map
> > > > > > > is stored through the properties of the cursor
> > > > > > > (Map<String, Long>), in order to satisfy the storage of two
> > > > > > > Longs (sequence ID, chunk ID) corresponding to one producer,
> > > > > > > we hope to add a mark DeleteProperties (Map<String, String>)
> > > > > > > to replace the properties (Map<String, Long>) field. To
> > > > > > > resolve this, I've proposed an alternative proposal [2]
> > > > > > > involving the introduction of a "mark DeleteProperties"
> > > > > > > (Map<String, String>) to replace the current properties
> > > > > > > (Map<String, Long>) field.
> > > > > > >
> > > > > > > I'd appreciate it if you could carefully review both PRs and
> > > > > > > share your valuable feedback and insights. Thank you
> > > > > > > immensely for your time and attention. I eagerly anticipate
> > > > > > > your valuable opinions and recommendations.
> > > > > > >
> > > > > > > Warm regards,
> > > > > > > Xiangying
> > > > > > >
> > > > > > > [1] https://github.com/apache/pulsar/pull/20948
> > > > > > > [2] https://github.com/apache/pulsar/pull/21027
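
To make the chunk sequences discussed in this thread concrete, here is a
minimal Python sketch of a consumer that restarts assembly whenever it sees
ChunkID 0 and discards its partial buffer on any gap or repeated chunk. This
is a simplified model written for illustration only, not Pulsar's actual
consumer code; the function name `assemble`, the tuple layout, and the payload
strings are all hypothetical.

```python
from typing import List, Optional, Tuple

# (sequence_id, chunk_id, payload); a hypothetical layout for illustration
Chunk = Tuple[int, int, str]


def assemble(chunks: List[Chunk], total_chunks: int) -> Optional[str]:
    """Simplified consumer model: restart on chunk 0, drop on gap or repeat."""
    buffer: Optional[List[str]] = None  # None means no assembly in progress
    for _seq, chunk_id, payload in chunks:
        if chunk_id == 0:
            # A new first chunk replaces any partially assembled message.
            buffer = [payload]
        elif buffer is not None and chunk_id == len(buffer):
            # Exactly the next expected chunk: keep assembling.
            buffer.append(payload)
        else:
            # Duplicate or out-of-order chunk: discard the partial message.
            buffer = None
        if buffer is not None and len(buffer) == total_chunks:
            return "".join(buffer)
    return None


# Producer crashed after chunk 1 and resent everything (messages 1..5).
resent = [(0, 0, "a1"), (0, 1, "b1"), (0, 0, "a2"), (0, 1, "b2"), (0, 2, "c2")]
# Chunk 1 appears twice in a row (the four-chunk case).
repeated = [(0, 0, "a"), (0, 1, "b"), (0, 1, "b"), (0, 2, "c")]
# What the consumer would see if the broker dropped messages 3 and 4.
broker_filtered = [(0, 0, "a1"), (0, 1, "b1"), (0, 2, "c2")]

print(assemble(resent, 3))           # built from messages 3, 4, 5
print(assemble(repeated, 3))         # nothing can be assembled
print(assemble(broker_filtered, 3))  # mixes chunks from two different splits
```

Under this model, the resent sequence is assembled from messages 3, 4, and 5,
the sequence with a repeated ChunkID 1 yields no message, and the
broker-filtered sequence combines chunks from two separate splits, which is
the corruption risk raised above when the two splits differ.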