Hi Xiangying,

Thanks for driving the proposal.
From my understanding, message deduplication only needs to check the last
chunk of the message; it does not need to care whether each individual chunk
is duplicated. Issues such as duplicated chunks should be handled on the
client side.

For the example that you have discussed:

```
Producer sent:
1. SequenceID: 0, ChunkID: 0
2. SequenceID: 0, ChunkID: 1
3. SequenceID: 0, ChunkID: 0
4. SequenceID: 0, ChunkID: 1
5. SequenceID: 0, ChunkID: 2
```

The consumer should discard 1 and 2, because they belong to an incomplete
chunked message, and start building the chunked message from 3 to 5.
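Sketched in Java, that consumer-side rule looks roughly like this (an
illustrative model only, with made-up names such as `ChunkAssembler` and
`onChunk`; it is not the actual `ConsumerImpl` logic):

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch: a chunk whose id is not exactly lastChunkId + 1
// aborts the current assembly, and a chunk with id 0 restarts a fresh one.
class ChunkAssembler {
    private final Map<String, StringBuilder> contexts = new HashMap<>();
    private final Map<String, Integer> lastChunkIds = new HashMap<>();

    /** Returns the assembled payload when the last chunk arrives, else null. */
    String onChunk(String uuid, int chunkId, int totalChunks, String payload) {
        if (chunkId == 0) {
            // Start (or restart) assembly, dropping any incomplete context.
            contexts.put(uuid, new StringBuilder());
            lastChunkIds.put(uuid, -1);
        }
        Integer last = lastChunkIds.get(uuid);
        if (last == null || chunkId != last + 1) {
            // Out-of-order or orphan chunk: give up on this message.
            contexts.remove(uuid);
            lastChunkIds.remove(uuid);
            return null;
        }
        contexts.get(uuid).append(payload);
        lastChunkIds.put(uuid, chunkId);
        if (chunkId == totalChunks - 1) {
            lastChunkIds.remove(uuid);
            return contexts.remove(uuid).toString();
        }
        return null;
    }
}
```

With the five chunks above, the second arrival of chunk 0 restarts assembly,
so the first two chunks are discarded and the message is built from 3 to 5.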

For deduplication: if chunk 2 is the last chunk of the message, we should put
its sequence ID into the deduplication persistence map once it has been
persisted. Any subsequent message with the same sequence ID and producer name
will then be treated as a duplicate, no matter whether the sequence ID was
generated by the producer or specified by the user.
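A rough sketch of that broker-side rule (illustrative names such as
`LastChunkDedup` and `shouldPersist`; not the real deduplication code):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of last-chunk-only deduplication: only the last chunk of a large
// message updates the dedup state, and any later message from the same
// producer with a sequence id <= the stored one is treated as a duplicate.
class LastChunkDedup {
    private final Map<String, Long> highestSequencePersisted = new HashMap<>();

    /** Returns true if the chunk should be persisted, false if it is a duplicate. */
    boolean shouldPersist(String producerName, long sequenceId,
                          int chunkId, int numChunks) {
        Long last = highestSequencePersisted.get(producerName);
        if (last != null && sequenceId <= last) {
            return false; // duplicate, regardless of which chunk it is
        }
        if (chunkId == numChunks - 1) {
            // Only the last chunk updates the map, once it has been persisted.
            highestSequencePersisted.put(producerName, sequenceId);
        }
        return true;
    }
}
```

Note that while a chunked message is still in flight, repeated intermediate
chunks are never rejected; only a sequence ID whose last chunk has already
been persisted blocks later resends.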

Regards,
Penghui

On Sat, Aug 26, 2023 at 5:55 PM Xiangying Meng <xiangy...@apache.org> wrote:

> Share more information: Even for versions before 3.0.0, the approach
> doesn't assemble chunks 3, 4, and 5 together.
>
> Please note this line of code:
>
> ```java
> chunkedMsgCtx = chunkedMessagesMap.computeIfAbsent(msgMetadata.getUuid(),
>         (key) -> ChunkedMessageCtx.get(totalChunks, chunkedMsgBuffer));
> ```
>
> And the new solution adopted in PR [0] is to add a timestamp to the
> uuid. Thanks again to Heesung for providing this idea.
>
> [0]  https://github.com/apache/pulsar/pull/20948
>
>
> On Sat, Aug 26, 2023 at 5:20 PM Xiangying Meng <xiangy...@apache.org>
> wrote:
> >
> > Hi Zike,
> >
> > PR [0] has already fixed this bug and won't introduce compatibility
> issues.
> > PR [1] is unnecessary and can be closed. However, I still greatly
> > appreciate the information you provided.
> >
> > [0] https://github.com/apache/pulsar/pull/20948
> > [1] https://github.com/apache/pulsar/pull/21070
> >
> > On Sat, Aug 26, 2023 at 4:49 PM Zike Yang <z...@apache.org> wrote:
> > >
> > > > Hi Zike
> > > You can see the processMessageChunk method of the ConsumerImpl.
> > >
> > > Ah. That seems like a regression bug introduced by
> > > https://github.com/apache/pulsar/pull/18511. I have pushed a PR to fix
> > > it: https://github.com/apache/pulsar/pull/21070
> > >
> > > For the behavior before Pulsar 3.0.0. The consumer should assemble the
> > > message using 3,4,5.
> > >
> > > Thanks for pointing this out.
> > >
> > > BR,
> > > Zike Yang
> > >
> > > On Sat, Aug 26, 2023 at 3:58 PM Xiangying Meng <xiangy...@apache.org>
> wrote:
> > > >
> > > > >> Consumer receive:
> > > > >1. SequenceID: 0, ChunkID: 0
> > > > >2. SequenceID: 0, ChunkID: 1
> > > > >3. SequenceID: 0, ChunkID: 0 // chunk ID out of order. Release this
> > > > >chunk and recycle its `chunkedMsgCtx`.
> > > > >4. SequenceID: 0, ChunkID: 1  // chunkedMsgCtx == null Release it.
> > > > >5. SequenceID: 0, ChunkID: 2  // chunkedMsgCtx == null Release it.
> > > > >
> > > > >I think this case is wrong. For the current implementation, the
> > > > >message 3,4,5 will be assembled as a original large message.
> > > >
> > > > Hi Zike
> > > > You can see the processMessageChunk method of the ConsumerImpl.
> > > >
> > > > ```java
> > > > ChunkedMessageCtx chunkedMsgCtx = chunkedMessagesMap.get(msgMetadata.getUuid());
> > > >
> > > > if (msgMetadata.getChunkId() == 0 && chunkedMsgCtx == null) {
> > > >     // assemble a chunkedMsgCtx and put it into
> > > >     // pendingChunkedMessageUuidQueue and chunkedMessagesMap.
> > > > }
> > > >
> > > > if (chunkedMsgCtx == null || chunkedMsgCtx.chunkedMsgBuffer == null
> > > >         || msgMetadata.getChunkId() != (chunkedMsgCtx.lastChunkedMessageId + 1)) {
> > > >     if (chunkedMsgCtx != null) {
> > > >         if (chunkedMsgCtx.chunkedMsgBuffer != null) {
> > > >             ReferenceCountUtil.safeRelease(chunkedMsgCtx.chunkedMsgBuffer);
> > > >         }
> > > >         chunkedMsgCtx.recycle();
> > > >     }
> > > >     chunkedMessagesMap.remove(msgMetadata.getUuid());
> > > >     compressedPayload.release();
> > > >     increaseAvailablePermits(cnx);
> > > > }
> > > > ```
> > > >
> > > > On Sat, Aug 26, 2023 at 3:48 PM Zike Yang <z...@apache.org> wrote:
> > > > >
> > > > > > Consumer receive:
> > > > > 1. SequenceID: 0, ChunkID: 0
> > > > > 2. SequenceID: 0, ChunkID: 1
> > > > > 3. SequenceID: 0, ChunkID: 0 // chunk ID out of order. Release this
> > > > > chunk and recycle its `chunkedMsgCtx`.
> > > > > 4. SequenceID: 0, ChunkID: 1  // chunkedMsgCtx == null Release it.
> > > > > 5. SequenceID: 0, ChunkID: 2  // chunkedMsgCtx == null Release it.
> > > > >
> > > > > I think this case is wrong. For the current implementation,
> > > > > messages 3, 4, and 5 will be assembled as the original large message.
> > > > >
> > > > > HI, Heesung
> > > > >
> > > > >
> > > > > > I think brokers probably need to track map<uuid, last_chunk_id>
> to dedup
> > > > >
> > > > > I propose a simpler solution in this mail thread earlier, which
> > > > > doesn't need to introduce map<uuid, last_chunk_id> :
> > > > >
> > > > > > I think a simple better approach is to only check the
> deduplication
> > > > > for the last chunk of the large message. The consumer only gets the
> > > > > whole message after receiving the last chunk. We don't need to
> check
> > > > > the deduplication for all previous chunks. Also by doing this we
> only
> > > > > need bug fixes, we don't need to introduce a new PIP.
> > > > >
> > > > > Could you explain or show a case in what cases would lead to this
> > > > > simpler solution not working?
> > > > >
> > > > > Thanks,
> > > > > Zike Yang
> > > > >
> > > > > On Sat, Aug 26, 2023 at 1:38 PM Heesung Sohn
> > > > > <heesung.s...@streamnative.io.invalid> wrote:
> > > > > >
> > > > > > > In this case, the consumer only can receive m1.
> > > > > >
> > > > > > Regarding this comment, can you explain how the consumer only
> receives m1?
> > > > > > Here, m1's and m2's uuid and msgId will be different(if we
> suffix with a
> > > > > > chunkingSessionId), although the sequence id is the same.
> > > > > >
> > > > > > > If we throw an exception when users use the same sequence to
> send the
> > > > > > message.
> > > > > > Do you mean `If "producers" throw an exception when users use
> the same
> > > > > > sequence to send the message.`.
> > > > > > Again, when the producers restart, they lose the last sequence
> id sent.
> > > > > >
> > > > > >
> > > > > > > If we do not throw an exception when users use the same
> sequence to
> > > > > > send the message.
> > > > > >
> > > > > > For this logic, how do we handle if the producer suddenly
> resends the
> > > > > > chunked message with a different chunking scheme(e.g.
> maxMessageSize) ?
> > > > > > uuid=1, sid=0, cid=0
> > > > > > uuid=1, sid=0, cid=1
> > > > > > uuid=2, sid=0, cid=0
> > > > > > uuid=2, sid=0, cid=1
> > > > > >
> > > > > > We could refine what to track and algo logic on the broker side
> more, but
> > > > > > do we agree that the broker chunk dedup logic is needed?
> > > > > >
> > > > > > I will continue to think more next week. Have a nice weekend.
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Fri, Aug 25, 2023 at 9:14 PM Xiangying Meng <
> xiangy...@apache.org> wrote:
> > > > > >
> > > > > > > Hi Heesung,
> > > > > > >
> > > > > > > Maybe we only need to maintain the last chunk ID in a map.
> > > > > > > Map<producername, chunkID> map1.
> > > > > > > And we already have a map maintaining the last sequence ID.
> > > > > > > Map<producername, sequence ID> map2.
> > > > > > >
> > > > > > > If we do not throw an exception when users use the same
> sequence to
> > > > > > > send the message.
> > > > > > >
> > > > > > > For any incoming msg, m :
> > > > > > > chunk ID = -1;
> > > > > > > If m is a chunk message:
> > > > > > > chunk ID = m.chunkID.
> > > > > > >       If m.currentSeqid < LastSeqId, dedup.
> > > > > > >       If m.currentSeqid > LastSeqId && m.chunk ID = 0, nodedup
> > > > > > >                 if chunk ID exists in the map.
> > > > > > >                    Update it and log an error. This means
> there is an
> > > > > > > incomplete chunk message.
> > > > > > >                 If chunk ID does not exist in the map.
> > > > > > >                    Put it on the map.
> > > > > > >       If m.currentSeqid == LastSeqId,
> > > > > > >            1. if m.chunk ID == -1 || m.chunk ID == 0. dedup.
> > > > > > >            2. If 1 <= m.chunkID <= total chunk.
> > > > > > >               1. If chunk ID does not exist in the map. dedup.
> > > > > > >               2. If chunk ID exists in the map. dedup. Check
> the order
> > > > > > > of the chunkID to determine whether dedup;
> > > > > > >            3. If m.chunkID == total chunk, persistent the
> chunk and
> > > > > > > remove the chunkID in the map.
> > > > > > >
> > > > > > >
> > > > > > > If we throw an exception when users use the same sequence to
> send the
> > > > > > > message.
> > > > > > >
> > > > > > > For any incoming msg, m :
> > > > > > > chunk ID = 0;
> > > > > > > If m is a chunk message:
> > > > > > > chunk ID = m.chunkID.
> > > > > > >    If m.currentSeqid < LastSeqId, dedup.
> > > > > > >    If m.currentSeqid == LastSeqId.
> > > > > > >        If chunkID > 0, Check the last chunkID to determine
> whether to
> > > > > > > dedup.
> > > > > > >             If chunkID == 1, put chunkID into the map if
> absent.
> > > > > > >        IF chunkID = 0, dedup.
> > > > > > >
> > > > > > > BR,
> > > > > > > xiangying
> > > > > > >
> > > > > > > On Sat, Aug 26, 2023 at 11:53 AM Heesung Sohn
> > > > > > > <heesung.s...@streamnative.io.invalid> wrote:
> > > > > > > >
> > > > > > > > However, what If the producer jvm gets restarted after the
> broker
> > > > > > > persists
> > > > > > > > the m1 (but before updating their sequence id in their
> persistence
> > > > > > > > storage), and the producer is trying to resend the same
> msg(so m2) with
> > > > > > > the
> > > > > > > > same sequence id after restarting?
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > On Fri, Aug 25, 2023 at 8:22 PM Xiangying Meng <
> xiangy...@apache.org>
> > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi Heesung,
> > > > > > > > >
> > > > > > > > > In this case, the consumer only can receive m1.
> > > > > > > > >
> > > > > > > > > But it has the same content as the previous case: What
> should we do if
> > > > > > > > > the user sends messages with the sequence ID that was used
> previously?
> > > > > > > > >
> > > > > > > > > I am afraid to introduce the incompatibility in this case,
> so I only
> > > > > > > > > added a warning log in the PR[0] instead of throwing an
> exception.
> > > > > > > > > Regarding this matter, what do you think? Should we throw
> an exception
> > > > > > > > > or add error logs?
> > > > > > > > >
> > > > > > > > > I'm looking forward to hearing your viewpoint.
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Xiangying
> > > > > > > > >
> > > > > > > > > [0] https://github.com/apache/pulsar/pull/21047
> > > > > > > > >
> > > > > > > > > On Sat, Aug 26, 2023 at 10:58 AM Heesung Sohn
> > > > > > > > > <heesung.s...@streamnative.io.invalid> wrote:
> > > > > > > > > >
> > > > > > > > > > Actually, can we think about this case too?
> > > > > > > > > >
> > > > > > > > > > What happens if the cx sends the same chunked msg with
> the same seq
> > > > > > > id
> > > > > > > > > when
> > > > > > > > > > dedup is enabled?
> > > > > > > > > >
> > > > > > > > > > // user send a chunked msg, m1
> > > > > > > > > > s1, c0
> > > > > > > > > > s1, c1
> > > > > > > > > > s1, c2 // complete
> > > > > > > > > >
> > > > > > > > > > // user resend the duplicate msg, m2
> > > > > > > > > > s1, c0
> > > > > > > > > > s1, c1
> > > > > > > > > > s1, c2 //complete
> > > > > > > > > >
> > > > > > > > > > Do consumers receive m1 and m2(no dedup)?
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > On Fri, Aug 25, 2023 at 6:55 PM Xiangying Meng <
> xiangy...@apache.org
> > > > > > > >
> > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi Heesung,
> > > > > > > > > > >
> > > > > > > > > > > >I think this means, for the PIP, the broker side's
> chunk
> > > > > > > > > deduplication.
> > > > > > > > > > > >I think brokers probably need to track map<uuid,
> last_chunk_id> to
> > > > > > > > > dedup
> > > > > > > > > > >
> > > > > > > > > > > What is the significance of doing this?
> > > > > > > > > > > My understanding is that if the client crashes and
> restarts after
> > > > > > > > > > > sending half of a chunk message and then it resends
> the previous
> > > > > > > chunk
> > > > > > > > > > > message, the resent chunk message should be treated as
> a new
> > > > > > > message
> > > > > > > > > > > since it calls the producer's API again.
> > > > > > > > > > > If deduplication is enabled, users should ensure that
> their
> > > > > > > customized
> > > > > > > > > > > sequence ID is not lower than the previous sequence ID.
> > > > > > > > > > > I have considered this scenario and added a warning
> log in PR[0].
> > > > > > > (I'm
> > > > > > > > > > > not sure whether an error log should be added or an
> exception
> > > > > > > thrown.)
> > > > > > > > > > > If deduplication is not enabled, on the consumer side,
> there
> > > > > > > should be
> > > > > > > > > > > an incomplete chunk message received alongside another
> complete
> > > > > > > chunk
> > > > > > > > > > > message, each with a different UUID, and they will not
> interfere
> > > > > > > with
> > > > > > > > > > > each other.
> > > > > > > > > > >
> > > > > > > > > > > My main point is that every message sent using
> > > > > > > > > > > `producer.newMessage().send()` should be treated as a
> new message.
> > > > > > > > > > > UUID is solely used for the consumer side to identify
> different
> > > > > > > chunk
> > > > > > > > > > > messages.
> > > > > > > > > > >
> > > > > > > > > > > BR
> > > > > > > > > > > Xiangying
> > > > > > > > > > >
> > > > > > > > > > > [0] https://github.com/apache/pulsar/pull/21047
> > > > > > > > > > >
> > > > > > > > > > > On Sat, Aug 26, 2023 at 9:34 AM Heesung Sohn
> > > > > > > > > > > <heesung.s...@streamnative.io.invalid> wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > I think this means, for the PIP, the broker side's
> chunk
> > > > > > > > > deduplication.
> > > > > > > > > > > > I think brokers probably need to track map<uuid,
> last_chunk_id>
> > > > > > > to
> > > > > > > > > dedup
> > > > > > > > > > > > chunks on the broker side.
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > On Fri, Aug 25, 2023 at 6:16 PM Xiangying Meng <
> > > > > > > xiangy...@apache.org
> > > > > > > > > >
> > > > > > > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Hi Heesung
> > > > > > > > > > > > >
> > > > > > > > > > > > > It is a good point.
> > > > > > > > > > > > > Assume the producer application jvm restarts in
> the middle of
> > > > > > > > > chunking
> > > > > > > > > > > and
> > > > > > > > > > > > > resends the message chunks from the beginning with
> the previous
> > > > > > > > > > > sequence
> > > > > > > > > > > > > id.
> > > > > > > > > > > > >
> > > > > > > > > > > > > For the previous version, it should be:
> > > > > > > > > > > > >
> > > > > > > > > > > > > Producer send:
> > > > > > > > > > > > > 1. SequenceID: 0, ChunkID: 0
> > > > > > > > > > > > > 2. SequenceID: 0, ChunkID: 1
> > > > > > > > > > > > > 3. SequenceID: 0, ChunkID: 0
> > > > > > > > > > > > > 4. SequenceID: 0, ChunkID: 1
> > > > > > > > > > > > > 5. SequenceID: 0, ChunkID: 2
> > > > > > > > > > > > >
> > > > > > > > > > > > > Consumer receive:
> > > > > > > > > > > > > 1. SequenceID: 0, ChunkID: 0
> > > > > > > > > > > > > 2. SequenceID: 0, ChunkID: 1
> > > > > > > > > > > > > 3. SequenceID: 0, ChunkID: 0 // chunk ID out of
> order. Release
> > > > > > > this
> > > > > > > > > > > > > chunk and recycle its `chunkedMsgCtx`.
> > > > > > > > > > > > > 4. SequenceID: 0, ChunkID: 1  // chunkedMsgCtx ==
> null Release
> > > > > > > it.
> > > > > > > > > > > > > 5. SequenceID: 0, ChunkID: 2  // chunkedMsgCtx ==
> null Release
> > > > > > > it.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Therefore, for the previous version, this chunk
> message can
> > > > > > > not be
> > > > > > > > > > > > > received by the consumer. It is not an
> incompatibility issue.
> > > > > > > > > > > > >
> > > > > > > > > > > > > However, the solution of optimizing the `uuid` is
> valuable to
> > > > > > > the
> > > > > > > > > new
> > > > > > > > > > > > > implementation.
> > > > > > > > > > > > > I will modify this in the PR[0]. Thank you very
> much for your
> > > > > > > > > reminder
> > > > > > > > > > > > > and the provided UUID optimization solution.
> > > > > > > > > > > > >
> > > > > > > > > > > > > BR,
> > > > > > > > > > > > > Xiangying
> > > > > > > > > > > > >
> > > > > > > > > > > > > [0] https://github.com/apache/pulsar/pull/20948
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Sat, Aug 26, 2023 at 8:48 AM Heesung Sohn
> > > > > > > > > > > > > <heesung.s...@streamnative.io.invalid> wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Hi, I meant
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > What if the producer application jvm restarts in
> the middle
> > > > > > > of
> > > > > > > > > > > chunking
> > > > > > > > > > > > > and
> > > > > > > > > > > > > > resends the message chunks from the beginning
> with the
> > > > > > > previous
> > > > > > > > > > > sequence
> > > > > > > > > > > > > id?
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Fri, Aug 25, 2023 at 5:15 PM Xiangying Meng <
> > > > > > > > > xiangy...@apache.org
> > > > > > > > > > > >
> > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Hi Heesung
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > It is a good idea to cover this
> incompatibility case if the
> > > > > > > > > > > producer
> > > > > > > > > > > > > > > splits the chunk message again when retrying.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > But in fact, the producer only resends the
> chunks that are
> > > > > > > > > > > assembled
> > > > > > > > > > > > > > > to `OpSendMsg` instead of splitting the chunk
> message
> > > > > > > again.
> > > > > > > > > > > > > > > So, there is no incompatibility issue of
> resending the
> > > > > > > chunk
> > > > > > > > > > > message
> > > > > > > > > > > > > > > by splitting the chunk message again.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > The logic of sending chunk messages can be
> found here:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > >
> > > > > > >
> https://github.com/apache/pulsar/blob/e0c481e5f8d7fa5534d3327785928a234376789e/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L533
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > The logic of resending the message can be
> found here:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > >
> > > > > > >
> https://github.com/apache/pulsar/blob/e0c481e5f8d7fa5534d3327785928a234376789e/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L1892
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > BR,
> > > > > > > > > > > > > > > Xiangying
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > On Sat, Aug 26, 2023 at 2:24 AM Heesung Sohn
> > > > > > > > > > > > > > > <heesung.s...@streamnative.io.invalid> wrote:
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > >> I think brokers can track the last
> > > > > > > chunkMaxMessageSize for
> > > > > > > > > > > each
> > > > > > > > > > > > > > > producer.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Using different chunkMaxMessageSize is
> just one of the
> > > > > > > > > > > aspects. In
> > > > > > > > > > > > > > > > PIP-132 [0], we have included the message
> metadata size
> > > > > > > when
> > > > > > > > > > > checking
> > > > > > > > > > > > > > > > maxMessageSize.The message metadata can be
> changed after
> > > > > > > > > > > splitting
> > > > > > > > > > > > > the
> > > > > > > > > > > > > > > > chunks. We are still uncertain about the way
> the chunked
> > > > > > > > > message
> > > > > > > > > > > is
> > > > > > > > > > > > > > > split,
> > > > > > > > > > > > > > > > even using the same ss chunkMaxMessageSize.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > This sounds like we need to revisit chunking
> uuid logic.
> > > > > > > > > > > > > > > > Like I commented here,
> > > > > > > > > > > > > > > >
> > > > > > > > >
> https://github.com/apache/pulsar/pull/20948/files#r1305997883
> > > > > > > > > > > > > > > > Why don't we add a chunk session id suffix
> to identify
> > > > > > > the
> > > > > > > > > > > ongoing
> > > > > > > > > > > > > > > chunking
> > > > > > > > > > > > > > > > uniquely?
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Currently,
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > chunking uuid = producer + sequence_id
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Proposal
> > > > > > > > > > > > > > > > chunking  uuid = producer + sequence_id +
> > > > > > > chunkingSessionId
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > * chunkingSessionId could be a timestamp
> when the
> > > > > > > chunking
> > > > > > > > > > > started.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > On Fri, Aug 25, 2023 at 6:02 AM Xiangying
> Meng <
> > > > > > > > > > > xiangy...@apache.org
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Hi Zike,
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > >How would this happen to get two
> duplicated and
> > > > > > > > > consecutive
> > > > > > > > > > > > > ChunkID-1
> > > > > > > > > > > > > > > > > >messages? The producer should guarantee
> to retry the
> > > > > > > whole
> > > > > > > > > > > chunked
> > > > > > > > > > > > > > > > > >messages instead of some parts of the
> chunks.
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > If the producer guarantees to retry the
> whole chunked
> > > > > > > > > messages
> > > > > > > > > > > > > instead
> > > > > > > > > > > > > > > > > of some parts of the chunks,
> > > > > > > > > > > > > > > > > Why doesn't the bug of the producer retry
> chunk
> > > > > > > messages in
> > > > > > > > > > > the PR
> > > > > > > > > > > > > [0]
> > > > > > > > > > > > > > > > > appear?
> > > > > > > > > > > > > > > > > And why do you need to set `chunkId` in
> > > > > > > `op.rePopulate`?
> > > > > > > > > > > > > > > > > It will be reset when splitting the chunk
> messages again if
> > > > > > > the
> > > > > > > > > > > producer
> > > > > > > > > > > > > > > > > guarantees to retry the whole chunked
> messages.
> > > > > > > > > > > > > > > > > ```
> > > > > > > > > > > > > > > > > final MessageMetadata finalMsgMetadata =
> msgMetadata;
> > > > > > > > > > > > > > > > > op.rePopulate = () -> {
> > > > > > > > > > > > > > > > > if (msgMetadata.hasChunkId()) {
> > > > > > > > > > > > > > > > > // The message metadata is shared between
> all chunks
> > > > > > > in a
> > > > > > > > > large
> > > > > > > > > > > > > message
> > > > > > > > > > > > > > > > > // We need to reset the chunk id for each
> call of this
> > > > > > > > > method
> > > > > > > > > > > > > > > > > // It's safe to do that because there is
> only 1 thread
> > > > > > > to
> > > > > > > > > > > > > manipulate
> > > > > > > > > > > > > > > > > this message metadata
> > > > > > > > > > > > > > > > > finalMsgMetadata.setChunkId(chunkId);
> > > > > > > > > > > > > > > > > }
> > > > > > > > > > > > > > > > > op.cmd = sendMessage(producerId,
> sequenceId,
> > > > > > > numMessages,
> > > > > > > > > > > > > messageId,
> > > > > > > > > > > > > > > > > finalMsgMetadata,
> > > > > > > > > > > > > > > > > encryptedPayload);
> > > > > > > > > > > > > > > > > };
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > ```
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > >> But chunks 1, 2, 3, and 4 are still
> persisted in the
> > > > > > > > > topic.
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > >I think it's OK to persist them all on
> the topic. Is
> > > > > > > > > there any
> > > > > > > > > > > > > issue
> > > > > > > > > > > > > > > > > >with doing that?
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > This is just one scenario. Whether only
> check the
> > > > > > > sequence
> > > > > > > > > ID
> > > > > > > > > > > of
> > > > > > > > > > > > > the
> > > > > > > > > > > > > > > > > first chunk (as I used in PR[1]) or check
> the sequence
> > > > > > > ID
> > > > > > > > > of
> > > > > > > > > > > the
> > > > > > > > > > > > > last
> > > > > > > > > > > > > > > > > chunk (as you suggested), in reality,
> neither of these
> > > > > > > > > methods
> > > > > > > > > > > can
> > > > > > > > > > > > > > > > > deduplicate chunks on the broker side
> because the
> > > > > > > broker
> > > > > > > > > cannot
> > > > > > > > > > > > > know
> > > > > > > > > > > > > > > > > the chunk ID of the previous message.
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > However, if combined with the optimization
> of
> > > > > > > consumer-side
> > > > > > > > > > > logic,
> > > > > > > > > > > > > > > > > end-to-end deduplication can be completed.
> > > > > > > > > > > > > > > > > This is also a less-than-perfect solution
> I mentioned
> > > > > > > in my
> > > > > > > > > > > first
> > > > > > > > > > > > > > > > > email and implemented in PR[1].
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > The reason I propose this proposal is not
> to solve the
> > > > > > > > > > > end-to-end
> > > > > > > > > > > > > > > > > deduplication of chunk messages between
> producers and
> > > > > > > > > > > consumers.
> > > > > > > > > > > > > That
> > > > > > > > > > > > > > > > > aspect has essentially been addressed in
> PR[1] and is
> > > > > > > still
> > > > > > > > > > > > > undergoing
> > > > > > > > > > > > > > > > > review.
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > This proposal aims to ensure that no
> corrupt data
> > > > > > > exists
> > > > > > > > > > > within the
> > > > > > > > > > > > > > > > > topic, as our data might be offloaded and
> used
> > > > > > > elsewhere in
> > > > > > > > > > > > > scenarios
> > > > > > > > > > > > > > > > > where consumer logic is not optimized.
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > BR,
> > > > > > > > > > > > > > > > > Xiangying
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > [0]
> https://github.com/apache/pulsar/pull/21048
> > > > > > > > > > > > > > > > > [1]
> https://github.com/apache/pulsar/pull/20948
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > On Fri, Aug 25, 2023 at 5:18 PM Zike Yang <
> > > > > > > z...@apache.org
> > > > > > > > > >
> > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > HI xiangying
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > The rewind operation is seen in the
> test log.
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > That seems weird. Not sure if this
> rewind is related
> > > > > > > to
> > > > > > > > > the
> > > > > > > > > > > chunk
> > > > > > > > > > > > > > > > > consuming.
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > 1. SequenceID: 0, ChunkID: 0
> > > > > > > > > > > > > > > > > > 2. SequenceID: 0, ChunkID: 1
> > > > > > > > > > > > > > > > > > 3. SequenceID: 0, ChunkID: 1
> > > > > > > > > > > > > > > > > > 4. SequenceID: 0, ChunkID: 2
> > > > > > > > > > > > > > > > > > Such four chunks cannot be processed
> correctly by the
> > > > > > > > > > > consumer.
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > How would this happen to get two
> duplicated and
> > > > > > > > > consecutive
> > > > > > > > > > > > > ChunkID-1
> > > > > > > > > > > > > > > > > > messages? The producer should guarantee
> to retry the
> > > > > > > > > whole
> > > > > > > > > > > > > chunked
> > > > > > > > > > > > > > > > > > messages instead of some parts of the
> chunks.
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > But chunks 1, 2, 3, and 4 are still
> persisted in
> > > > > > > the
> > > > > > > > > topic.
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > I think it's OK to persist them all in
> the topic. Is
> > > > > > > > > there
> > > > > > > > > > > any
> > > > > > > > > > > > > issue
> > > > > > > > > > > > > > > > > > with doing that?
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > There is another point. The resend of
> the chunk
> > > > > > > message
> > > > > > > > > > > has a
> > > > > > > > > > > > > bug
> > > > > > > > > > > > > > > that
> > > > > > > > > > > > > > > > > > I shared with you, and you fixed in PR
> [0]. It will
> > > > > > > make
> > > > > > > > > this
> > > > > > > > > > > > > case
> > > > > > > > > > > > > > > > > > happen in another way.
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > If the user sets the sequence ID
> manually, the case
> > > > > > > > > could be
> > > > > > > > > > > > > > > reproduced.
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > BR,
> > > > > > > > > > > > > > > > > > Zike Yang
> > > > > > > > > > > > > > > > > >
> > On Thu, Aug 24, 2023 at 8:48 PM Xiangying Meng <xiangy...@apache.org> wrote:
> > >
> > > > IIUC, this may change the existing behavior and may introduce
> > > > inconsistencies.
> > > > Suppose that we have a large message with 3 chunks. But the producer
> > > > crashes and resends the message after sending the chunk-1. It will
> > > > send a total of 5 messages to the Pulsar topic:
> > > >
> > > > 1. SequenceID: 0, ChunkID: 0
> > > > 2. SequenceID: 0, ChunkID: 1
> > > > 3. SequenceID: 0, ChunkID: 0   -> This message will be dropped
> > > > 4. SequenceID: 0, ChunkID: 1   -> Will also be dropped
> > > > 5. SequenceID: 0, ChunkID: 2   -> The last chunk of the message
> > >
> > > Hi Zike,
> > > There is another point. The resend of the chunk message has a bug that
> > > I shared with you, and that you fixed in PR [0]. It will make this case
> > > happen in another way.
> > > A short description of the bug:
> > > Because all chunks of a chunk message share the same message metadata
> > > object, if a chunk is not sent out immediately, then on resend all
> > > chunks of that message carry the chunk ID of the last chunk.
> > > In this case, it happens as:
> > > 1. SequenceID: 0, ChunkID: 0 (Put op1 into `pendingMessages` and send)
> > > 2. SequenceID: 0, ChunkID: 1 (Put op2 into `pendingMessages` and send)
> > > 3. SequenceID: 0, ChunkID: 2   -> (Put op3 into `pendingMessages`)
> > > 4. SequenceID: 0, ChunkID: 2   -> (Resend op1)
> > > 5. SequenceID: 0, ChunkID: 2   -> (Resend op2)
> > > 6. SequenceID: 0, ChunkID: 2   -> (Send op3)
> > >
> > > BR,
> > > Xiangying
> > >
> > > [0] https://github.com/apache/pulsar/pull/21048
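The metadata-sharing bug described above can be sketched with a simplified model. The class and field names below are hypothetical stand-ins chosen for illustration, not Pulsar's actual `MessageMetadata`/`OpSendMsg` types:

```java
import java.util.ArrayList;
import java.util.List;

public class SharedMetadataBug {
    // Simplified stand-in for the message metadata (hypothetical, for illustration).
    static class Metadata {
        long sequenceId;
        int chunkId;
    }

    // Simplified stand-in for a pending send operation.
    static class Op {
        final Metadata metadata; // buggy: every op points at one shared mutable object
        Op(Metadata m) { this.metadata = m; }
    }

    // Enqueue three chunks that all reference the SAME metadata object,
    // then report the chunk id each pending op would resend with.
    static List<Integer> resentChunkIds() {
        Metadata shared = new Metadata();
        shared.sequenceId = 0;
        List<Op> pendingMessages = new ArrayList<>();
        for (int chunkId = 0; chunkId < 3; chunkId++) {
            shared.chunkId = chunkId; // mutates the object every earlier op also sees
            pendingMessages.add(new Op(shared));
        }
        List<Integer> ids = new ArrayList<>();
        for (Op op : pendingMessages) {
            ids.add(op.metadata.chunkId);
        }
        return ids;
    }

    public static void main(String[] args) {
        // Every pending op now reports the LAST chunk id, so a resend emits
        // ChunkID 2 three times instead of 0, 1, 2.
        System.out.println(resentChunkIds()); // [2, 2, 2]
    }
}
```

Snapshotting the metadata per chunk (one immutable copy per op) avoids the aliasing; that is the spirit of the fix, though the real patch is in PR [0].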
> > > On Thu, Aug 24, 2023 at 8:09 PM Xiangying Meng <xiangy...@apache.org> wrote:
> > > >
> > > > > > This solution also cannot solve the out-of-order messages inside the
> > > > > > chunks. For example, the above five messages will still be persisted.
> > > > >
> > > > > The consumer already handles this case. The above 5 messages will all
> > > > > be persisted but the consumer will skip messages 1 and 2.
> > > > > For messages 3, 4, and 5, the producer can guarantee these chunks are
> > > > > in order.
> > > >
> > > > The rewind operation is seen in the test log. Every time an incorrect
> > > > chunk message is received, it will rewind, and the code has yet to be
> > > > studied in depth.
> > > > If it does not call rewind, then this case is considered a workable
> > > > case. Let's look at another case:
> > > > 1. SequenceID: 0, ChunkID: 0
> > > > 2. SequenceID: 0, ChunkID: 1
> > > > 3. SequenceID: 0, ChunkID: 1
> > > > 4. SequenceID: 0, ChunkID: 2
> > > > Such four chunks cannot be processed correctly by the consumer.
> > > >
> > > > In fact, this solution is my original idea. The PR I mentioned in the
> > > > first email above uses a similar solution and modifies the logic on
> > > > the consumer side.
> > > > Also, as I mentioned in the first email, this solution can only solve
> > > > the problem of end-to-end duplication. But chunks 1, 2, 3, and 4 are
> > > > still persisted in the topic.
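Why the 0, 1, 1, 2 sequence above is unrecoverable can be seen with a minimal model of consumer-side chunk assembly. This is an illustrative sketch, not Pulsar's actual `ChunkedMessageCtx` logic; the assumption it encodes is that a chunk with an unexpected id invalidates the whole partial message:

```java
public class ChunkAssembler {
    private int expectedChunkId = 0;
    private final StringBuilder buffer = new StringBuilder();

    // Returns the assembled message when the last chunk arrives, else null.
    public String onChunk(int chunkId, int numChunks, String payload) {
        if (chunkId != expectedChunkId) {
            // Out-of-order or duplicated chunk: drop the partial message.
            buffer.setLength(0);
            expectedChunkId = 0;
            if (chunkId != 0) return null; // can only restart from chunk 0
        }
        buffer.append(payload);
        expectedChunkId = chunkId + 1;
        if (chunkId == numChunks - 1) {
            String result = buffer.toString();
            buffer.setLength(0);
            expectedChunkId = 0;
            return result;
        }
        return null;
    }

    public static void main(String[] args) {
        ChunkAssembler a = new ChunkAssembler();
        // Sequence 0, 1, 1, 2: the duplicate chunk 1 invalidates the context,
        // so the final chunk 2 can never complete the message.
        System.out.println(a.onChunk(0, 3, "A")); // null
        System.out.println(a.onChunk(1, 3, "B")); // null
        System.out.println(a.onChunk(1, 3, "B")); // null (context discarded)
        System.out.println(a.onChunk(2, 3, "C")); // null -- message lost
    }
}
```

By contrast, the 0, 1, 0, 1, 2 sequence recovers, because the duplicate arrives as chunk 0 and the assembly can restart from it.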
> > > > On Thu, Aug 24, 2023 at 3:00 PM Zike Yang <z...@apache.org> wrote:
> > > > >
> > > > > Hi Heesung,
> > > > >
> > > > > > I believe in this PIP "similar to the existing "sequence ID map",
> > > > > > to facilitate effective filtering" actually means tracking the last
> > > > > > chunkId (not all chunk ids) on the broker side.
> > > > >
> > > > > With this simple solution, I think we don't need to track the
> > > > > (sequenceID, chunkID) on the broker side at all. The broker just
> > > > > needs to apply the deduplication logic to the last chunk instead of
> > > > > all previous chunks. This PIP actually could do that, but it will
> > > > > introduce a new data format and a compatibility issue.
> > > > >
> > > > > > This is still a behavior change (deduping chunk messages on the
> > > > > > broker), and I believe we need to discuss this addition as a PIP.
> > > > >
> > > > > Actually, we didn't specifically state the deduping behavior for
> > > > > chunked messages before. The de-duplication logic should apply to a
> > > > > chunked message just as it does to a regular message. Therefore, I
> > > > > think it should be considered a bug fix. But if this fix is worth
> > > > > discussing in depth, I have no objection to it being a new PIP.
> > > > >
> > > > > > I think brokers can track the last chunkMaxMessageSize for each
> > > > > > producer.
> > > > >
> > > > > Using a different chunkMaxMessageSize is just one of the aspects. In
> > > > > PIP-132 [0], we included the message metadata size when checking
> > > > > maxMessageSize. The message metadata can change after splitting the
> > > > > chunks, so we are still uncertain that the message would be split
> > > > > the same way, even using the same chunkMaxMessageSize.
> > > > >
> > > > > > then the brokers can assume that the producer is resending the
> > > > > > chunks from the beginning with a different scheme (restarted with a
> > > > > > different chunkMaxMessageSize) and accept those new chunks from the
> > > > > > beginning.
> > > > >
> > > > > Regarding this, it seems like we would be implementing dynamic
> > > > > configuration for chunkMaxMessageSize. I'm afraid that this would
> > > > > change the expected behavior and introduce more complexity to this
> > > > > configuration.
> > > > >
> > > > > [0] https://github.com/apache/pulsar/pull/14007
> > > > >
> > > > > BR,
> > > > > Zike Yang
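The "deduplicate only on the last chunk" idea discussed here can be sketched as follows. This is a minimal model assuming the broker tracks only the highest fully persisted sequence ID per producer; the class and method names are illustrative, not Pulsar's actual dedup code:

```java
import java.util.HashMap;
import java.util.Map;

public class LastChunkDedup {
    // Highest sequence id whose LAST chunk was persisted, per producer name.
    private final Map<String, Long> lastPersistedSeq = new HashMap<>();

    // Returns true if the chunk should be persisted, false if it is a duplicate.
    // Only the last chunk of a message participates in deduplication; earlier
    // chunks are always accepted (the consumer discards incomplete contexts).
    public boolean shouldPersist(String producer, long sequenceId, int chunkId, int numChunks) {
        long last = lastPersistedSeq.getOrDefault(producer, -1L);
        if (sequenceId <= last) return false; // whole message already persisted
        boolean isLastChunk = chunkId == numChunks - 1;
        if (isLastChunk) lastPersistedSeq.put(producer, sequenceId);
        return true;
    }

    public static void main(String[] args) {
        LastChunkDedup d = new LastChunkDedup();
        // The 5-message scenario: all five chunks persist, including the
        // duplicated chunks 0 and 1, and the consumer rebuilds from 3..5.
        System.out.println(d.shouldPersist("p", 0, 0, 3)); // true
        System.out.println(d.shouldPersist("p", 0, 1, 3)); // true
        System.out.println(d.shouldPersist("p", 0, 0, 3)); // true (resent, not last chunk)
        System.out.println(d.shouldPersist("p", 0, 1, 3)); // true
        System.out.println(d.shouldPersist("p", 0, 2, 3)); // true -> seq 0 recorded
        // A later full resend of sequence 0 is rejected chunk by chunk:
        System.out.println(d.shouldPersist("p", 0, 0, 3)); // false
    }
}
```

This keeps broker-side state at one `Long` per producer, which is exactly why no new cursor-property format would be needed.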
> > > > > On Thu, Aug 24, 2023 at 2:21 PM Zike Yang <z...@apache.org> wrote:
> > > > > >
> > > > > > Hi, xiangying,
> > > > > >
> > > > > > > it will find that the message is out of order and rewind the
> > > > > > > cursor. Loop this operation, and discard this message after it
> > > > > > > expires instead of assembling 3, 4, 5 into a message.
> > > > > >
> > > > > > Could you point out where the implementation for this is? From my
> > > > > > understanding, there should not be any rewind operation for the
> > > > > > chunking feature. You can check more detail here:
> > > > > > https://streamnative.io/blog/deep-dive-into-message-chunking-in-pulsar#how-message-chunking-is-implemented
> > > > > >
> > > > > > > This solution also cannot solve the out-of-order messages inside
> > > > > > > the chunks. For example, the above five messages will still be
> > > > > > > persisted.
> > > > > >
> > > > > > The consumer already handles this case. The above 5 messages will
> > > > > > all be persisted but the consumer will skip messages 1 and 2.
> > > > > > For messages 3, 4, and 5, the producer can guarantee these chunks
> > > > > > are in order.
> > > > > >
> > > > > > BR,
> > > > > > Zike Yang
> > > > > > On Thu, Aug 24, 2023 at 11:48 AM Yubiao Feng
> > > > > > <yubiao.f...@streamnative.io.invalid> wrote:
> > > > > > >
> > > > > > > > 1. SequenceID: 0, ChunkID: 0
> > > > > > > > 2. SequenceID: 0, ChunkID: 1
> > > > > > > > 3. SequenceID: 0, ChunkID: 0
> > > > > > > > 4. SequenceID: 0, ChunkID: 1
> > > > > > > > 5. SequenceID: 0, ChunkID: 2
> > > > > > > > For the existing behavior, the consumer assembles messages
> > > > > > > > 3, 4, 5 into the original large message. But the changes brought
> > > > > > > > about by this PIP will cause the consumer to use messages
> > > > > > > > 1, 2, 5 for assembly. There is no guarantee that the producer
> > > > > > > > will split the message in the same way twice before and after.
> > > > > > > > For example, the producer's maxMessageSize may be different.
> > > > > > > > This may cause the consumer to receive a corrupt message.
> > > > > > >
> > > > > > > Good point.
> > > > > > >
> > > > > > > Thanks
> > > > > > > Yubiao Feng
> > > > > > > On Wed, Aug 23, 2023 at 12:34 PM Zike Yang <z...@apache.org> wrote:
> > > > > > > >
> > > > > > > > Hi, xiangying,
> > > > > > > >
> > > > > > > > Thanks for your PIP.
> > > > > > > >
> > > > > > > > IIUC, this may change the existing behavior and may introduce
> > > > > > > > inconsistencies.
> > > > > > > > Suppose that we have a large message with 3 chunks. But the
> > > > > > > > producer crashes and resends the message after sending the
> > > > > > > > chunk-1. It will send a total of 5 messages to the Pulsar topic:
> > > > > > > >
> > > > > > > > 1. SequenceID: 0, ChunkID: 0
> > > > > > > > 2. SequenceID: 0, ChunkID: 1
> > > > > > > > 3. SequenceID: 0, ChunkID: 0   -> This message will be dropped
> > > > > > > > 4. SequenceID: 0, ChunkID: 1   -> Will also be dropped
> > > > > > > > 5. SequenceID: 0, ChunkID: 2   -> The last chunk of the message
> > > > > > > >
> > > > > > > > For the existing behavior, the consumer assembles messages
> > > > > > > > 3, 4, 5 into the original large message. But the changes brought
> > > > > > > > about by this PIP will cause the consumer to use messages
> > > > > > > > 1, 2, 5 for assembly. There is no guarantee that the producer
> > > > > > > > will split the message in the same way twice before and after.
> > > > > > > > For example, the producer's maxMessageSize may be different.
> > > > > > > > This may cause the consumer to receive a corrupt message.
> > > > > > > >
> > > > > > > > Also, this PIP increases the complexity of handling chunks on
> > > > > > > > the broker side. Brokers should, in general, treat a chunk as a
> > > > > > > > normal message.
> > > > > > > >
> > > > > > > > I think a simpler and better approach is to only check
> > > > > > > > deduplication for the last chunk of the large message. The
> > > > > > > > consumer only gets the whole message after receiving the last
> > > > > > > > chunk, so we don't need to check deduplication for all previous
> > > > > > > > chunks. Also, by doing this we only need bug fixes; we don't
> > > > > > > > need to introduce a new PIP.
> > > > > > > >
> > > > > > > > BR,
> > > > > > > > Zike Yang
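Why mixing chunks from two different splits of the same payload yields a corrupt message can be shown with a small sketch. The chunk sizes are illustrative; Pulsar's real splitting also accounts for metadata size, which is exactly why two splits cannot be assumed identical:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ChunkSplitDemo {
    // Split a payload into chunks of at most maxChunkSize bytes.
    static List<byte[]> split(byte[] payload, int maxChunkSize) {
        List<byte[]> chunks = new ArrayList<>();
        for (int i = 0; i < payload.length; i += maxChunkSize) {
            chunks.add(Arrays.copyOfRange(payload, i, Math.min(i + maxChunkSize, payload.length)));
        }
        return chunks;
    }

    // Concatenate chunks back into one payload.
    static byte[] reassemble(List<byte[]> chunks) {
        int total = 0;
        for (byte[] c : chunks) total += c.length;
        byte[] out = new byte[total];
        int pos = 0;
        for (byte[] c : chunks) {
            System.arraycopy(c, 0, out, pos, c.length);
            pos += c.length;
        }
        return out;
    }

    // Take chunk 0 from a split with maxChunkSize=4 and the remaining chunks
    // from a split with maxChunkSize=3, as a crashed-and-restarted producer
    // with a changed size limit might produce.
    static String mixedReassembly() {
        byte[] message = "0123456789".getBytes();
        List<byte[]> first = split(message, 4);   // "0123", "4567", "89"
        List<byte[]> second = split(message, 3);  // "012", "345", "678", "9"
        List<byte[]> mixed = new ArrayList<>();
        mixed.add(first.get(0));
        mixed.addAll(second.subList(1, second.size()));
        return new String(reassemble(mixed));
    }

    public static void main(String[] args) {
        // The mix does not reproduce "0123456789": the consumer would see a
        // corrupt payload.
        System.out.println(mixedReassembly()); // 01233456789
    }
}
```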
> > > > > > > > On Fri, Aug 18, 2023 at 7:54 PM Xiangying Meng <xiangy...@apache.org> wrote:
> > > > > > > > >
> > > > > > > > > Dear Community,
> > > > > > > > >
> > > > > > > > > I hope this email finds you well. I'd like to address an
> > > > > > > > > important issue related to Apache Pulsar and discuss a
> > > > > > > > > solution I've proposed on GitHub. The problem pertains to the
> > > > > > > > > handling of Chunk Messages after enabling deduplication.
> > > > > > > > >
> > > > > > > > > In the current version of Apache Pulsar, all chunks of a
> > > > > > > > > Chunk Message share the same sequence ID. However, enabling
> > > > > > > > > the deduplication feature results in an inability to send
> > > > > > > > > Chunk Messages. To tackle this problem, I've proposed a
> > > > > > > > > solution [1] that ensures messages are not duplicated
> > > > > > > > > throughout end-to-end delivery. While this fix addresses the
> > > > > > > > > duplication issue for end-to-end messages, there remains a
> > > > > > > > > possibility of duplicate chunks within topics.
> > > > > > > > >
> > > > > > > > > To address this concern, I believe we should introduce a
> > > > > > > > > "Chunk ID map" at the broker level, similar to the existing
> > > > > > > > > "sequence ID map", to facilitate effective filtering.
> > > > > > > > > However, implementing this has led to a challenge: a producer
> > > > > > > > > requires storage for two Long values simultaneously (sequence
> > > > > > > > > ID and chunk ID), while the snapshot of the sequence ID map
> > > > > > > > > is stored through the cursor properties (Map<String, Long>).
> > > > > > > > > To resolve this, I've proposed an alternative [2] involving
> > > > > > > > > the introduction of a "mark delete properties" field
> > > > > > > > > (Map<String, String>) to replace the current properties
> > > > > > > > > (Map<String, Long>) field.
> > > > > > > > >
> > > > > > > > > I'd appreciate it if you could carefully review both PRs and
> > > > > > > > > share your valuable feedback and insights. Thank you
> > > > > > > > > immensely for your time and attention. I eagerly anticipate
> > > > > > > > > your valuable opinions and recommendations.
> > > > > > > > >
> > > > > > > > > Warm regards,
> > > > > > > > > Xiangying
> > > > > > > > >
> > > > > > > > > [1] https://github.com/apache/pulsar/pull/20948
> > > > > > > > > [2] https://github.com/apache/pulsar/pull/21027
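The storage constraint described in the proposal above (two Longs per producer in a property map whose values are strings) can be illustrated with a tiny encoding sketch. The `"seq:chunk"` format here is hypothetical, chosen only for illustration; it is not what PR [2] specifies:

```java
import java.util.HashMap;
import java.util.Map;

public class CursorPropertyEncoding {
    // Pack (sequenceId, chunkId) into one string value, as a Map<String, String>
    // cursor property could hold.
    static String encode(long sequenceId, long chunkId) {
        return sequenceId + ":" + chunkId;
    }

    // Recover the pair from the stored string value.
    static long[] decode(String value) {
        String[] parts = value.split(":");
        return new long[] { Long.parseLong(parts[0]), Long.parseLong(parts[1]) };
    }

    public static void main(String[] args) {
        Map<String, String> markDeleteProperties = new HashMap<>();
        markDeleteProperties.put("producer-1", encode(42L, 7L));
        long[] decoded = decode(markDeleteProperties.get("producer-1"));
        System.out.println(decoded[0] + " / " + decoded[1]); // 42 / 7
    }
}
```

A `Map<String, Long>` cannot hold both values under one key, which is the motivation for switching the property value type to `String`.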
