wenbingshen commented on code in PR #95: URL: https://github.com/apache/flink-connector-pulsar/pull/95#discussion_r1738475655
########## flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/CursorPosition.java: ########## @@ -104,6 +105,19 @@ private MessageId getActualMessageId(MessageIdAdv messageIdImpl) { if (include) { return messageIdImpl; } else { + // if the message is batched, should return next single message in current batch. + if (messageIdImpl.getBatchIndex() >= 0 + && messageIdImpl.getBatchSize() > 0 + && messageIdImpl.getBatchIndex() != messageIdImpl.getBatchSize() - 1) { + return new BatchMessageIdImpl( + messageIdImpl.getLedgerId(), + messageIdImpl.getEntryId(), + messageIdImpl.getPartitionIndex(), + messageIdImpl.getBatchIndex() + 1, + messageIdImpl.getBatchSize(), + messageIdImpl.getAckSet()); Review Comment: > Should we change the act set here for making sure the current batch index has been acknowledged? > > https://github.com/apache/pulsar/blob/dccc06bf50bb5ca510b39167908c02d2b4602ca5/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdAdvUtils.java#L50 There is no need to modify this AckSet, because we finally call Consumer seek. In the seek method, the messages before batchIndex will be cumulative acked. The AckSet here will not work regardless of its status. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org