Hi Heesung,

> I believe in this PIP "similar to the existing "sequence ID map", to facilitate effective filtering" actually means tracking the last chunkId(not all chunk ids) on the broker side.

With this simple solution, I think we don't need to track the
(sequenceID, chunkID) on the broker side at all. The broker just needs
to apply the deduplication logic to the last chunk instead of all
previous chunks. This PIP could actually do that, but it would
introduce a new data format and compatibility issues.

> This is still a behavior change(deduping chunk messages on the broker), and I believe we need to discuss this addition as a PIP.

Actually, we didn't specifically state the deduping behavior for
chunked messages before. The de-duplication logic should apply to a
chunked message just as it does to a regular message. Therefore, I
think it should be considered a bug fix. But if this fix is worth
discussing in depth, I have no objection to it being a new PIP.

> I think brokers can track the last chunkMaxMessageSize for each producer.

Using a different chunkMaxMessageSize is just one aspect. In PIP-132
[0], we included the message metadata size when checking
maxMessageSize. The message metadata can change after the chunks are
split. We still cannot be certain how a chunked message will be split,
even with the same chunkMaxMessageSize.

> then the brokers can assume that the producer is resending the chunks from the beginning with a different scheme(restarted with a different chunkMaxMessageSize) and accept those new chunks from the beginning.

Regarding this, it seems like we would be implementing dynamic
configuration for chunkMaxMessageSize. I'm afraid that this would
change the expected behavior and introduce more complexity to this
configuration.

[0] https://github.com/apache/pulsar/pull/14007

BR,
Zike Yang

On Thu, Aug 24, 2023 at 2:21 PM Zike Yang <z...@apache.org> wrote:
>
> Hi, xiangying
>
> > it will find that the message
> > is out of order and rewind the cursor. Loop this operation, and
> > discard this message after it expires instead of assembling 3, 4, 5
> > into a message.
>
> Could you point out where the implementation for this is?
> From my understanding, there should not be any rewind operation for
> the chunking feature. You can check more detail here:
> https://streamnative.io/blog/deep-dive-into-message-chunking-in-pulsar#how-message-chunking-is-implemented
>
> > This solution also cannot solve the out-of-order messages inside the
> > chunks. For example, the above five messages will still be persisted.
>
> The consumer already handles this case. The above 5 messages will all
> be persisted, but the consumer will skip messages 1 and 2.
> For messages 3, 4, and 5, the producer can guarantee these chunks are
> in order.
>
> BR,
> Zike Yang
>
> On Thu, Aug 24, 2023 at 11:48 AM Yubiao Feng
> <yubiao.f...@streamnative.io.invalid> wrote:
> >
> > > 1. SequenceID: 0, ChunkID: 0
> > > 2. SequenceID: 0, ChunkID: 1
> > > 3. SequenceID: 0, ChunkID: 0
> > > 4. SequenceID: 0, ChunkID: 1
> > > 5. SequenceID: 0, ChunkID: 2
> > > For the existing behavior, the consumer assembles messages 3,4,5 into
> > > the original large message. But the changes brought about by this PIP
> > > will cause the consumer to use messages 1,2,5 for assembly. There is
> > > no guarantee that the producer will split the message in the same way
> > > twice before and after. For example, the producer's maxMessageSize may
> > > be different. This may cause the consumer to receive a corrupt
> > > message.
> >
> > Good point.
> >
> > Thanks
> > Yubiao Feng
> >
> > On Wed, Aug 23, 2023 at 12:34 PM Zike Yang <z...@apache.org> wrote:
> >
> > > Hi, xiangying,
> > >
> > > Thanks for your PIP.
> > >
> > > IIUC, this may change the existing behavior and may introduce
> > > inconsistencies.
> > > Suppose that we have a large message with 3 chunks. But the producer
> > > crashes and resends the message after sending the chunk-1. It will
> > > send a total of 5 messages to the Pulsar topic:
> > >
> > > 1. SequenceID: 0, ChunkID: 0
> > > 2. SequenceID: 0, ChunkID: 1
> > > 3. SequenceID: 0, ChunkID: 0 -> This message will be dropped
> > > 4. SequenceID: 0, ChunkID: 1 -> Will also be dropped
> > > 5. SequenceID: 0, ChunkID: 2 -> The last chunk of the message
> > >
> > > For the existing behavior, the consumer assembles messages 3,4,5 into
> > > the original large message. But the changes brought about by this PIP
> > > will cause the consumer to use messages 1,2,5 for assembly. There is
> > > no guarantee that the producer will split the message in the same way
> > > twice before and after. For example, the producer's maxMessageSize may
> > > be different. This may cause the consumer to receive a corrupt
> > > message.
> > >
> > > Also, this PIP increases the complexity of handling chunks on the
> > > broker side. Brokers should, in general, treat the chunk as a normal
> > > message.
> > >
> > > I think a simpler, better approach is to only check the deduplication
> > > for the last chunk of the large message. The consumer only gets the
> > > whole message after receiving the last chunk. We don't need to check
> > > the deduplication for all previous chunks. Also, by doing this we only
> > > need a bug fix; we don't need to introduce a new PIP.
> > >
> > > BR,
> > > Zike Yang
> > >
> > > On Fri, Aug 18, 2023 at 7:54 PM Xiangying Meng <xiangy...@apache.org>
> > > wrote:
> > > >
> > > > Dear Community,
> > > >
> > > > I hope this email finds you well. I'd like to address an important
> > > > issue related to Apache Pulsar and discuss a solution I've proposed on
> > > > GitHub. The problem pertains to the handling of Chunk Messages after
> > > > enabling deduplication.
> > > >
> > > > In the current version of Apache Pulsar, all chunks of a Chunk Message
> > > > share the same sequence ID. However, enabling the deduplication
> > > > feature results in an inability to send Chunk Messages. To tackle this
> > > > problem, I've proposed a solution [1] that ensures messages are not
> > > > duplicated throughout end-to-end delivery.
> > > > While this fix addresses the duplication issue for end-to-end
> > > > messages, there remains a possibility of duplicate chunks within
> > > > topics.
> > > >
> > > > To address this concern, I believe we should introduce a "Chunk ID
> > > > map" at the Broker level, similar to the existing "sequence ID map",
> > > > to facilitate effective filtering. However, implementing this has led
> > > > to a challenge: a producer requires storage for two Long values
> > > > simultaneously (sequence ID and chunk ID). The snapshot of the
> > > > sequence ID map is stored through the properties of the cursor
> > > > (Map<String, Long>), so it cannot hold the two Longs (sequence ID,
> > > > chunk ID) needed for one producer. To resolve this, I've proposed an
> > > > alternative proposal [2] involving the introduction of a
> > > > "mark DeleteProperties" (Map<String, String>) to replace the current
> > > > properties (Map<String, Long>) field.
> > > >
> > > > I'd appreciate it if you carefully review both PRs and share your
> > > > valuable feedback and insights. Thank you immensely for your time and
> > > > attention. I eagerly anticipate your valuable opinions and
> > > > recommendations.
> > > >
> > > > Warm regards,
> > > > Xiangying
> > > >
> > > > [1] https://github.com/apache/pulsar/pull/20948
> > > > [2] https://github.com/apache/pulsar/pull/21027
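
For readers following the thread, the five-message scenario (chunks 0 and 1 sent, producer crash, then chunks 0, 1, 2 resent with a different split) can be sketched as a toy model in Python. This is only an illustration under stated assumptions, not Pulsar's code: `Chunk`, `split`, `dedup_every_chunk`, `dedup_last_chunk_only`, and `consume` are hypothetical names, the broker is reduced to a filter over a list for a single producer, and the consumer is assumed to restart assembly whenever it sees chunk 0 and to discard a partial message when a chunk arrives out of order.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Chunk:
    sequence_id: int
    chunk_id: int
    num_chunks: int
    payload: str


def split(payload, sequence_id, chunk_size):
    """Split a payload into chunks that all share one sequence ID."""
    parts = [payload[i:i + chunk_size] for i in range(0, len(payload), chunk_size)]
    return [Chunk(sequence_id, i, len(parts), p) for i, p in enumerate(parts)]


def dedup_every_chunk(chunks):
    """Hypothetical broker filter tracking (sequence ID, chunk ID):
    drop any chunk that is not newer than the last accepted one."""
    last = (-1, -1)
    persisted = []
    for c in chunks:
        key = (c.sequence_id, c.chunk_id)
        if key <= last:
            continue  # treated as a duplicate
        last = key
        persisted.append(c)
    return persisted


def dedup_last_chunk_only(chunks):
    """Hypothetical broker filter that checks only the last chunk:
    earlier chunks are always persisted."""
    highest_seq = -1
    persisted = []
    for c in chunks:
        if c.chunk_id == c.num_chunks - 1:
            if c.sequence_id <= highest_seq:
                continue  # the whole message was already persisted
            highest_seq = c.sequence_id
        persisted.append(c)
    return persisted


def consume(persisted):
    """Simplified consumer: chunk 0 (re)starts assembly, an out-of-order
    chunk discards the partial message, the last chunk emits the result."""
    buffers = {}  # sequence_id -> payloads received so far
    out = []
    for c in persisted:
        if c.chunk_id == 0:
            buffers[c.sequence_id] = [c.payload]
        elif c.sequence_id in buffers and len(buffers[c.sequence_id]) == c.chunk_id:
            buffers[c.sequence_id].append(c.payload)
        else:
            buffers.pop(c.sequence_id, None)
            continue
        if c.chunk_id == c.num_chunks - 1:
            out.append("".join(buffers.pop(c.sequence_id)))
    return out


# The producer sends two chunks, crashes, then resends with a different chunk size.
first_attempt = split("ABCDEFGHI", sequence_id=0, chunk_size=4)
retry = split("ABCDEFGHI", sequence_id=0, chunk_size=3)
sent = first_attempt[:2] + retry

print(consume(dedup_every_chunk(sent)))      # assembled from messages 1, 2, 5
print(consume(dedup_last_chunk_only(sent)))  # assembled from messages 3, 4, 5
```

In this model, filtering every chunk yields a corrupted payload because messages 1 and 2 come from a different split than message 5, while filtering only the last chunk lets the consumer assemble messages 3, 4, 5 intact. A later full resend of the same message is still suppressed, since its last chunk is dropped and the consumer never completes the assembly.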