However, what if the producer JVM gets restarted after the broker persists m1 (but before the sequence ID is updated in persistent storage), and the producer then resends the same message (so m2) with the same sequence ID after restarting?
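For reference, a minimal sketch of the kind of per-producer sequence-ID check that broker-side deduplication performs in this scenario (the class and field names below are illustrative, not the actual broker code):

```
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative per-producer sequence-ID dedup check; names are hypothetical.
class SequenceIdDeduper {
    // highest sequence ID already persisted, keyed by producer name
    private final Map<String, Long> highestPersisted = new ConcurrentHashMap<>();

    // true if the incoming message should be dropped as a duplicate
    boolean isDuplicate(String producerName, long sequenceId) {
        Long last = highestPersisted.get(producerName);
        return last != null && sequenceId <= last;
    }

    // called once a message (or its last chunk) has been persisted
    void markPersisted(String producerName, long sequenceId) {
        highestPersisted.merge(producerName, sequenceId, Math::max);
    }
}
```

Whether the resent chunks of m2 survive such a check is exactly what the restart question above is probing, since the producer comes back with a sequence ID that the broker has already persisted for m1.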
On Fri, Aug 25, 2023 at 8:22 PM Xiangying Meng <xiangy...@apache.org> wrote: > Hi Heesung, > > In this case, the consumer only can receive m1. > > But it has the same content as the previous case: What should we do if > the user sends messages with the sequence ID that was used previously? > > I am afraid to introduce the incompatibility in this case, so I only > added a warning log in the PR[0] instead of throwing an exception. > Regarding this matter, what do you think? Should we throw an exception > or add error logs? > > I'm looking forward to hearing your viewpoint. > > Thanks, > Xiangying > > [0] https://github.com/apache/pulsar/pull/21047 > > On Sat, Aug 26, 2023 at 10:58 AM Heesung Sohn > <heesung.s...@streamnative.io.invalid> wrote: > > > > Actually, can we think about this case too? > > > > What happens if the cx sends the same chunked msg with the same seq id > when > > dedup is enabled? > > > > // user send a chunked msg, m1 > > s1, c0 > > s1, c1 > > s1, c2 // complete > > > > // user resend the duplicate msg, m2 > > s1, c0 > > s1, c1 > > s1, c2 //complete > > > > Do consumers receive m1 and m2(no dedup)? > > > > > > > > On Fri, Aug 25, 2023 at 6:55 PM Xiangying Meng <xiangy...@apache.org> > wrote: > > > > > Hi Heesung, > > > > > > >I think this means, for the PIP, the broker side's chunk > deduplication. > > > >I think brokers probably need to track map<uuid, last_chunk_id> to > dedup > > > > > > What is the significance of doing this? > > > My understanding is that if the client crashes and restarts after > > > sending half of a chunk message and then it resends the previous chunk > > > message, the resent chunk message should be treated as a new message > > > since it calls the producer's API again. > > > If deduplication is enabled, users should ensure that their customized > > > sequence ID is not lower than the previous sequence ID. > > > I have considered this scenario and added a warning log in PR[0]. (I'm > > > not sure whether an error log should be added or an exception thrown.) > > > If deduplication is not enabled, on the consumer side, there should be > > > an incomplete chunk message received alongside another complete chunk > > > message, each with a different UUID, and they will not interfere with > > > each other. > > > > > > My main point is that every message sent using > > > `producer.newMessage().send()` should be treated as a new message. > > > UUID is solely used for the consumer side to identify different chunk > > > messages. > > > > > > BR > > > Xiangying > > > > > > [0] https://github.com/apache/pulsar/pull/21047 > > > > > > On Sat, Aug 26, 2023 at 9:34 AM Heesung Sohn > > > <heesung.s...@streamnative.io.invalid> wrote: > > > > > > > > I think this means, for the PIP, the broker side's chunk > deduplication. > > > > I think brokers probably need to track map<uuid, last_chunk_id> to > dedup > > > > chunks on the broker side. > > > > > > > > > > > > > > > > > > > > On Fri, Aug 25, 2023 at 6:16 PM Xiangying Meng <xiangy...@apache.org > > > > > wrote: > > > > > > > > > Hi Heesung > > > > > > > > > > It is a good point. > > > > > Assume the producer application jvm restarts in the middle of > chunking > > > and > > > > > resends the message chunks from the beginning with the previous > > > sequence > > > > > id. > > > > > > > > > > For the previous version, it should be: > > > > > > > > > > Producer send: > > > > > 1. SequenceID: 0, ChunkID: 0 > > > > > 2. SequenceID: 0, ChunkID: 1 > > > > > 3. SequenceID: 0, ChunkID: 0 > > > > > 4. 
SequenceID: 0, ChunkID: 1 > > > > > 5. SequenceID: 0, ChunkID: 2 > > > > > > > > > > Consumer receive: > > > > > 1. SequenceID: 0, ChunkID: 0 > > > > > 2. SequenceID: 0, ChunkID: 1 > > > > > 3. SequenceID: 0, ChunkID: 0 // chunk ID out of order. Release this > > > > > chunk and recycle its `chunkedMsgCtx`. > > > > > 4. SequenceID: 0, ChunkID: 1 // chunkedMsgCtx == null Release it. > > > > > 5. SequenceID: 0, ChunkID: 2 // chunkedMsgCtx == null Release it. > > > > > > > > > > Therefore, for the previous version, this chunk message can not be > > > > > received by the consumer. It is not an incompatibility issue. > > > > > > > > > > However, the solution of optimizing the `uuid` is valuable to the > new > > > > > implementation. > > > > > I will modify this in the PR[0]. Thank you very much for your > reminder > > > > > and the provided UUID optimization solution. > > > > > > > > > > BR, > > > > > Xiangying > > > > > > > > > > [0] https://github.com/apache/pulsar/pull/20948 > > > > > > > > > > On Sat, Aug 26, 2023 at 8:48 AM Heesung Sohn > > > > > <heesung.s...@streamnative.io.invalid> wrote: > > > > > > > > > > > > Hi, I meant > > > > > > > > > > > > What if the producer application jvm restarts in the middle of > > > chunking > > > > > and > > > > > > resends the message chunks from the beginning with the previous > > > sequence > > > > > id? > > > > > > > > > > > > > > > > > > > > > > > > On Fri, Aug 25, 2023 at 5:15 PM Xiangying Meng < > xiangy...@apache.org > > > > > > > > > wrote: > > > > > > > > > > > > > Hi Heesung > > > > > > > > > > > > > > It is a good idea to cover this incompatibility case if the > > > producer > > > > > > > splits the chunk message again when retrying. > > > > > > > > > > > > > > But in fact, the producer only resents the chunks that are > > > assembled > > > > > > > to `OpSendMsg` instead of splitting the chunk message again. > > > > > > > So, there is no incompatibility issue of resenting the chunk > > > message > > > > > > > by splitting the chunk message again. > > > > > > > > > > > > > > The logic of sending chunk messages can be found here: > > > > > > > > > > > > > > > > > > > > > > > https://github.com/apache/pulsar/blob/e0c481e5f8d7fa5534d3327785928a234376789e/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L533 > > > > > > > > > > > > > > The logic of resending the message can be found here: > > > > > > > > > > > > > > > > > > > > > > > https://github.com/apache/pulsar/blob/e0c481e5f8d7fa5534d3327785928a234376789e/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L1892 > > > > > > > > > > > > > > BR, > > > > > > > Xiangying > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Sat, Aug 26, 2023 at 2:24 AM Heesung Sohn > > > > > > > <heesung.s...@streamnative.io.invalid> wrote: > > > > > > > > > > > > > > > > >> I think brokers can track the last chunkMaxMessageSize for > > > each > > > > > > > producer. > > > > > > > > > > > > > > > > > Using different chunkMaxMessageSize is just one of the > > > aspects. In > > > > > > > > PIP-132 [0], we have included the message metadata size when > > > checking > > > > > > > > maxMessageSize.The message metadata can be changed after > > > splitting > > > > > the > > > > > > > > chunks. We are still uncertain about the way the chunked > message > > > is > > > > > > > split, > > > > > > > > even using the same ss chunkMaxMessageSize. 
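As a side note, the "release and recycle" behaviour listed in the consumer-receive trace earlier in this thread can be summarised with a small sketch (illustrative only, not the actual ConsumerImpl logic):

```
import java.util.HashMap;
import java.util.Map;

// Simplified model of consumer-side chunk assembly; names are illustrative.
class ChunkAssembler {
    static class ChunkedMsgCtx {
        int lastChunkId = -1;
        final StringBuilder payload = new StringBuilder();
    }

    private final Map<String, ChunkedMsgCtx> contexts = new HashMap<>();

    // Returns the assembled payload when the last chunk arrives, otherwise null.
    String onChunk(String uuid, int chunkId, int numChunks, String chunkPayload) {
        ChunkedMsgCtx ctx = contexts.get(uuid);
        if (chunkId == 0 && ctx == null) {
            ctx = new ChunkedMsgCtx();
            contexts.put(uuid, ctx);
        }
        if (ctx == null || chunkId != ctx.lastChunkId + 1) {
            // Out-of-order or orphan chunk: release it and recycle any pending context,
            // mirroring the "chunk ID out of order" handling described above.
            contexts.remove(uuid);
            return null;
        }
        ctx.lastChunkId = chunkId;
        ctx.payload.append(chunkPayload);
        if (chunkId == numChunks - 1) {
            contexts.remove(uuid);
            return ctx.payload.toString();
        }
        return null;
    }
}
```

With the duplicated chunk stream shown in that trace, the second ChunkID 0 discards the pending context and the remaining chunks find no context at all, so the consumer never assembles the message.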
> > > > > > > > > > > > > > > > This sounds like we need to revisit chunking uuid logic. > > > > > > > > Like I commented here, > > > > > > > > > https://github.com/apache/pulsar/pull/20948/files#r1305997883 > > > > > > > > Why don't we add a chunk session id suffix to identify the > > > ongoing > > > > > > > chunking > > > > > > > > uniquely? > > > > > > > > > > > > > > > > Currently, > > > > > > > > > > > > > > > > chunking uuid = producer + sequence_id > > > > > > > > > > > > > > > > Proposal > > > > > > > > chunking uuid = producer + sequence_id + chunkingSessionId > > > > > > > > > > > > > > > > * chunkingSessionId could be a timestamp when the chunking > > > started. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Fri, Aug 25, 2023 at 6:02 AM Xiangying Meng < > > > xiangy...@apache.org > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > Hi Zike, > > > > > > > > > > > > > > > > > > >How would this happen to get two duplicated and > consecutive > > > > > ChunkID-1 > > > > > > > > > >messages? The producer should guarantee to retry the whole > > > chunked > > > > > > > > > >messages instead of some parts of the chunks. > > > > > > > > > > > > > > > > > > If the producer guarantees to retry the whole chunked > messages > > > > > instead > > > > > > > > > of some parts of the chunks, > > > > > > > > > Why doesn't the bug of the producer retry chunk messages in > > > the PR > > > > > [0] > > > > > > > > > appear? > > > > > > > > > And why do you need to set `chunkId` in `op.rePopulate`? > > > > > > > > > It will be rested when split chunk messages again if the > > > producer > > > > > > > > > guarantees to retry the whole chunked messages. > > > > > > > > > ``` > > > > > > > > > final MessageMetadata finalMsgMetadata = msgMetadata; > > > > > > > > > op.rePopulate = () -> { > > > > > > > > > if (msgMetadata.hasChunkId()) { > > > > > > > > > // The message metadata is shared between all chunks in a > large > > > > > message > > > > > > > > > // We need to reset the chunk id for each call of this > method > > > > > > > > > // It's safe to do that because there is only 1 thread to > > > > > manipulate > > > > > > > > > this message metadata > > > > > > > > > finalMsgMetadata.setChunkId(chunkId); > > > > > > > > > } > > > > > > > > > op.cmd = sendMessage(producerId, sequenceId, numMessages, > > > > > messageId, > > > > > > > > > finalMsgMetadata, > > > > > > > > > encryptedPayload); > > > > > > > > > }; > > > > > > > > > > > > > > > > > > ``` > > > > > > > > > > > > > > > > > > >> But chunks 1, 2, 3, and 4 are still persisted in the > topic. > > > > > > > > > > > > > > > > > > > >I think it's OK to persist them all on the topic. Is > there any > > > > > issue > > > > > > > > > >with doing that? > > > > > > > > > > > > > > > > > > This is just one scenario. Whether only check the sequence > ID > > > of > > > > > the > > > > > > > > > first chunk (as I used in PR[1]) or check the sequence ID > of > > > the > > > > > last > > > > > > > > > chunk (as you suggested), in reality, neither of these > methods > > > can > > > > > > > > > deduplicate chunks on the broker side because the broker > cannot > > > > > know > > > > > > > > > the chunk ID of the previous message. > > > > > > > > > > > > > > > > > > However, if combined with the optimization of consumer-side > > > logic, > > > > > > > > > end-to-end deduplication can be completed. 
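For what it's worth, the chunking-UUID change proposed above could look roughly like this; the helper and its names are hypothetical, only the scheme (producer + sequence_id + chunkingSessionId) comes from the proposal itself:

```
// Sketch of the current vs. proposed chunking UUID; names are hypothetical.
final class ChunkUuids {
    // Current scheme quoted in the thread: producer + sequence_id.
    static String current(String producerName, long sequenceId) {
        return producerName + "-" + sequenceId;
    }

    // Proposed scheme: append a chunking-session id, e.g. the time chunking started,
    // so a re-split after a producer restart is distinguishable from the original stream.
    static String proposed(String producerName, long sequenceId, long chunkingSessionId) {
        return producerName + "-" + sequenceId + "-" + chunkingSessionId;
    }
}
```

Even with such a session id, a broker-side check would still have to be combined with the consumer-side filtering to get end-to-end deduplication.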
> > > > > > > > > This is also a less-than-perfect solution I mentioned in my > > > first > > > > > > > > > email and implemented in PR[1]. > > > > > > > > > > > > > > > > > > The reason I propose this proposal is not to solve the > > > end-to-end > > > > > > > > > deduplication of chunk messages between producers and > > > consumers. > > > > > That > > > > > > > > > aspect has essentially been addressed in PR[1] and is still > > > > > undergoing > > > > > > > > > review. > > > > > > > > > > > > > > > > > > This proposal aims to ensure that no corrupt data exists > > > within the > > > > > > > > > topic, as our data might be offloaded and used elsewhere in > > > > > scenarios > > > > > > > > > where consumer logic is not optimized. > > > > > > > > > > > > > > > > > > BR, > > > > > > > > > Xiangying > > > > > > > > > > > > > > > > > > [0] https://github.com/apache/pulsar/pull/21048 > > > > > > > > > [1] https://github.com/apache/pulsar/pull/20948 > > > > > > > > > > > > > > > > > > On Fri, Aug 25, 2023 at 5:18 PM Zike Yang <z...@apache.org > > > > > wrote: > > > > > > > > > > > > > > > > > > > > HI xiangying > > > > > > > > > > > > > > > > > > > > > The rewind operation is seen in the test log. > > > > > > > > > > > > > > > > > > > > That seems weird. Not sure if this rewind is related to > the > > > chunk > > > > > > > > > consuming. > > > > > > > > > > > > > > > > > > > > > 1. SequenceID: 0, ChunkID: 0 > > > > > > > > > > 2. SequenceID: 0, ChunkID: 1 > > > > > > > > > > 3. SequenceID: 0, ChunkID: 1 > > > > > > > > > > 4. SequenceID: 0, ChunkID: 2 > > > > > > > > > > Such four chunks cannot be processed correctly by the > > > consumer. > > > > > > > > > > > > > > > > > > > > How would this happen to get two duplicated and > consecutive > > > > > ChunkID-1 > > > > > > > > > > messages? The producer should guarantee to retry the > whole > > > > > chunked > > > > > > > > > > messages instead of some parts of the chunks. > > > > > > > > > > > > > > > > > > > > > But chunks 1, 2, 3, and 4 are still persisted in the > topic. > > > > > > > > > > > > > > > > > > > > I think it's OK to persist them all in the topic. Is > there > > > any > > > > > issue > > > > > > > > > > with doing that? > > > > > > > > > > > > > > > > > > > > > There is another point. The resend of the chunk message > > > has a > > > > > bug > > > > > > > that > > > > > > > > > > I shared with you, and you fixed in PR [0]. It will make > this > > > > > case > > > > > > > > > > happen in another way. > > > > > > > > > > > > > > > > > > > > If the user sets the sequence ID manually, the case > could be > > > > > > > reproduced. > > > > > > > > > > > > > > > > > > > > BR, > > > > > > > > > > Zike Yang > > > > > > > > > > > > > > > > > > > > On Thu, Aug 24, 2023 at 8:48 PM Xiangying Meng < > > > > > xiangy...@apache.org > > > > > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > >IIUC, this may change the existing behavior and may > > > introduce > > > > > > > > > inconsistencies. > > > > > > > > > > > >Suppose that we have a large message with 3 chunks. > But > > > the > > > > > > > producer > > > > > > > > > > > >crashes and resends the message after sending the > > > chunk-1. It > > > > > will > > > > > > > > > > > >send a total of 5 messages to the Pulsar topic: > > > > > > > > > > > > > > > > > > > > > > > >1. SequenceID: 0, ChunkID: 0 > > > > > > > > > > > >2. SequenceID: 0, ChunkID: 1 > > > > > > > > > > > >3. SequenceID: 0, ChunkID: 0 -> This message will be > > > dropped > > > > > > > > > > > >4. 
SequenceID: 0, ChunkID: 1 -> Will also be > dropped > > > > > > > > > > > >5. SequenceID: 0, ChunkID: 2 -> The last chunk of > the > > > > > message > > > > > > > > > > > > > > > > > > > > > > Hi Zike > > > > > > > > > > > There is another point. The resend of the chunk message > > > has a > > > > > bug > > > > > > > that > > > > > > > > > > > I shared with you, and you fixed in PR [0]. It will > make > > > this > > > > > case > > > > > > > > > > > happen in another way. > > > > > > > > > > > Sample description for the bug: > > > > > > > > > > > Because the chunk message uses the same message > metadata, > > > if > > > > > the > > > > > > > chunk > > > > > > > > > > > is not sent out immediately. Then, when resending, all > > > chunks > > > > > of > > > > > > > the > > > > > > > > > > > same chunk message use the chunk ID of the last chunk. > > > > > > > > > > > In this case, It should happen as: > > > > > > > > > > > 1. SequenceID: 0, ChunkID: 0 (Put op1 into > > > `pendingMessages` > > > > > and > > > > > > > send) > > > > > > > > > > > 2. SequenceID: 0, ChunkID: 1 (Put op2 into > > > `pendingMessages` > > > > > and > > > > > > > send) > > > > > > > > > > > 3. SequenceID: 0, ChunkID: 2 -> (Put op3 into > > > > > `pendingMessages`) > > > > > > > > > > > 4. SequenceID: 0, ChunkID: 2 -> (Resend op1) > > > > > > > > > > > 5. SequenceID: 0, ChunkID: 2 -> (Resend op2) > > > > > > > > > > > 6. SequenceID: 0, ChunkID: 2 -> (Send op3) > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > BR, > > > > > > > > > > > Xiangying > > > > > > > > > > > > > > > > > > > > > > [0] - https://github.com/apache/pulsar/pull/21048 > > > > > > > > > > > > > > > > > > > > > > On Thu, Aug 24, 2023 at 8:09 PM Xiangying Meng < > > > > > > > xiangy...@apache.org> > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > > >> This solution also cannot solve the out-of-order > > > messages > > > > > > > inside > > > > > > > > > the > > > > > > > > > > > > >>chunks. For example, the above five messages will > > > still be > > > > > > > > > persisted. > > > > > > > > > > > > >The consumer already handles this case. The above 5 > > > messages > > > > > > > will > > > > > > > > > all > > > > > > > > > > > > >be persisted but the consumer will skip message 1 > and 2. > > > > > > > > > > > > >For messages 3, 4, and 5. The producer can guarantee > > > these > > > > > > > chunks > > > > > > > > > are in order. > > > > > > > > > > > > > > > > > > > > > > > > The rewind operation is seen in the test log. Every > time > > > an > > > > > > > incorrect > > > > > > > > > > > > chunk message is received, it will rewind, and the > code > > > has > > > > > yet > > > > > > > to be > > > > > > > > > > > > studied in depth. > > > > > > > > > > > > If it does not call rewind, then this case is > considered > > > a > > > > > > > workable > > > > > > > > > > > > case. Let's look at another case. > > > > > > > > > > > > 1. SequenceID: 0, ChunkID: 0 > > > > > > > > > > > > 2. SequenceID: 0, ChunkID: 1 > > > > > > > > > > > > 3. SequenceID: 0, ChunkID: 1 > > > > > > > > > > > > 4. SequenceID: 0, ChunkID: 2 > > > > > > > > > > > > Such four chunks cannot be processed correctly by the > > > > > consumer. > > > > > > > > > > > > > > > > > > > > > > > > In fact, this solution is my original idea. The PR I > > > > > mentioned > > > > > > > in the > > > > > > > > > > > > first email above uses a similar solution and > modifies > > > the > > > > > logic > > > > > > > on > > > > > > > > > > > > the consumer side. 
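The shared-metadata problem described above comes down to the pattern sketched below: every chunk's resend closure sees the same mutable metadata object, so unless the chunk id is reset per chunk (as the quoted rePopulate fix does), a resend re-serialises every chunk with whatever chunk id was written last. This is a simplified illustration with made-up types, not the real ProducerImpl code:

```
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Simplified illustration of the shared-metadata pitfall; not the real ProducerImpl.
class SharedMetadataResendDemo {
    static class Metadata {
        int chunkId; // mutated in place for every chunk of the same large message
    }

    public static void main(String[] args) {
        Metadata shared = new Metadata();
        List<Supplier<String>> pendingMessages = new ArrayList<>();

        // "Send" three chunks: each pending op captures the SAME metadata object.
        for (int chunkId = 0; chunkId < 3; chunkId++) {
            shared.chunkId = chunkId;
            final int capturedChunkId = chunkId;
            pendingMessages.add(() -> {
                // Without this reset, every resent chunk would report chunkId=2,
                // which is the 2,2,2 sequence described in the bug report above.
                shared.chunkId = capturedChunkId;
                return "sequenceId=0 chunkId=" + shared.chunkId;
            });
        }

        // Resending from pendingMessages re-runs each closure.
        pendingMessages.forEach(op -> System.out.println(op.get()));
    }
}
```

That reset matters for the consumer-side solution mentioned just above, because resent chunks can only be filtered if each one carries its own chunk id.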
> > > > > > > > > > > > Also, as I mentioned in the first email, this > solution > > > can > > > > > only > > > > > > > solve > > > > > > > > > > > > the problem of end-to-end duplication. But chunks 1, > 2, > > > 3, > > > > > and 4 > > > > > > > are > > > > > > > > > > > > still persisted in the topic. > > > > > > > > > > > > > > > > > > > > > > > > On Thu, Aug 24, 2023 at 3:00 PM Zike Yang < > > > z...@apache.org> > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > > > > Hi Heesung, > > > > > > > > > > > > > > > > > > > > > > > > > > > I believe in this PIP "similar to the existing > > > "sequence > > > > > ID > > > > > > > map", > > > > > > > > > > > > > to facilitate effective filtering" actually means > > > tracking > > > > > the > > > > > > > last > > > > > > > > > > > > > chunkId(not all chunk ids) on the broker side. > > > > > > > > > > > > > > > > > > > > > > > > > > With this simple solution, I think we don't need to > > > track > > > > > the > > > > > > > > > > > > > (sequenceID, chunkID) on the broker side at all. > The > > > broker > > > > > > > just > > > > > > > > > needs > > > > > > > > > > > > > to apply the deduplication logic to the last chunk > > > instead > > > > > of > > > > > > > all > > > > > > > > > > > > > previous chunks. This PIP actually could do that, > but > > > it > > > > > will > > > > > > > > > > > > > introduce a new data format and compatibility > issue. > > > > > > > > > > > > > > > > > > > > > > > > > > > This is still a behavior change(deduping chunk > > > messages > > > > > on > > > > > > > the > > > > > > > > > broker), > > > > > > > > > > > > > and I believe we need to discuss this addition as a > > > PIP. > > > > > > > > > > > > > > > > > > > > > > > > > > Actually, we didn't specifically state the deduping > > > chunk > > > > > > > message > > > > > > > > > > > > > behavior before. The chunked message should be > equally > > > > > > > applicable > > > > > > > > > to > > > > > > > > > > > > > the de-duplication logic as a regular message. > > > Therefore, I > > > > > > > think > > > > > > > > > it > > > > > > > > > > > > > should be considered as a bug fix. But if this FIX > is > > > worth > > > > > > > > > discussing > > > > > > > > > > > > > in depth. I have no objection to it being a new > PIP. > > > > > > > > > > > > > > > > > > > > > > > > > > > I think brokers can track the last > > > chunkMaxMessageSize > > > > > for > > > > > > > > > > > > > each producer. > > > > > > > > > > > > > > > > > > > > > > > > > > Using different chunkMaxMessageSize is just one of > the > > > > > > > aspects. In > > > > > > > > > > > > > PIP-132 [0], we have included the message metadata > size > > > > > when > > > > > > > > > checking > > > > > > > > > > > > > maxMessageSize. > > > > > > > > > > > > > The message metadata can be changed after > splitting the > > > > > > > chunks. We > > > > > > > > > are > > > > > > > > > > > > > still uncertain about the way the chunked message > is > > > split, > > > > > > > even > > > > > > > > > using > > > > > > > > > > > > > the same ss chunkMaxMessageSize. > > > > > > > > > > > > > > > > > > > > > > > > > > > then the brokers can assume that the producer is > > > > > resending > > > > > > > the > > > > > > > > > chunks from > > > > > > > > > > > > > the beginning with a different scheme(restarted > with a > > > > > > > different > > > > > > > > > > > > > chunkMaxMessageSize) and accept those new chunks > from > > > the > > > > > > > > > beginning. 
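The "only deduplicate on the last chunk" idea discussed above amounts to something like the following broker-side check; again a sketch with made-up names, not a concrete implementation:

```
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of last-chunk-only deduplication; names are hypothetical.
class LastChunkDeduper {
    private final Map<String, Long> highestCompleted = new ConcurrentHashMap<>();

    // true if this chunk should be dropped because the whole logical message
    // was already completed once for this producer
    boolean shouldDrop(String producerName, long sequenceId, int chunkId, int numChunks) {
        if (chunkId != numChunks - 1) {
            return false; // intermediate chunks pass through without a dedup decision
        }
        Long last = highestCompleted.get(producerName);
        if (last != null && sequenceId <= last) {
            return true;
        }
        highestCompleted.put(producerName, sequenceId);
        return false;
    }
}
```

As the thread notes, this keeps duplicate intermediate chunks in the topic and only prevents a second completed copy from reaching consumers; tracking the last chunkMaxMessageSize per producer, as suggested above, would be an additional signal on top of it.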
> > > > > > > > > > > > > > > > > > > > > > > > > > Regarding this, it seems like we are implementing > > > dynamic > > > > > > > > > > > > > configuration for the chunkMaxMessageSize. I'm > afraid > > > that > > > > > this > > > > > > > > > would > > > > > > > > > > > > > change the expected behavior and introduce more > > > complexity > > > > > to > > > > > > > this > > > > > > > > > > > > > configuration. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > [0] https://github.com/apache/pulsar/pull/14007 > > > > > > > > > > > > > > > > > > > > > > > > > > BR, > > > > > > > > > > > > > Zike Yang > > > > > > > > > > > > > > > > > > > > > > > > > > On Thu, Aug 24, 2023 at 2:21 PM Zike Yang < > > > z...@apache.org > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > > > > > > Hi, xiangying > > > > > > > > > > > > > > > > > > > > > > > > > > > > > it will find that the message > > > > > > > > > > > > > > is out of order and rewind the cursor. Loop this > > > > > operation, > > > > > > > and > > > > > > > > > > > > > > discard this message after it expires instead of > > > > > assembling > > > > > > > 3, > > > > > > > > > 4, 5 > > > > > > > > > > > > > > into a message. > > > > > > > > > > > > > > > > > > > > > > > > > > > > Could you point out where the implementation for > > > this? > > > > > From > > > > > > > my > > > > > > > > > > > > > > understanding, there should not be any rewind > > > operation > > > > > for > > > > > > > the > > > > > > > > > > > > > > chunking feature. You can check more detail here: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://streamnative.io/blog/deep-dive-into-message-chunking-in-pulsar#how-message-chunking-is-implemented > > > > > > > > > > > > > > > > > > > > > > > > > > > > > This solution also cannot solve the > out-of-order > > > > > messages > > > > > > > > > inside the > > > > > > > > > > > > > > chunks. For example, the above five messages will > > > still > > > > > be > > > > > > > > > persisted. > > > > > > > > > > > > > > > > > > > > > > > > > > > > The consumer already handles this case. The > above 5 > > > > > messages > > > > > > > > > will all > > > > > > > > > > > > > > be persisted but the consumer will skip message 1 > > > and 2. > > > > > > > > > > > > > > For messages 3, 4, and 5. The producer can > guarantee > > > > > these > > > > > > > > > chunks are in order. > > > > > > > > > > > > > > > > > > > > > > > > > > > > BR, > > > > > > > > > > > > > > Zike Yang > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Thu, Aug 24, 2023 at 11:48 AM Yubiao Feng > > > > > > > > > > > > > > <yubiao.f...@streamnative.io.invalid> wrote: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > 1. SequenceID: 0, ChunkID: 0 > > > > > > > > > > > > > > > > 2. SequenceID: 0, ChunkID: 1 > > > > > > > > > > > > > > > > 3. SequenceID: 0, ChunkID: 0 > > > > > > > > > > > > > > > > 4. SequenceID: 0, ChunkID: 1 > > > > > > > > > > > > > > > > 5. SequenceID: 0, ChunkID: 2 > > > > > > > > > > > > > > > > For the existing behavior, the consumer > assembles > > > > > > > > > > > > > > > > messages 3,4,5 into > > > > > > > > > > > > > > > > the original large message. But the changes > > > brought > > > > > > > > > > > > > > > > about by this PIP > > > > > > > > > > > > > > > > will cause the consumer to use messages > 1,2,5 for > > > > > > > > > > > > > > > > assembly. 
There is > > > > > > > > > > > > > > > > no guarantee that the producer will split the > > > message > > > > > > > > > > > > > > > > in the same way > > > > > > > > > > > > > > > > twice before and after. For example, the > > > producer's > > > > > > > > > > > > > > > > maxMessageSize may > > > > > > > > > > > > > > > > be different. This may cause the consumer to > > > > > > > > > > > > > > > > receive a corrupt > > > > > > > > > > > > > > > > message. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Good point. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks > > > > > > > > > > > > > > > Yubiao Feng > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Wed, Aug 23, 2023 at 12:34 PM Zike Yang < > > > > > > > z...@apache.org> > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Hi, xiangying, > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks for your PIP. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > IIUC, this may change the existing behavior > and > > > may > > > > > > > introduce > > > > > > > > > > > > > > > > inconsistencies. > > > > > > > > > > > > > > > > Suppose that we have a large message with 3 > > > chunks. > > > > > But > > > > > > > the > > > > > > > > > producer > > > > > > > > > > > > > > > > crashes and resends the message after > sending the > > > > > > > chunk-1. > > > > > > > > > It will > > > > > > > > > > > > > > > > send a total of 5 messages to the Pulsar > topic: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > 1. SequenceID: 0, ChunkID: 0 > > > > > > > > > > > > > > > > 2. SequenceID: 0, ChunkID: 1 > > > > > > > > > > > > > > > > 3. SequenceID: 0, ChunkID: 0 -> This > message > > > will > > > > > be > > > > > > > > > dropped > > > > > > > > > > > > > > > > 4. SequenceID: 0, ChunkID: 1 -> Will also > be > > > > > dropped > > > > > > > > > > > > > > > > 5. SequenceID: 0, ChunkID: 2 -> The last > > > chunk of > > > > > the > > > > > > > > > message > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > For the existing behavior, the consumer > assembles > > > > > > > messages > > > > > > > > > 3,4,5 into > > > > > > > > > > > > > > > > the original large message. But the changes > > > brought > > > > > > > about by > > > > > > > > > this PIP > > > > > > > > > > > > > > > > will cause the consumer to use messages > 1,2,5 for > > > > > > > assembly. > > > > > > > > > There is > > > > > > > > > > > > > > > > no guarantee that the producer will split the > > > > > message in > > > > > > > the > > > > > > > > > same way > > > > > > > > > > > > > > > > twice before and after. For example, the > > > producer's > > > > > > > > > maxMessageSize may > > > > > > > > > > > > > > > > be different. This may cause the consumer to > > > receive > > > > > a > > > > > > > > > corrupt > > > > > > > > > > > > > > > > message. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Also, this PIP increases the complexity of > > > handling > > > > > > > chunks > > > > > > > > > on the > > > > > > > > > > > > > > > > broker side. Brokers should, in general, > treat > > > the > > > > > chunk > > > > > > > as > > > > > > > > > a normal > > > > > > > > > > > > > > > > message. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > I think a simple better approach is to only > > > check the > > > > > > > > > deduplication > > > > > > > > > > > > > > > > for the last chunk of the large message. 
The > > > consumer > > > > > > > only > > > > > > > > > gets the > > > > > > > > > > > > > > > > whole message after receiving the last > chunk. We > > > > > don't > > > > > > > need > > > > > > > > > to check > > > > > > > > > > > > > > > > the deduplication for all previous chunks. > Also > > > by > > > > > doing > > > > > > > > > this we only > > > > > > > > > > > > > > > > need bug fixes, we don't need to introduce a > new > > > PIP. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > BR, > > > > > > > > > > > > > > > > Zike Yang > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Fri, Aug 18, 2023 at 7:54 PM Xiangying > Meng < > > > > > > > > > xiangy...@apache.org> > > > > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Dear Community, > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > I hope this email finds you well. I'd like > to > > > > > address > > > > > > > an > > > > > > > > > important > > > > > > > > > > > > > > > > > issue related to Apache Pulsar and discuss > a > > > > > solution > > > > > > > I've > > > > > > > > > proposed on > > > > > > > > > > > > > > > > > GitHub. The problem pertains to the > handling of > > > > > Chunk > > > > > > > > > Messages after > > > > > > > > > > > > > > > > > enabling deduplication. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > In the current version of Apache Pulsar, > all > > > > > chunks of > > > > > > > a > > > > > > > > > Chunk Message > > > > > > > > > > > > > > > > > share the same sequence ID. However, > enabling > > > the > > > > > > > > > depublication > > > > > > > > > > > > > > > > > feature results in an inability to send > Chunk > > > > > > > Messages. To > > > > > > > > > tackle this > > > > > > > > > > > > > > > > > problem, I've proposed a solution [1] that > > > ensures > > > > > > > > > messages are not > > > > > > > > > > > > > > > > > duplicated throughout end-to-end delivery. > > > While > > > > > this > > > > > > > fix > > > > > > > > > addresses > > > > > > > > > > > > > > > > > the duplication issue for end-to-end > messages, > > > > > there > > > > > > > > > remains a > > > > > > > > > > > > > > > > > possibility of duplicate chunks within > topics. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > To address this concern, I believe we > should > > > > > introduce > > > > > > > a > > > > > > > > > "Chunk ID > > > > > > > > > > > > > > > > > map" at the Broker level, similar to the > > > existing > > > > > > > > > "sequence ID map", > > > > > > > > > > > > > > > > > to facilitate effective filtering. However, > > > > > > > implementing > > > > > > > > > this has led > > > > > > > > > > > > > > > > > to a challenge: a producer requires storage > > > for two > > > > > > > Long > > > > > > > > > values > > > > > > > > > > > > > > > > > simultaneously (sequence ID and chunk ID). 
> > > Because > > > > > the > > > > > > > > > snapshot of the > > > > > > > > > > > > > > > > > sequence ID map is stored through the > > > properties > > > > > of the > > > > > > > > > cursor > > > > > > > > > > > > > > > > > (Map<String, Long>), so in order to > satisfy the > > > > > > > storage of > > > > > > > > > two Longs > > > > > > > > > > > > > > > > > (sequence ID, chunk ID) corresponding to > one > > > > > producer, > > > > > > > we > > > > > > > > > hope to add > > > > > > > > > > > > > > > > > a mark DeleteProperties (Map<String, Long>) > > > String, > > > > > > > > > String>) to > > > > > > > > > > > > > > > > > replace the properties (Map<String, Long>) > > > field. > > > > > To > > > > > > > > > resolve this, > > > > > > > > > > > > > > > > > I've proposed an alternative proposal [2] > > > > > involving the > > > > > > > > > introduction > > > > > > > > > > > > > > > > > of a "mark DeleteProperties" (Map<String, > > > String>) > > > > > to > > > > > > > > > replace the > > > > > > > > > > > > > > > > > current properties (Map<String, Long>) > field. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > I'd appreciate it if you carefully review > both > > > PRs > > > > > and > > > > > > > > > share your > > > > > > > > > > > > > > > > > valuable feedback and insights. Thank you > > > > > immensely for > > > > > > > > > your time and > > > > > > > > > > > > > > > > > attention. I eagerly anticipate your > valuable > > > > > opinions > > > > > > > and > > > > > > > > > > > > > > > > > recommendations. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Warm regards, > > > > > > > > > > > > > > > > > Xiangying > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > [1] > > > https://github.com/apache/pulsar/pull/20948 > > > > > > > > > > > > > > > > > [2] > > > https://github.com/apache/pulsar/pull/21027 > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
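To make the storage question at the end of the original proposal more concrete: with a Map<String, String> mark-delete property, the two per-producer values could be packed into a single entry along the lines of the sketch below. The encoding shown here is purely illustrative and not what the linked PRs implement:

```
import java.util.HashMap;
import java.util.Map;

// Illustrative packing of (sequence ID, chunk ID) into Map<String, String> properties.
final class ChunkDedupSnapshot {
    // e.g. {"my-producer" -> "42:2"} meaning sequenceId=42, chunkId=2
    static void put(Map<String, String> markDeleteProperties,
                    String producerName, long sequenceId, long chunkId) {
        markDeleteProperties.put(producerName, sequenceId + ":" + chunkId);
    }

    static long[] get(Map<String, String> markDeleteProperties, String producerName) {
        String value = markDeleteProperties.get(producerName);
        if (value == null) {
            return null;
        }
        String[] parts = value.split(":");
        return new long[] { Long.parseLong(parts[0]), Long.parseLong(parts[1]) };
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        put(props, "my-producer", 42L, 2L);
        long[] snapshot = get(props, "my-producer");
        System.out.println("sequenceId=" + snapshot[0] + ", chunkId=" + snapshot[1]);
    }
}
```

A Map<String, Long> snapshot can only hold one number per producer, which is why the proposal replaces it with a string-valued map.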