Don't we still need the broker dedup logic for the above case?

If so, brokers probably need to track the following:
Map<Producer, LastSeqId>
// additionally track
Map<Producer, ChunkingContext>
ChunkingContext {
  uuid,
  numChunks,
  lastChunkId
}

The chunked msg dedup logic might look like this:

For any incoming chunked msg, m:
  If m.currentSeqId < LastSeqId, dedup.
  If m.currentSeqId == LastSeqId,
    1. if m.uuid differs from ctx.uuid and m.currentChunkId == 0, then
       this is a new chunking context: no dedup, and reset the context
       (consumers need to ignore previously persisted incomplete chunks).
    2. if ctx.numChunks - 1 == ctx.lastChunkId, the chunked msg is already
       fully persisted, so dedup.
    3. if m.currentChunkId <= ctx.lastChunkId, dedup.
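
To make the rules above concrete, here is a minimal Java sketch. It is only
an illustration under stated assumptions, not broker code: the class
ChunkDedupSketch, the method isDuplicate, and the use of a String as the
producer key are all hypothetical names. It also assumes the rules are
checked in the order listed, and that any chunk the rules do not reject
(for example, one with a higher sequence id) is accepted.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: none of these types or methods exist in Pulsar.
class ChunkDedupSketch {

    // Per-producer state for the chunked message most recently accepted.
    static final class ChunkingContext {
        final String uuid;
        final int numChunks;
        int lastChunkId;

        ChunkingContext(String uuid, int numChunks, int lastChunkId) {
            this.uuid = uuid;
            this.numChunks = numChunks;
            this.lastChunkId = lastChunkId;
        }
    }

    private final Map<String, Long> lastSeqIds = new HashMap<>();
    private final Map<String, ChunkingContext> contexts = new HashMap<>();

    // Returns true if the incoming chunk should be dropped as a duplicate.
    boolean isDuplicate(String producer, long seqId, String uuid,
                        int chunkId, int numChunks) {
        Long lastSeqId = lastSeqIds.get(producer);
        ChunkingContext ctx = contexts.get(producer);
        boolean sameSeq = lastSeqId != null && seqId == lastSeqId;

        if (lastSeqId != null && seqId < lastSeqId) {
            return true; // older sequence id: dedup
        }
        if (sameSeq && ctx != null) {
            // Rule 1: a different uuid starting at chunk 0 is a new context.
            boolean newContext = !uuid.equals(ctx.uuid) && chunkId == 0;
            if (!newContext) {
                if (ctx.numChunks - 1 == ctx.lastChunkId) {
                    return true; // Rule 2: message already fully persisted
                }
                if (chunkId <= ctx.lastChunkId) {
                    return true; // Rule 3: chunk already persisted
                }
            }
        }
        // Assumption: anything not rejected above is accepted and tracked.
        lastSeqIds.put(producer, seqId);
        if (ctx == null || !sameSeq || !uuid.equals(ctx.uuid)) {
            contexts.put(producer, new ChunkingContext(uuid, numChunks, chunkId));
        } else {
            ctx.lastChunkId = chunkId;
        }
        return false;
    }
}
```

With this sketch, a producer that restarts mid-chunking and resends from
chunk 0 under a new uuid is accepted (rule 1), while a full resend of a
completed message under the same uuid is dropped (rule 2). Non-chunked
messages and persistence of the context across broker restarts are left out.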




On Fri, Aug 25, 2023 at 7:56 PM Heesung Sohn <heesung.s...@streamnative.io>
wrote:

> Actually, can we think about this case too?
>
> What happens if the cx sends the same chunked msg with the same seq id
> when dedup is enabled?
>
> // user send a chunked msg, m1
> s1, c0
> s1, c1
> s1, c2 // complete
>
> // user resend the duplicate msg, m2
> s1, c0
> s1, c1
> s1, c2 //complete
>
> Do consumers receive m1 and m2(no dedup)?
>
>
>
> On Fri, Aug 25, 2023 at 6:55 PM Xiangying Meng <xiangy...@apache.org>
> wrote:
>
>> Hi Heesung,
>>
>> >I think this means, for the PIP, the broker side's chunk deduplication.
>> >I think brokers probably need to track map<uuid, last_chunk_id> to dedup
>>
>> What is the significance of doing this?
>> My understanding is that if the client crashes and restarts after
>> sending half of a chunk message and then it resends the previous chunk
>> message, the resent chunk message should be treated as a new message
>> since it calls the producer's API again.
>> If deduplication is enabled, users should ensure that their customized
>> sequence ID is not lower than the previous sequence ID.
>> I have considered this scenario and added a warning log in PR[0]. (I'm
>> not sure whether an error log should be added or an exception thrown.)
>> If deduplication is not enabled, on the consumer side, there should be
>> an incomplete chunk message received alongside another complete chunk
>> message, each with a different UUID, and they will not interfere with
>> each other.
>>
>> My main point is that every message sent using
>> `producer.newMessage().send()` should be treated as a new message.
>> UUID is solely used for the consumer side to identify different chunk
>> messages.
>>
>> BR
>> Xiangying
>>
>> [0] https://github.com/apache/pulsar/pull/21047
>>
>> On Sat, Aug 26, 2023 at 9:34 AM Heesung Sohn
>> <heesung.s...@streamnative.io.invalid> wrote:
>> >
>> > I think this means, for the PIP, the broker side's chunk deduplication.
>> > I think brokers probably need to track map<uuid, last_chunk_id> to dedup
>> > chunks on the broker side.
>> >
>> >
>> >
>> >
>> > On Fri, Aug 25, 2023 at 6:16 PM Xiangying Meng <xiangy...@apache.org>
>> wrote:
>> >
>> > > Hi Heesung
>> > >
>> > > It is a good point.
>> > > Assume the producer application jvm restarts in the middle of
>> chunking and
>> > > resends the message chunks from the beginning with the previous
>> sequence
>> > > id.
>> > >
>> > > For the previous version, it should be:
>> > >
>> > > Producer send:
>> > > 1. SequenceID: 0, ChunkID: 0
>> > > 2. SequenceID: 0, ChunkID: 1
>> > > 3. SequenceID: 0, ChunkID: 0
>> > > 4. SequenceID: 0, ChunkID: 1
>> > > 5. SequenceID: 0, ChunkID: 2
>> > >
>> > > Consumer receive:
>> > > 1. SequenceID: 0, ChunkID: 0
>> > > 2. SequenceID: 0, ChunkID: 1
>> > > 3. SequenceID: 0, ChunkID: 0 // chunk ID out of order. Release this
>> > > chunk and recycle its `chunkedMsgCtx`.
>> > > 4. SequenceID: 0, ChunkID: 1  // chunkedMsgCtx == null Release it.
>> > > 5. SequenceID: 0, ChunkID: 2  // chunkedMsgCtx == null Release it.
>> > >
>> > > Therefore, for the previous version, this chunk message can not be
>> > > received by the consumer. It is not an incompatibility issue.
>> > >
>> > > However, the solution of optimizing the `uuid` is valuable to the new
>> > > implementation.
>> > > I will modify this in the PR[0]. Thank you very much for your reminder
>> > > and the provided UUID optimization solution.
>> > >
>> > > BR,
>> > > Xiangying
>> > >
>> > > [0] https://github.com/apache/pulsar/pull/20948
>> > >
>> > > On Sat, Aug 26, 2023 at 8:48 AM Heesung Sohn
>> > > <heesung.s...@streamnative.io.invalid> wrote:
>> > > >
>> > > > Hi, I meant
>> > > >
>> > > > What if the producer application jvm restarts in the middle of
>> chunking
>> > > and
>> > > > resends the message chunks from the beginning with the previous
>> sequence
>> > > id?
>> > > >
>> > > >
>> > > >
>> > > > On Fri, Aug 25, 2023 at 5:15 PM Xiangying Meng <
>> xiangy...@apache.org>
>> > > wrote:
>> > > >
>> > > > > Hi Heesung
>> > > > >
>> > > > > It is a good idea to cover this incompatibility case if the
>> producer
>> > > > > splits the chunk message again when retrying.
>> > > > >
>> > > > > But in fact, the producer only resends the chunks that are
>> assembled
>> > > > > to `OpSendMsg` instead of splitting the chunk message again.
>> > > > > So, there is no incompatibility issue of resending the chunk
>> message
>> > > > > by splitting the chunk message again.
>> > > > >
>> > > > > The logic of sending chunk messages can be found here:
>> > > > >
>> > > > >
>> > >
>> https://github.com/apache/pulsar/blob/e0c481e5f8d7fa5534d3327785928a234376789e/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L533
>> > > > >
>> > > > > The logic of resending the message can be found here:
>> > > > >
>> > > > >
>> > >
>> https://github.com/apache/pulsar/blob/e0c481e5f8d7fa5534d3327785928a234376789e/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L1892
>> > > > >
>> > > > > BR,
>> > > > > Xiangying
>> > > > >
>> > > > >
>> > > > >
>> > > > >
>> > > > >
>> > > > >
>> > > > >
>> > > > > On Sat, Aug 26, 2023 at 2:24 AM Heesung Sohn
>> > > > > <heesung.s...@streamnative.io.invalid> wrote:
>> > > > > >
>> > > > > > >> I think brokers can track the last chunkMaxMessageSize for
>> each
>> > > > > producer.
>> > > > > >
>> > > > > > > Using different chunkMaxMessageSize is just one of the
>> aspects. In
>> > > > > > PIP-132 [0], we have included the message metadata size when
>> checking
>> > > > > > maxMessageSize. The message metadata can be changed after
>> splitting
>> > > the
>> > > > > > chunks. We are still uncertain about the way the chunked
>> message is
>> > > > > split,
>> > > > > > even using the same chunkMaxMessageSize.
>> > > > > >
>> > > > > > This sounds like we need to revisit chunking uuid logic.
>> > > > > > Like I commented here,
>> > > > > > https://github.com/apache/pulsar/pull/20948/files#r1305997883
>> > > > > > Why don't we add a chunk session id suffix to identify the
>> ongoing
>> > > > > chunking
>> > > > > > uniquely?
>> > > > > >
>> > > > > > Currently,
>> > > > > >
>> > > > > > chunking uuid = producer + sequence_id
>> > > > > >
>> > > > > > Proposal
>> > > > > > chunking  uuid = producer + sequence_id + chunkingSessionId
>> > > > > >
>> > > > > > * chunkingSessionId could be a timestamp when the chunking
>> started.
>> > > > > >
>> > > > > >
>> > > > > >
>> > > > > > On Fri, Aug 25, 2023 at 6:02 AM Xiangying Meng <
>> xiangy...@apache.org
>> > > >
>> > > > > wrote:
>> > > > > >
>> > > > > > > Hi Zike,
>> > > > > > >
>> > > > > > > >How would this happen to get two duplicated and consecutive
>> > > ChunkID-1
>> > > > > > > >messages? The producer should guarantee to retry the whole
>> chunked
>> > > > > > > >messages instead of some parts of the chunks.
>> > > > > > >
>> > > > > > > If the producer guarantees to retry the whole chunked messages
>> > > instead
>> > > > > > > of some parts of the chunks,
>> > > > > > > Why doesn't the bug of the producer retry chunk messages in
>> the PR
>> > > [0]
>> > > > > > > appear?
>> > > > > > > And why do you need to set `chunkId` in `op.rePopulate`?
>> > > > > > > It will be reset when the chunk message is split again if the
>> > > > > > > producer guarantees to retry the whole chunked message.
>> > > > > > > ```
>> > > > > > > final MessageMetadata finalMsgMetadata = msgMetadata;
>> > > > > > > op.rePopulate = () -> {
>> > > > > > >     if (msgMetadata.hasChunkId()) {
>> > > > > > >         // The message metadata is shared between all chunks in a large message
>> > > > > > >         // We need to reset the chunk id for each call of this method
>> > > > > > >         // It's safe to do that because there is only 1 thread to manipulate
>> > > > > > >         // this message metadata
>> > > > > > >         finalMsgMetadata.setChunkId(chunkId);
>> > > > > > >     }
>> > > > > > >     op.cmd = sendMessage(producerId, sequenceId, numMessages, messageId,
>> > > > > > >             finalMsgMetadata, encryptedPayload);
>> > > > > > > };
>> > > > > > >
>> > > > > > > ```
>> > > > > > >
>> > > > > > > >> But chunks 1, 2, 3, and 4 are still persisted in the topic.
>> > > > > > > >
>> > > > > > > >I think it's OK to persist them all on the topic. Is there
>> any
>> > > issue
>> > > > > > > >with doing that?
>> > > > > > >
>> > > > > > > This is just one scenario. Whether we check only the sequence ID
>> of
>> > > the
>> > > > > > > first chunk (as I used in PR[1]) or check the sequence ID of
>> the
>> > > last
>> > > > > > > chunk (as you suggested), in reality, neither of these
>> methods can
>> > > > > > > deduplicate chunks on the broker side because the broker
>> cannot
>> > > know
>> > > > > > > the chunk ID of the previous message.
>> > > > > > >
>> > > > > > > However, if combined with the optimization of consumer-side
>> logic,
>> > > > > > > end-to-end deduplication can be completed.
>> > > > > > > This is also a less-than-perfect solution I mentioned in my
>> first
>> > > > > > > email and implemented in PR[1].
>> > > > > > >
>> > > > > > > The reason I propose this proposal is not to solve the
>> end-to-end
>> > > > > > > deduplication of chunk messages between producers and
>> consumers.
>> > > That
>> > > > > > > aspect has essentially been addressed in PR[1] and is still
>> > > undergoing
>> > > > > > > review.
>> > > > > > >
>> > > > > > > This proposal aims to ensure that no corrupt data exists
>> within the
>> > > > > > > topic, as our data might be offloaded and used elsewhere in
>> > > scenarios
>> > > > > > > where consumer logic is not optimized.
>> > > > > > >
>> > > > > > > BR,
>> > > > > > > Xiangying
>> > > > > > >
>> > > > > > > [0] https://github.com/apache/pulsar/pull/21048
>> > > > > > > [1] https://github.com/apache/pulsar/pull/20948
>> > > > > > >
>> > > > > > > On Fri, Aug 25, 2023 at 5:18 PM Zike Yang <z...@apache.org>
>> wrote:
>> > > > > > > >
>> > > > > > > > Hi Xiangying
>> > > > > > > >
>> > > > > > > > > The rewind operation is seen in the test log.
>> > > > > > > >
>> > > > > > > > That seems weird. Not sure if this rewind is related to the
>> chunk
>> > > > > > > consuming.
>> > > > > > > >
>> > > > > > > > > 1. SequenceID: 0, ChunkID: 0
>> > > > > > > > 2. SequenceID: 0, ChunkID: 1
>> > > > > > > > 3. SequenceID: 0, ChunkID: 1
>> > > > > > > > 4. SequenceID: 0, ChunkID: 2
>> > > > > > > > Such four chunks cannot be processed correctly by the
>> consumer.
>> > > > > > > >
>> > > > > > > > How would this happen to get two duplicated and consecutive
>> > > ChunkID-1
>> > > > > > > > messages? The producer should guarantee to retry the whole
>> > > chunked
>> > > > > > > > messages instead of some parts of the chunks.
>> > > > > > > >
>> > > > > > > > > But chunks 1, 2, 3, and 4 are still persisted in the
>> topic.
>> > > > > > > >
>> > > > > > > > I think it's OK to persist them all in the topic. Is there
>> any
>> > > issue
>> > > > > > > > with doing that?
>> > > > > > > >
>> > > > > > > > > There is another point. The resend of the chunk message
>> has a
>> > > bug
>> > > > > that
>> > > > > > > > I shared with you, and you fixed in PR [0]. It will make
>> this
>> > > case
>> > > > > > > > happen in another way.
>> > > > > > > >
>> > > > > > > > If the user sets the sequence ID manually, the case could be
>> > > > > reproduced.
>> > > > > > > >
>> > > > > > > > BR,
>> > > > > > > > Zike Yang
>> > > > > > > >
>> > > > > > > > On Thu, Aug 24, 2023 at 8:48 PM Xiangying Meng <
>> > > xiangy...@apache.org
>> > > > > >
>> > > > > > > wrote:
>> > > > > > > > >
>> > > > > > > > > >IIUC, this may change the existing behavior and may
>> introduce
>> > > > > > > inconsistencies.
>> > > > > > > > > >Suppose that we have a large message with 3 chunks. But
>> the
>> > > > > producer
>> > > > > > > > > >crashes and resends the message after sending the
>> chunk-1. It
>> > > will
>> > > > > > > > > >send a total of 5 messages to the Pulsar topic:
>> > > > > > > > > >
>> > > > > > > > > >1. SequenceID: 0, ChunkID: 0
>> > > > > > > > > >2. SequenceID: 0, ChunkID: 1
>> > > > > > > > > >3. SequenceID: 0, ChunkID: 0   -> This message will be
>> dropped
>> > > > > > > > > >4. SequenceID: 0, ChunkID: 1    -> Will also be dropped
>> > > > > > > > > >5. SequenceID: 0, ChunkID: 2    -> The last chunk of the
>> > > message
>> > > > > > > > >
>> > > > > > > > > Hi Zike
>> > > > > > > > > There is another point. The resend of the chunk message
>> has a
>> > > bug
>> > > > > that
>> > > > > > > > > I shared with you, and you fixed in PR [0]. It will make
>> this
>> > > case
>> > > > > > > > > happen in another way.
>> > > > > > > > > Sample description of the bug:
>> > > > > > > > > Because all chunks of a chunk message share the same message
>> > > > > > > > > metadata, if a chunk is not sent out immediately, then on
>> > > > > > > > > resend all chunks of that chunk message use the chunk ID of
>> > > > > > > > > the last chunk.
>> > > > > > > > > In this case, it should happen as follows:
>> > > > > > > > > 1. SequenceID: 0, ChunkID: 0 (Put op1 into
>> `pendingMessages`
>> > > and
>> > > > > send)
>> > > > > > > > > 2. SequenceID: 0, ChunkID: 1 (Put op2 into
>> `pendingMessages`
>> > > and
>> > > > > send)
>> > > > > > > > > 3. SequenceID: 0, ChunkID: 2   -> (Put op3 into
>> > > `pendingMessages`)
>> > > > > > > > > 4. SequenceID: 0, ChunkID: 2   -> (Resend op1)
>> > > > > > > > > 5. SequenceID: 0, ChunkID: 2   -> (Resend op2)
>> > > > > > > > > 6. SequenceID: 0, ChunkID: 2   -> (Send op3)
>> > > > > > > > >
>> > > > > > > > >
>> > > > > > > > > BR,
>> > > > > > > > > Xiangying
>> > > > > > > > >
>> > > > > > > > > [0] - https://github.com/apache/pulsar/pull/21048
>> > > > > > > > >
>> > > > > > > > > On Thu, Aug 24, 2023 at 8:09 PM Xiangying Meng <
>> > > > > xiangy...@apache.org>
>> > > > > > > wrote:
>> > > > > > > > > >
>> > > > > > > > > > >> This solution also cannot solve the out-of-order
>> messages
>> > > > > inside
>> > > > > > > the
>> > > > > > > > > > >>chunks. For example, the above five messages will
>> still be
>> > > > > > > persisted.
>> > > > > > > > > > >The consumer already handles this case. The above 5
>> messages
>> > > > > will
>> > > > > > > all
>> > > > > > > > > > >be persisted but the consumer will skip message 1 and
>> 2.
>> > > > > > > > > > >For messages 3, 4, and 5, the producer can guarantee
>> > > > > > > > > > >these chunks are in order.
>> > > > > > > > > >
>> > > > > > > > > > The rewind operation is seen in the test log. Every
>> time an
>> > > > > incorrect
>> > > > > > > > > > chunk message is received, it will rewind, and the code
>> has
>> > > yet
>> > > > > to be
>> > > > > > > > > > studied in depth.
>> > > > > > > > > > If it does not call rewind, then this case is
>> considered a
>> > > > > workable
>> > > > > > > > > > case. Let's look at another case.
>> > > > > > > > > > 1. SequenceID: 0, ChunkID: 0
>> > > > > > > > > > 2. SequenceID: 0, ChunkID: 1
>> > > > > > > > > > 3. SequenceID: 0, ChunkID: 1
>> > > > > > > > > > 4. SequenceID: 0, ChunkID: 2
>> > > > > > > > > > Such four chunks cannot be processed correctly by the
>> > > consumer.
>> > > > > > > > > >
>> > > > > > > > > > In fact, this solution is my original idea. The PR I
>> > > mentioned
>> > > > > in the
>> > > > > > > > > > first email above uses a similar solution and modifies
>> the
>> > > logic
>> > > > > on
>> > > > > > > > > > the consumer side.
>> > > > > > > > > > Also, as I mentioned in the first email, this solution
>> can
>> > > only
>> > > > > solve
>> > > > > > > > > > the problem of end-to-end duplication. But chunks 1, 2,
>> 3,
>> > > and 4
>> > > > > are
>> > > > > > > > > > still persisted in the topic.
>> > > > > > > > > >
>> > > > > > > > > > On Thu, Aug 24, 2023 at 3:00 PM Zike Yang <
>> z...@apache.org>
>> > > > > wrote:
>> > > > > > > > > > >
>> > > > > > > > > > > Hi Heesung,
>> > > > > > > > > > >
>> > > > > > > > > > > > I believe in this PIP "similar to the existing
>> "sequence
>> > > ID
>> > > > > map",
>> > > > > > > > > > > to facilitate effective filtering" actually means
>> tracking
>> > > the
>> > > > > last
>> > > > > > > > > > > chunkId(not all chunk ids) on the broker side.
>> > > > > > > > > > >
>> > > > > > > > > > > With this simple solution, I think we don't need to
>> track
>> > > the
>> > > > > > > > > > > (sequenceID, chunkID) on the broker side at all. The
>> broker
>> > > > > just
>> > > > > > > needs
>> > > > > > > > > > > to apply the deduplication logic to the last chunk
>> instead
>> > > of
>> > > > > all
>> > > > > > > > > > > previous chunks. This PIP actually could do that, but
>> it
>> > > will
>> > > > > > > > > > > introduce a new data format and compatibility issue.
>> > > > > > > > > > >
>> > > > > > > > > > > > This is still a behavior change(deduping chunk
>> messages
>> > > on
>> > > > > the
>> > > > > > > broker),
>> > > > > > > > > > > and I believe we need to discuss this addition as a
>> PIP.
>> > > > > > > > > > >
>> > > > > > > > > > > Actually, we didn't specifically state the deduping
>> chunk
>> > > > > message
>> > > > > > > > > > > behavior before. The chunked message should be equally
>> > > > > applicable
>> > > > > > > to
>> > > > > > > > > > > the de-duplication logic as a regular message.
>> Therefore, I
>> > > > > think
>> > > > > > > it
>> > > > > > > > > > > should be considered as a bug fix. But if this fix is worth
>> > > > > > > > > > > discussing in depth, I have no objection to it being a new PIP.
>> > > > > > > > > > >
>> > > > > > > > > > > > I think brokers can track the last
>> chunkMaxMessageSize
>> > > for
>> > > > > > > > > > > each producer.
>> > > > > > > > > > >
>> > > > > > > > > > > Using different chunkMaxMessageSize is just one of the
>> > > > > aspects. In
>> > > > > > > > > > > PIP-132 [0], we have included the message metadata
>> size
>> > > when
>> > > > > > > checking
>> > > > > > > > > > > maxMessageSize.
>> > > > > > > > > > > The message metadata can be changed after splitting
>> the
>> > > > > chunks. We
>> > > > > > > are
>> > > > > > > > > > > still uncertain about the way the chunked message is
>> split,
>> > > > > even
>> > > > > > > using
>> > > > > > > > > > > the same chunkMaxMessageSize.
>> > > > > > > > > > >
>> > > > > > > > > > > > then the brokers can assume that the producer is
>> > > resending
>> > > > > the
>> > > > > > > chunks from
>> > > > > > > > > > > the beginning with a different scheme(restarted with a
>> > > > > different
>> > > > > > > > > > > chunkMaxMessageSize) and accept those new chunks from
>> the
>> > > > > > > beginning.
>> > > > > > > > > > >
>> > > > > > > > > > > Regarding this, it seems like we are implementing
>> dynamic
>> > > > > > > > > > > configuration for the chunkMaxMessageSize. I'm afraid
>> that
>> > > this
>> > > > > > > would
>> > > > > > > > > > > change the expected behavior and introduce more
>> complexity
>> > > to
>> > > > > this
>> > > > > > > > > > > configuration.
>> > > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > > > [0] https://github.com/apache/pulsar/pull/14007
>> > > > > > > > > > >
>> > > > > > > > > > > BR,
>> > > > > > > > > > > Zike Yang
>> > > > > > > > > > >
>> > > > > > > > > > > On Thu, Aug 24, 2023 at 2:21 PM Zike Yang <
>> z...@apache.org
>> > > >
>> > > > > wrote:
>> > > > > > > > > > > >
>> > > > > > > > > > > > Hi, xiangying
>> > > > > > > > > > > >
>> > > > > > > > > > > > > it will find that the message
>> > > > > > > > > > > > is out of order and rewind the cursor. Loop this
>> > > operation,
>> > > > > and
>> > > > > > > > > > > > discard this message after it expires instead of
>> > > assembling
>> > > > > 3,
>> > > > > > > 4, 5
>> > > > > > > > > > > > into a message.
>> > > > > > > > > > > >
>> > > > > > > > > > > > Could you point out where the implementation for this is?
>> > > From
>> > > > > my
>> > > > > > > > > > > > understanding, there should not be any rewind
>> operation
>> > > for
>> > > > > the
>> > > > > > > > > > > > chunking feature. You can check more detail here:
>> > > > > > > > > > > >
>> > > > > > >
>> > > > >
>> > >
>> https://streamnative.io/blog/deep-dive-into-message-chunking-in-pulsar#how-message-chunking-is-implemented
>> > > > > > > > > > > >
>> > > > > > > > > > > > > This solution also cannot solve the out-of-order
>> > > messages
>> > > > > > > inside the
>> > > > > > > > > > > > chunks. For example, the above five messages will
>> still
>> > > be
>> > > > > > > persisted.
>> > > > > > > > > > > >
>> > > > > > > > > > > > The consumer already handles this case. The above 5
>> > > messages
>> > > > > > > will all
>> > > > > > > > > > > > be persisted but the consumer will skip message 1
>> and 2.
>> > > > > > > > > > > > For messages 3, 4, and 5, the producer can guarantee
>> > > > > > > > > > > > these chunks are in order.
>> > > > > > > > > > > >
>> > > > > > > > > > > > BR,
>> > > > > > > > > > > > Zike Yang
>> > > > > > > > > > > >
>> > > > > > > > > > > > On Thu, Aug 24, 2023 at 11:48 AM Yubiao Feng
>> > > > > > > > > > > > <yubiao.f...@streamnative.io.invalid> wrote:
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > > 1. SequenceID: 0, ChunkID: 0
>> > > > > > > > > > > > > > 2. SequenceID: 0, ChunkID: 1
>> > > > > > > > > > > > > > 3. SequenceID: 0, ChunkID: 0
>> > > > > > > > > > > > > > 4. SequenceID: 0, ChunkID: 1
>> > > > > > > > > > > > > > 5. SequenceID: 0, ChunkID: 2
>> > > > > > > > > > > > > > For the existing behavior, the consumer
>> assembles
>> > > > > > > > > > > > > > messages 3,4,5 into
>> > > > > > > > > > > > > > the original large message. But the changes
>> brought
>> > > > > > > > > > > > > > about by this PIP
>> > > > > > > > > > > > > > will cause the consumer to use messages 1,2,5
>> for
>> > > > > > > > > > > > > > assembly. There is
>> > > > > > > > > > > > > > no guarantee that the producer will split the
>> message
>> > > > > > > > > > > > > > in the same way
>> > > > > > > > > > > > > > twice before and after. For example, the
>> producer's
>> > > > > > > > > > > > > > maxMessageSize may
>> > > > > > > > > > > > > > be different. This may cause the consumer to
>> > > > > > > > > > > > > > receive a corrupt
>> > > > > > > > > > > > > > message.
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > Good point.
>> > > > > > > > > > > > >
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > Thanks
>> > > > > > > > > > > > > Yubiao Feng
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > On Wed, Aug 23, 2023 at 12:34 PM Zike Yang <
>> > > > > z...@apache.org>
>> > > > > > > wrote:
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > > Hi, xiangying,
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > Thanks for your PIP.
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > IIUC, this may change the existing behavior and
>> may
>> > > > > introduce
>> > > > > > > > > > > > > > inconsistencies.
>> > > > > > > > > > > > > > Suppose that we have a large message with 3
>> chunks.
>> > > But
>> > > > > the
>> > > > > > > producer
>> > > > > > > > > > > > > > crashes and resends the message after sending
>> the
>> > > > > chunk-1.
>> > > > > > > It will
>> > > > > > > > > > > > > > send a total of 5 messages to the Pulsar topic:
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > 1. SequenceID: 0, ChunkID: 0
>> > > > > > > > > > > > > > 2. SequenceID: 0, ChunkID: 1
>> > > > > > > > > > > > > > 3. SequenceID: 0, ChunkID: 0   -> This message
>> will
>> > > be
>> > > > > > > dropped
>> > > > > > > > > > > > > > 4. SequenceID: 0, ChunkID: 1    -> Will also be
>> > > dropped
>> > > > > > > > > > > > > > 5. SequenceID: 0, ChunkID: 2    -> The last
>> chunk of
>> > > the
>> > > > > > > message
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > For the existing behavior, the consumer
>> assembles
>> > > > > messages
>> > > > > > > 3,4,5 into
>> > > > > > > > > > > > > > the original large message. But the changes
>> brought
>> > > > > about by
>> > > > > > > this PIP
>> > > > > > > > > > > > > > will cause the consumer to use messages 1,2,5
>> for
>> > > > > assembly.
>> > > > > > > There is
>> > > > > > > > > > > > > > no guarantee that the producer will split the
>> > > message in
>> > > > > the
>> > > > > > > same way
>> > > > > > > > > > > > > > twice before and after. For example, the
>> producer's
>> > > > > > > maxMessageSize may
>> > > > > > > > > > > > > > be different. This may cause the consumer to
>> receive
>> > > a
>> > > > > > > corrupt
>> > > > > > > > > > > > > > message.
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > Also, this PIP increases the complexity of
>> handling
>> > > > > chunks
>> > > > > > > on the
>> > > > > > > > > > > > > > broker side. Brokers should, in general, treat
>> the
>> > > chunk
>> > > > > as
>> > > > > > > a normal
>> > > > > > > > > > > > > > message.
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > I think a simpler, better approach is to only
>> check the
>> > > > > > > deduplication
>> > > > > > > > > > > > > > for the last chunk of the large message. The
>> consumer
>> > > > > only
>> > > > > > > gets the
>> > > > > > > > > > > > > > whole message after receiving the last chunk. We
>> > > don't
>> > > > > need
>> > > > > > > to check
>> > > > > > > > > > > > > > the deduplication for all previous chunks. Also
>> by
>> > > doing
>> > > > > > > this we only
>> > > > > > > > > > > > > > need bug fixes, we don't need to introduce a
>> new PIP.
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > BR,
>> > > > > > > > > > > > > > Zike Yang
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > On Fri, Aug 18, 2023 at 7:54 PM Xiangying Meng <
>> > > > > > > xiangy...@apache.org>
>> > > > > > > > > > > > > > wrote:
>> > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > Dear Community,
>> > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > I hope this email finds you well. I'd like to
>> > > address
>> > > > > an
>> > > > > > > important
>> > > > > > > > > > > > > > > issue related to Apache Pulsar and discuss a
>> > > solution
>> > > > > I've
>> > > > > > > proposed on
>> > > > > > > > > > > > > > > GitHub. The problem pertains to the handling
>> of
>> > > Chunk
>> > > > > > > Messages after
>> > > > > > > > > > > > > > > enabling deduplication.
>> > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > In the current version of Apache Pulsar, all
>> > > chunks of
>> > > > > a
>> > > > > > > Chunk Message
>> > > > > > > > > > > > > > > share the same sequence ID. However, enabling the
>> > > > > > > > > > > > > > > deduplication feature results in an inability to send Chunk
>> > > > > Messages. To
>> > > > > > > tackle this
>> > > > > > > > > > > > > > > problem, I've proposed a solution [1] that
>> ensures
>> > > > > > > messages are not
>> > > > > > > > > > > > > > > duplicated throughout end-to-end delivery.
>> While
>> > > this
>> > > > > fix
>> > > > > > > addresses
>> > > > > > > > > > > > > > > the duplication issue for end-to-end messages,
>> > > there
>> > > > > > > remains a
>> > > > > > > > > > > > > > > possibility of duplicate chunks within topics.
>> > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > To address this concern, I believe we should
>> > > introduce
>> > > > > a
>> > > > > > > "Chunk ID
>> > > > > > > > > > > > > > > map" at the Broker level, similar to the
>> existing
>> > > > > > > "sequence ID map",
>> > > > > > > > > > > > > > > to facilitate effective filtering. However,
>> > > > > implementing
>> > > > > > > this has led
>> > > > > > > > > > > > > > > to a challenge: a producer requires storage
>> for two
>> > > > > Long
>> > > > > > > values
>> > > > > > > > > > > > > > > simultaneously (sequence ID and chunk ID).
>> Because
>> > > the
>> > > > > > > snapshot of the
>> > > > > > > > > > > > > > > sequence ID map is stored through the
>> properties
>> > > of the
>> > > > > > > cursor
>> > > > > > > > > > > > > > > (Map<String, Long>), so in order to satisfy
>> the
>> > > > > storage of
>> > > > > > > two Longs
>> > > > > > > > > > > > > > > (sequence ID, chunk ID) corresponding to one
>> > > producer,
>> > > > > we
>> > > > > > > hope to add
>> > > > > > > > > > > > > > > a mark DeleteProperties (Map<String, String>) to
>> > > > > > > > > > > > > > > replace the properties (Map<String, Long>)
>> field.
>> > > To
>> > > > > > > resolve this,
>> > > > > > > > > > > > > > > I've proposed an alternative proposal [2]
>> > > involving the
>> > > > > > > introduction
>> > > > > > > > > > > > > > > of a "mark DeleteProperties" (Map<String,
>> String>)
>> > > to
>> > > > > > > replace the
>> > > > > > > > > > > > > > > current properties (Map<String, Long>) field.
>> > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > I'd appreciate it if you carefully review
>> both PRs
>> > > and
>> > > > > > > share your
>> > > > > > > > > > > > > > > valuable feedback and insights. Thank you
>> > > immensely for
>> > > > > > > your time and
>> > > > > > > > > > > > > > > attention. I eagerly anticipate your valuable
>> > > opinions
>> > > > > and
>> > > > > > > > > > > > > > > recommendations.
>> > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > Warm regards,
>> > > > > > > > > > > > > > > Xiangying
>> > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > [1]
>> https://github.com/apache/pulsar/pull/20948
>> > > > > > > > > > > > > > > [2]
>> https://github.com/apache/pulsar/pull/21027
>> > > > > > > > > > > > > >
>> > > > > > >
>> > > > >
>> > >
>>
>
