Don't we still need the broker dedup logic for the above case? Then, brokers probably need to track the following:

Map<Producer, LastSeqId>
// additionally track
Map<Producer, ChunkingContext>

ChunkingContext { uuid, numChunks, lastChunkId }
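For illustration, a rough Java sketch of that per-producer state (names are hypothetical, not an actual broker API):

```
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the state a broker could keep per producer.
class ChunkingContext {
    String uuid;      // uuid of the chunked message currently being persisted
    int numChunks;    // total number of chunks declared for that message
    int lastChunkId;  // highest chunk id persisted so far
}

class ChunkDedupState {
    // One entry per producer, alongside the existing sequence-id tracking.
    final Map<String, Long> lastSeqId = new HashMap<>();
    final Map<String, ChunkingContext> chunkingCtx = new HashMap<>();
}
```

The dedup rules below would read and reset this context.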
The chunked msg dedup logic might be like this. For any incoming chunked msg m:

If m.currentSeqId < LastSeqId, dedup.
If m.currentSeqId == LastSeqId:
  1. If m.uuid differs from ctx.uuid and m.currentChunkId == 0, then this is a new chunking context: no dedup, and reset the context (previously persisted incomplete chunks need to be ignored on the consumers).
  2. If ctx.numChunks - 1 == ctx.lastChunkId, the chunked msg has already been fully persisted, so dedup.
  3. If m.currentChunkId <= ctx.lastChunkId, dedup.

On Fri, Aug 25, 2023 at 7:56 PM Heesung Sohn <heesung.s...@streamnative.io> wrote:

> Actually, can we think about this case too?
>
> What happens if the client sends the same chunked msg with the same seq id when dedup is enabled?
>
> // user sends a chunked msg, m1
> s1, c0
> s1, c1
> s1, c2 // complete
>
> // user resends the duplicate msg, m2
> s1, c0
> s1, c1
> s1, c2 // complete
>
> Do consumers receive m1 and m2 (no dedup)?
>
> On Fri, Aug 25, 2023 at 6:55 PM Xiangying Meng <xiangy...@apache.org> wrote:
>
>> Hi Heesung,
>>
>> > I think this means, for the PIP, the broker side's chunk deduplication.
>> > I think brokers probably need to track map<uuid, last_chunk_id> to dedup
>>
>> What is the significance of doing this? My understanding is that if the client crashes and restarts after sending half of a chunked message and then resends the previous chunked message, the resent chunked message should be treated as a new message, since it calls the producer's API again. If deduplication is enabled, users should ensure that their customized sequence ID is not lower than the previous sequence ID. I have considered this scenario and added a warning log in PR [0]. (I'm not sure whether an error log should be added or an exception thrown.) If deduplication is not enabled, on the consumer side there should be an incomplete chunked message received alongside another complete chunked message, each with a different UUID, and they will not interfere with each other.
>>
>> My main point is that every message sent using `producer.newMessage().send()` should be treated as a new message. UUID is solely used on the consumer side to identify different chunked messages.
>>
>> BR
>> Xiangying
>>
>> [0] https://github.com/apache/pulsar/pull/21047
>>
>> On Sat, Aug 26, 2023 at 9:34 AM Heesung Sohn <heesung.s...@streamnative.io.invalid> wrote:
>> >
>> > I think this means, for the PIP, the broker side's chunk deduplication. I think brokers probably need to track map<uuid, last_chunk_id> to dedup chunks on the broker side.
>> >
>> > On Fri, Aug 25, 2023 at 6:16 PM Xiangying Meng <xiangy...@apache.org> wrote:
>> > >
>> > > Hi Heesung
>> > >
>> > > It is a good point. Assume the producer application JVM restarts in the middle of chunking and resends the message chunks from the beginning with the previous sequence id.
>> > >
>> > > For the previous version, it should be:
>> > >
>> > > Producer send:
>> > > 1. SequenceID: 0, ChunkID: 0
>> > > 2. SequenceID: 0, ChunkID: 1
>> > > 3. SequenceID: 0, ChunkID: 0
>> > > 4. SequenceID: 0, ChunkID: 1
>> > > 5. SequenceID: 0, ChunkID: 2
>> > >
>> > > Consumer receive:
>> > > 1. SequenceID: 0, ChunkID: 0
>> > > 2. SequenceID: 0, ChunkID: 1
>> > > 3. SequenceID: 0, ChunkID: 0 // chunk ID out of order; release this chunk and recycle its `chunkedMsgCtx`
>> > > 4. SequenceID: 0, ChunkID: 1 // chunkedMsgCtx == null; release it
>> > > 5. SequenceID: 0, ChunkID: 2 // chunkedMsgCtx == null; release it
>> > >
>> > > Therefore, for the previous version, this chunked message cannot be received by the consumer. It is not an incompatibility issue.
>> > >
>> > > However, the solution of optimizing the `uuid` is valuable for the new implementation. I will modify this in the PR [0]. Thank you very much for your reminder and the provided UUID optimization solution.
>> > >
>> > > BR,
>> > > Xiangying
>> > >
>> > > [0] https://github.com/apache/pulsar/pull/20948
>> > >
>> > > On Sat, Aug 26, 2023 at 8:48 AM Heesung Sohn <heesung.s...@streamnative.io.invalid> wrote:
>> > > >
>> > > > Hi, I meant:
>> > > >
>> > > > What if the producer application JVM restarts in the middle of chunking and resends the message chunks from the beginning with the previous sequence id?
>> > > >
>> > > > On Fri, Aug 25, 2023 at 5:15 PM Xiangying Meng <xiangy...@apache.org> wrote:
>> > > > >
>> > > > > Hi Heesung
>> > > > >
>> > > > > It is a good idea to cover this incompatibility case if the producer splits the chunked message again when retrying.
>> > > > >
>> > > > > But in fact, the producer only resends the chunks that are already assembled into `OpSendMsg` instead of splitting the chunked message again. So there is no incompatibility issue from re-splitting the chunked message when resending.
>> > > > >
>> > > > > The logic of sending chunk messages can be found here:
>> > > > > https://github.com/apache/pulsar/blob/e0c481e5f8d7fa5534d3327785928a234376789e/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L533
>> > > > >
>> > > > > The logic of resending the message can be found here:
>> > > > > https://github.com/apache/pulsar/blob/e0c481e5f8d7fa5534d3327785928a234376789e/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L1892
>> > > > >
>> > > > > BR,
>> > > > > Xiangying
>> > > > >
>> > > > > On Sat, Aug 26, 2023 at 2:24 AM Heesung Sohn <heesung.s...@streamnative.io.invalid> wrote:
>> > > > > >
>> > > > > > >> I think brokers can track the last chunkMaxMessageSize for each producer.
>> > > > > >
>> > > > > > > Using different chunkMaxMessageSize is just one of the aspects. In PIP-132 [0],
>> > > > > > > we have included the message metadata size when checking maxMessageSize.
>> > > > > > > The message metadata can be changed after splitting the chunks. We are still
>> > > > > > > uncertain about the way the chunked message is split, even using the same
>> > > > > > > chunkMaxMessageSize.
>> > > > > >
>> > > > > > This sounds like we need to revisit the chunking uuid logic. Like I commented here (https://github.com/apache/pulsar/pull/20948/files#r1305997883), why don't we add a chunking session id suffix to identify the ongoing chunking uniquely?
>> > > > > >
>> > > > > > Currently:
>> > > > > > chunking uuid = producer + sequence_id
>> > > > > >
>> > > > > > Proposal:
>> > > > > > chunking uuid = producer + sequence_id + chunkingSessionId
>> > > > > >
>> > > > > > * chunkingSessionId could be a timestamp taken when the chunking started.
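>> > > > > >
>> > > > > > For example, a rough sketch (all names illustrative):
>> > > > > > ```
>> > > > > > String producerName = "my-producer";                  // illustrative
>> > > > > > long sequenceId = 42L;                                // the message's sequence id
>> > > > > > long chunkingSessionId = System.currentTimeMillis();  // taken once when chunking starts
>> > > > > > String chunkingUuid = producerName + "-" + sequenceId + "-" + chunkingSessionId;
>> > > > > > ```
>> > > > > > A producer that restarts and re-splits the message would then generate a different uuid for the same sequence id, so stale half-persisted chunks can be told apart.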
>> > > > > > On Fri, Aug 25, 2023 at 6:02 AM Xiangying Meng <xiangy...@apache.org> wrote:
>> > > > > > >
>> > > > > > > Hi Zike,
>> > > > > > >
>> > > > > > > > How would this happen to get two duplicated and consecutive ChunkID-1
>> > > > > > > > messages? The producer should guarantee to retry the whole chunked
>> > > > > > > > messages instead of some parts of the chunks.
>> > > > > > >
>> > > > > > > If the producer guarantees to retry the whole chunked message instead of some parts of the chunks, why doesn't the bug of the producer retrying chunked messages appear in PR [0]? And why do you need to set `chunkId` in `op.rePopulate`? It would be reset when the chunked message is split again, if the producer really did retry the whole chunked message.
>> > > > > > >
>> > > > > > > ```
>> > > > > > > final MessageMetadata finalMsgMetadata = msgMetadata;
>> > > > > > > op.rePopulate = () -> {
>> > > > > > >     if (msgMetadata.hasChunkId()) {
>> > > > > > >         // The message metadata is shared between all chunks in a large message.
>> > > > > > >         // We need to reset the chunk id for each call of this method.
>> > > > > > >         // It's safe to do that because there is only 1 thread to manipulate
>> > > > > > >         // this message metadata.
>> > > > > > >         finalMsgMetadata.setChunkId(chunkId);
>> > > > > > >     }
>> > > > > > >     op.cmd = sendMessage(producerId, sequenceId, numMessages, messageId,
>> > > > > > >             finalMsgMetadata, encryptedPayload);
>> > > > > > > };
>> > > > > > > ```
>> > > > > > >
>> > > > > > > >> But chunks 1, 2, 3, and 4 are still persisted in the topic.
>> > > > > > > >
>> > > > > > > > I think it's OK to persist them all on the topic. Is there any issue
>> > > > > > > > with doing that?
>> > > > > > >
>> > > > > > > This is just one scenario. Whether we only check the sequence ID of the first chunk (as I used in PR [1]) or check the sequence ID of the last chunk (as you suggested), in reality neither of these methods can deduplicate chunks on the broker side, because the broker cannot know the chunk ID of the previous message.
>> > > > > > >
>> > > > > > > However, if combined with the optimization of consumer-side logic, end-to-end deduplication can be completed. This is also the less-than-perfect solution I mentioned in my first email and implemented in PR [1].
>> > > > > > >
>> > > > > > > The reason I raise this proposal is not to solve the end-to-end deduplication of chunked messages between producers and consumers. That aspect has essentially been addressed in PR [1] and is still undergoing review.
>> > > > > > >
>> > > > > > > This proposal aims to ensure that no corrupt data exists within the topic, as our data might be offloaded and used elsewhere in scenarios where the consumer logic is not optimized.
>> > > > > > >
>> > > > > > > BR,
>> > > > > > > Xiangying
>> > > > > > >
>> > > > > > > [0] https://github.com/apache/pulsar/pull/21048
>> > > > > > > [1] https://github.com/apache/pulsar/pull/20948
>> > > > > > >
>> > > > > > > On Fri, Aug 25, 2023 at 5:18 PM Zike Yang <z...@apache.org> wrote:
>> > > > > > > >
>> > > > > > > > Hi xiangying,
>> > > > > > > >
>> > > > > > > > > The rewind operation is seen in the test log.
>> > > > > > > >
>> > > > > > > > That seems weird. Not sure if this rewind is related to the chunk consuming.
>> > > > > > > >
>> > > > > > > > > 1. SequenceID: 0, ChunkID: 0
>> > > > > > > > > 2. SequenceID: 0, ChunkID: 1
>> > > > > > > > > 3. SequenceID: 0, ChunkID: 1
>> > > > > > > > > 4. SequenceID: 0, ChunkID: 2
>> > > > > > > > > These four chunks cannot be processed correctly by the consumer.
>> > > > > > > >
>> > > > > > > > How would this happen to get two duplicated and consecutive ChunkID-1 messages? The producer should guarantee to retry the whole chunked message instead of some parts of the chunks.
>> > > > > > > >
>> > > > > > > > > But chunks 1, 2, 3, and 4 are still persisted in the topic.
>> > > > > > > >
>> > > > > > > > I think it's OK to persist them all in the topic. Is there any issue with doing that?
>> > > > > > > >
>> > > > > > > > > There is another point. The resend of the chunk message has a bug that
>> > > > > > > > > I shared with you, and you fixed in PR [0]. It will make this case
>> > > > > > > > > happen in another way.
>> > > > > > > >
>> > > > > > > > If the user sets the sequence ID manually, the case could be reproduced.
>> > > > > > > >
>> > > > > > > > BR,
>> > > > > > > > Zike Yang
>> > > > > > > >
>> > > > > > > > On Thu, Aug 24, 2023 at 8:48 PM Xiangying Meng <xiangy...@apache.org> wrote:
>> > > > > > > > >
>> > > > > > > > > > IIUC, this may change the existing behavior and may introduce inconsistencies.
>> > > > > > > > > > Suppose that we have a large message with 3 chunks. But the producer
>> > > > > > > > > > crashes and resends the message after sending the chunk-1. It will
>> > > > > > > > > > send a total of 5 messages to the Pulsar topic:
>> > > > > > > > > >
>> > > > > > > > > > 1. SequenceID: 0, ChunkID: 0
>> > > > > > > > > > 2. SequenceID: 0, ChunkID: 1
>> > > > > > > > > > 3. SequenceID: 0, ChunkID: 0 -> This message will be dropped
>> > > > > > > > > > 4. SequenceID: 0, ChunkID: 1 -> Will also be dropped
>> > > > > > > > > > 5. SequenceID: 0, ChunkID: 2 -> The last chunk of the message
>> > > > > > > > >
>> > > > > > > > > Hi Zike,
>> > > > > > > > >
>> > > > > > > > > There is another point. The resend of the chunk message has a bug that I shared with you, and you fixed in PR [0]. It will make this case happen in another way.
>> > > > > > > > >
>> > > > > > > > > Sample description of the bug: because all chunks of a chunked message share the same message metadata, if a chunk is not sent out immediately, then when resending, all chunks of the same chunked message use the chunk ID of the last chunk. In this case, it should happen as:
>> > > > > > > > >
>> > > > > > > > > 1. SequenceID: 0, ChunkID: 0 -> (Put op1 into `pendingMessages` and send)
>> > > > > > > > > 2. SequenceID: 0, ChunkID: 1 -> (Put op2 into `pendingMessages` and send)
>> > > > > > > > > 3. SequenceID: 0, ChunkID: 2 -> (Put op3 into `pendingMessages`)
>> > > > > > > > > 4. SequenceID: 0, ChunkID: 2 -> (Resend op1)
>> > > > > > > > > 5. SequenceID: 0, ChunkID: 2 -> (Resend op2)
>> > > > > > > > > 6. SequenceID: 0, ChunkID: 2 -> (Send op3)
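>> > > > > > > > >
>> > > > > > > > > A toy model of why this happens (not the Pulsar code; one shared, mutable metadata object is serialized again on resend):
>> > > > > > > > > ```
>> > > > > > > > > class Meta { int chunkId; }
>> > > > > > > > >
>> > > > > > > > > Meta shared = new Meta();   // shared by op1, op2, op3
>> > > > > > > > > for (int id = 0; id <= 2; id++) {
>> > > > > > > > >     shared.chunkId = id;    // mutated while building each chunk's op
>> > > > > > > > > }
>> > > > > > > > > // resend path: every op re-serializes the same shared object
>> > > > > > > > > int[] resent = { shared.chunkId, shared.chunkId, shared.chunkId };
>> > > > > > > > > // resent == {2, 2, 2}, matching steps 4-6 above
>> > > > > > > > > ```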
>> > > > > > > > >
>> > > > > > > > > BR,
>> > > > > > > > > Xiangying
>> > > > > > > > >
>> > > > > > > > > [0] https://github.com/apache/pulsar/pull/21048
>> > > > > > > > >
>> > > > > > > > > On Thu, Aug 24, 2023 at 8:09 PM Xiangying Meng <xiangy...@apache.org> wrote:
>> > > > > > > > > >
>> > > > > > > > > > >> This solution also cannot solve the out-of-order messages inside the
>> > > > > > > > > > >> chunks. For example, the above five messages will still be persisted.
>> > > > > > > > > > > The consumer already handles this case. The above 5 messages will all
>> > > > > > > > > > > be persisted but the consumer will skip messages 1 and 2.
>> > > > > > > > > > > For messages 3, 4, and 5, the producer can guarantee these chunks are in order.
>> > > > > > > > > >
>> > > > > > > > > > The rewind operation is seen in the test log. Every time an incorrect chunk message is received, it will rewind; the code has yet to be studied in depth. If it does not call rewind, then this case is considered a workable case. Let's look at another case:
>> > > > > > > > > > 1. SequenceID: 0, ChunkID: 0
>> > > > > > > > > > 2. SequenceID: 0, ChunkID: 1
>> > > > > > > > > > 3. SequenceID: 0, ChunkID: 1
>> > > > > > > > > > 4. SequenceID: 0, ChunkID: 2
>> > > > > > > > > > These four chunks cannot be processed correctly by the consumer.
>> > > > > > > > > >
>> > > > > > > > > > In fact, this solution is my original idea. The PR I mentioned in the first email above uses a similar solution and modifies the logic on the consumer side. Also, as I mentioned in the first email, this solution can only solve the problem of end-to-end duplication. But chunks 1, 2, 3, and 4 are still persisted in the topic.
>> > > > > > > > > >
>> > > > > > > > > > On Thu, Aug 24, 2023 at 3:00 PM Zike Yang <z...@apache.org> wrote:
>> > > > > > > > > > >
>> > > > > > > > > > > Hi Heesung,
>> > > > > > > > > > >
>> > > > > > > > > > > > I believe in this PIP "similar to the existing "sequence ID map",
>> > > > > > > > > > > > to facilitate effective filtering" actually means tracking the last
>> > > > > > > > > > > > chunkId (not all chunk ids) on the broker side.
>> > > > > > > > > > >
>> > > > > > > > > > > With this simple solution, I think we don't need to track the (sequenceID, chunkID) on the broker side at all. The broker just needs to apply the deduplication logic to the last chunk instead of all previous chunks. This PIP actually could do that, but it will introduce a new data format and compatibility issue.
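>> > > > > > > > > > >
>> > > > > > > > > > > A rough sketch of that broker-side check (`meta` is the incoming MessageMetadata; `highestPersistedSeqId` is the broker's tracked value for this producer; names illustrative):
>> > > > > > > > > > > ```
>> > > > > > > > > > > // Only the final chunk goes through the sequence-id dedup check;
>> > > > > > > > > > > // earlier chunks are persisted as-is.
>> > > > > > > > > > > boolean lastChunk = !meta.hasChunkId()
>> > > > > > > > > > >         || meta.getChunkId() == meta.getNumChunksFromMsg() - 1;
>> > > > > > > > > > > if (lastChunk && meta.getSequenceId() <= highestPersistedSeqId) {
>> > > > > > > > > > >     return false; // duplicate: the consumer can never complete a second copy
>> > > > > > > > > > > }
>> > > > > > > > > > > ```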
>> > > > > > > > > > >
>> > > > > > > > > > > > This is still a behavior change (deduping chunk messages on the broker),
>> > > > > > > > > > > > and I believe we need to discuss this addition as a PIP.
>> > > > > > > > > > >
>> > > > > > > > > > > Actually, we didn't specifically state the deduping behavior for chunked messages before. The deduplication logic should apply to a chunked message just as it does to a regular message. Therefore, I think it should be considered a bug fix. But if this fix is worth discussing in depth, I have no objection to it being a new PIP.
>> > > > > > > > > > >
>> > > > > > > > > > > > I think brokers can track the last chunkMaxMessageSize for each producer.
>> > > > > > > > > > >
>> > > > > > > > > > > Using different chunkMaxMessageSize is just one of the aspects. In PIP-132 [0], we have included the message metadata size when checking maxMessageSize. The message metadata can be changed after splitting the chunks. We are still uncertain about the way the chunked message is split, even using the same chunkMaxMessageSize.
>> > > > > > > > > > >
>> > > > > > > > > > > > then the brokers can assume that the producer is resending the chunks from
>> > > > > > > > > > > > the beginning with a different scheme (restarted with a different
>> > > > > > > > > > > > chunkMaxMessageSize) and accept those new chunks from the beginning.
>> > > > > > > > > > >
>> > > > > > > > > > > Regarding this, it seems like we are implementing dynamic configuration for chunkMaxMessageSize. I'm afraid that this would change the expected behavior and introduce more complexity to this configuration.
>> > > > > > > > > > >
>> > > > > > > > > > > [0] https://github.com/apache/pulsar/pull/14007
>> > > > > > > > > > >
>> > > > > > > > > > > BR,
>> > > > > > > > > > > Zike Yang
>> > > > > > > > > > >
>> > > > > > > > > > > On Thu, Aug 24, 2023 at 2:21 PM Zike Yang <z...@apache.org> wrote:
>> > > > > > > > > > > >
>> > > > > > > > > > > > Hi xiangying,
>> > > > > > > > > > > >
>> > > > > > > > > > > > > it will find that the message is out of order and rewind the cursor.
>> > > > > > > > > > > > > Loop this operation, and discard this message after it expires instead
>> > > > > > > > > > > > > of assembling 3, 4, 5 into a message.
>> > > > > > > > > > > >
>> > > > > > > > > > > > Could you point out where the implementation of this is? From my understanding, there should not be any rewind operation for the chunking feature.
>> > > > > > > > > > > > You can check more details here:
>> > > > > > > > > > > > https://streamnative.io/blog/deep-dive-into-message-chunking-in-pulsar#how-message-chunking-is-implemented
>> > > > > > > > > > > >
>> > > > > > > > > > > > > This solution also cannot solve the out-of-order messages inside the
>> > > > > > > > > > > > > chunks. For example, the above five messages will still be persisted.
>> > > > > > > > > > > >
>> > > > > > > > > > > > The consumer already handles this case. The above 5 messages will all be persisted but the consumer will skip messages 1 and 2. For messages 3, 4, and 5, the producer can guarantee these chunks are in order.
>> > > > > > > > > > > >
>> > > > > > > > > > > > BR,
>> > > > > > > > > > > > Zike Yang
>> > > > > > > > > > > >
>> > > > > > > > > > > > On Thu, Aug 24, 2023 at 11:48 AM Yubiao Feng <yubiao.f...@streamnative.io.invalid> wrote:
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > > 1. SequenceID: 0, ChunkID: 0
>> > > > > > > > > > > > > > 2. SequenceID: 0, ChunkID: 1
>> > > > > > > > > > > > > > 3. SequenceID: 0, ChunkID: 0
>> > > > > > > > > > > > > > 4. SequenceID: 0, ChunkID: 1
>> > > > > > > > > > > > > > 5. SequenceID: 0, ChunkID: 2
>> > > > > > > > > > > > > > For the existing behavior, the consumer assembles messages 3,4,5 into
>> > > > > > > > > > > > > > the original large message. But the changes brought about by this PIP
>> > > > > > > > > > > > > > will cause the consumer to use messages 1,2,5 for assembly. There is
>> > > > > > > > > > > > > > no guarantee that the producer will split the message in the same way
>> > > > > > > > > > > > > > twice before and after. For example, the producer's maxMessageSize may
>> > > > > > > > > > > > > > be different. This may cause the consumer to receive a corrupt message.
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > Good point.
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > Thanks
>> > > > > > > > > > > > > Yubiao Feng
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > On Wed, Aug 23, 2023 at 12:34 PM Zike Yang <z...@apache.org> wrote:
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > Hi xiangying,
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > Thanks for your PIP.
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > IIUC, this may change the existing behavior and may introduce inconsistencies. Suppose that we have a large message with 3 chunks, but the producer crashes and resends the message after sending chunk-1. It will send a total of 5 messages to the Pulsar topic:
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > 1. SequenceID: 0, ChunkID: 0
>> > > > > > > > > > > > > > 2. SequenceID: 0, ChunkID: 1
>> > > > > > > > > > > > > > 3. SequenceID: 0, ChunkID: 0 -> This message will be dropped
>> > > > > > > > > > > > > > 4. SequenceID: 0, ChunkID: 1 -> Will also be dropped
>> > > > > > > > > > > > > > 5. SequenceID: 0, ChunkID: 2 -> The last chunk of the message
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > For the existing behavior, the consumer assembles messages 3,4,5 into the original large message. But the changes brought about by this PIP will cause the consumer to use messages 1,2,5 for assembly. There is no guarantee that the producer will split the message in the same way twice before and after. For example, the producer's maxMessageSize may be different. This may cause the consumer to receive a corrupt message.
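>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > To make the two assembly outcomes concrete, a toy model (not the actual consumer code) of the behavior described here:
>> > > > > > > > > > > > > > ```
>> > > > > > > > > > > > > > import java.util.ArrayList;
>> > > > > > > > > > > > > > import java.util.List;
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > record Chunk(int chunkId, String payload) {}
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > static String assemble(List<Chunk> received) {
>> > > > > > > > > > > > > >     List<String> ctx = new ArrayList<>();
>> > > > > > > > > > > > > >     for (Chunk c : received) {
>> > > > > > > > > > > > > >         if (c.chunkId() == 0) ctx.clear(); // ChunkID 0 restarts the context
>> > > > > > > > > > > > > >         ctx.add(c.payload());
>> > > > > > > > > > > > > >     }
>> > > > > > > > > > > > > >     return String.join("", ctx);
>> > > > > > > > > > > > > > }
>> > > > > > > > > > > > > > // With all 5 messages delivered, chunks 3,4,5 form the result. If the
>> > > > > > > > > > > > > > // broker drops 3 and 4 as duplicates, the consumer stitches 1,2,5,
>> > > > > > > > > > > > > > // which is only safe if both attempts split the payload identically.
>> > > > > > > > > > > > > > ```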
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > Also, this PIP increases the complexity of handling chunks on the broker side. Brokers should, in general, treat a chunk as a normal message.
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > I think a simpler and better approach is to only check the deduplication for the last chunk of the large message. The consumer only gets the whole message after receiving the last chunk. We don't need to check the deduplication for all previous chunks. Also, by doing this we only need bug fixes; we don't need to introduce a new PIP.
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > BR,
>> > > > > > > > > > > > > > Zike Yang
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > On Fri, Aug 18, 2023 at 7:54 PM Xiangying Meng <xiangy...@apache.org> wrote:
>> > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > Dear Community,
>> > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > I hope this email finds you well. I'd like to address an important issue related to Apache Pulsar and discuss a solution I've proposed on GitHub. The problem pertains to the handling of Chunk Messages after enabling deduplication.
>> > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > In the current version of Apache Pulsar, all chunks of a Chunk Message share the same sequence ID. As a result, enabling the deduplication feature results in an inability to send Chunk Messages. To tackle this problem, I've proposed a solution [1] that ensures messages are not duplicated throughout end-to-end delivery. While this fix addresses the duplication issue for end-to-end messages, there remains a possibility of duplicate chunks within topics.
>> > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > To address this concern, I believe we should introduce a "chunk ID map" at the broker level, similar to the existing "sequence ID map", to facilitate effective filtering. However, implementing this has led to a challenge: a producer requires storage for two Long values simultaneously (sequence ID and chunk ID), while the snapshot of the sequence ID map is stored through the cursor properties (Map<String, Long>). To resolve this, I've proposed an alternative proposal [2] introducing a "mark delete properties" map (Map<String, String>) to replace the current properties (Map<String, Long>) field, so that the two Longs (sequence ID and chunk ID) for one producer can both be stored.
>> > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > I'd appreciate it if you carefully reviewed both PRs and shared your valuable feedback and insights. Thank you immensely for your time and attention. I eagerly anticipate your valuable opinions and recommendations.
>> > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > Warm regards,
>> > > > > > > > > > > > > > > Xiangying
>> > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > [1] https://github.com/apache/pulsar/pull/20948
>> > > > > > > > > > > > > > > [2] https://github.com/apache/pulsar/pull/21027
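>> > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > As a sketch, the two Longs could be packed into one String entry per producer (the ":" encoding and names are illustrative):
>> > > > > > > > > > > > > > > ```
>> > > > > > > > > > > > > > > import java.util.HashMap;
>> > > > > > > > > > > > > > > import java.util.Map;
>> > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > Map<String, String> markDeleteProps = new HashMap<>(); // replaces Map<String, Long>
>> > > > > > > > > > > > > > > String producerName = "my-producer";
>> > > > > > > > > > > > > > > long sequenceId = 7L, chunkId = 2L;
>> > > > > > > > > > > > > > > markDeleteProps.put(producerName, sequenceId + ":" + chunkId);
>> > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > // restore on snapshot load
>> > > > > > > > > > > > > > > String[] parts = markDeleteProps.get(producerName).split(":");
>> > > > > > > > > > > > > > > long lastSeqId = Long.parseLong(parts[0]);
>> > > > > > > > > > > > > > > long lastChunkId = Long.parseLong(parts[1]);
>> > > > > > > > > > > > > > > ```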