syhily commented on code in PR #95:
URL: https://github.com/apache/flink-connector-pulsar/pull/95#discussion_r1739589754
##########
flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/CursorPosition.java:
##########
@@ -104,6 +105,19 @@ private MessageId getActualMessageId(MessageIdAdv messageIdImpl) {
         if (include) {
             return messageIdImpl;
         } else {
+            // if the message is batched, should return next single message in current batch.
+            if (messageIdImpl.getBatchIndex() >= 0
+                    && messageIdImpl.getBatchSize() > 0
+                    && messageIdImpl.getBatchIndex() != messageIdImpl.getBatchSize() - 1) {
+                return new BatchMessageIdImpl(
+                        messageIdImpl.getLedgerId(),
+                        messageIdImpl.getEntryId(),
+                        messageIdImpl.getPartitionIndex(),
+                        messageIdImpl.getBatchIndex() + 1,
+                        messageIdImpl.getBatchSize(),
+                        messageIdImpl.getAckSet());

Review Comment:
   1. The receive queue setting https://github.com/apache/flink-connector-pulsar/blob/b37a8b32f30683664ff25888d403c4de414043e1/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java#L282 is already exposed to the user, with a default value of `1000`.
   2. IIUC, support for the batch `AckSet` is handled locally by the pulsar-client once all the messages in the batch have been acked. (BTW, connector users and developers shouldn't touch this; it should be guaranteed by the Pulsar client developers.)
   3. Recovery starts from the `MessageId` saved in the checkpoint. I think the `AckSet` is controlled internally by the client.
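For reference, a minimal sketch of the exclusive-start rule the hunk above adds. It uses a hypothetical stand-in class `Id` rather than the real Pulsar `MessageIdAdv`/`BatchMessageIdImpl` types, and it leaves out the partition index and `AckSet`. The fallback to `entryId + 1` for non-batched or last-in-batch ids is an assumption about the surrounding method, which the hunk does not show.

```java
// Hypothetical stand-in types for illustration only; this is NOT the Pulsar client API.
final class BatchCursorSketch {

    /** Minimal id model: ledger, entry, position within the batch, batch size. */
    static final class Id {
        final long ledgerId;
        final long entryId;
        final int batchIndex; // -1 when the message is not batched
        final int batchSize;  // 0 when the message is not batched

        Id(long ledgerId, long entryId, int batchIndex, int batchSize) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
            this.batchIndex = batchIndex;
            this.batchSize = batchSize;
        }
    }

    /** True when the id points into a batch and is not the last message of that batch. */
    static boolean hasNextInBatch(Id id) {
        return id.batchIndex >= 0
                && id.batchSize > 0
                && id.batchIndex != id.batchSize - 1;
    }

    /**
     * Exclusive start position: the next message in the same batch if there is one.
     * Otherwise the next entry (assumed fallback, not shown in the hunk).
     */
    static Id nextAfter(Id id) {
        if (hasNextInBatch(id)) {
            return new Id(id.ledgerId, id.entryId, id.batchIndex + 1, id.batchSize);
        }
        return new Id(id.ledgerId, id.entryId + 1, -1, 0);
    }
}
```

With this model, an id at batch index 1 of 3 advances to index 2 within the same entry, while an id at index 2 of 3 moves on to the next entry.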
##########
flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java:
##########
@@ -196,7 +197,14 @@ public void handleSplitsChanges(SplitsChange<PulsarPartitionSplit> splitsChanges
         MessageId latestConsumedId = registeredSplit.getLatestConsumedId();
         if (latestConsumedId != null) {
-            LOG.info("Reset subscription position by the checkpoint {}", latestConsumedId);
+            if (latestConsumedId instanceof BatchMessageIdImpl) {

Review Comment:
   I can see that all the message ID implementations implement the `MessageIdAdv` interface, which contains all the information the client requires. I think it would be better to use `MessageIdAdv` instead of `MessageId` here and throughout the connector.

   ```
   /**
    * The {@link MessageId} interface provided for advanced users.
    * <p>
    * All built-in MessageId implementations should be able to be cast to MessageIdAdv.
    * </p>
    */
   ```
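To illustrate the suggestion, here is a rough sketch of branching on the accessors of an "advanced" id interface instead of testing for a concrete implementation class. The `Id`, `IdAdv`, and `SimpleId` types are hypothetical stand-ins that mirror only a few of the accessors used in this PR. They are not the Pulsar classes, and the real connector code would need to handle the remaining fields.

```java
// Hypothetical stand-in types for illustration only; this is NOT the Pulsar client API.
final class MessageIdAdvSketch {

    /** Stand-in for the plain message id interface. */
    interface Id {}

    /** Stand-in for the advanced interface, reduced to the accessors used below. */
    interface IdAdv extends Id {
        long getLedgerId();
        long getEntryId();
        int getBatchIndex();
        int getBatchSize();
    }

    /** One implementation covering both batched and non-batched ids. */
    static final class SimpleId implements IdAdv {
        private final long ledgerId;
        private final long entryId;
        private final int batchIndex;
        private final int batchSize;

        SimpleId(long ledgerId, long entryId, int batchIndex, int batchSize) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
            this.batchIndex = batchIndex;
            this.batchSize = batchSize;
        }

        @Override public long getLedgerId() { return ledgerId; }
        @Override public long getEntryId() { return entryId; }
        @Override public int getBatchIndex() { return batchIndex; }
        @Override public int getBatchSize() { return batchSize; }
    }

    /** Formats the position using interface accessors, with no instanceof on a concrete class. */
    static String describe(Id id) {
        // Relies on the contract quoted above: built-in ids can be cast to the advanced interface.
        IdAdv adv = (IdAdv) id;
        String base = adv.getLedgerId() + ":" + adv.getEntryId();
        boolean batched = adv.getBatchIndex() >= 0 && adv.getBatchSize() > 0;
        return batched ? base + ":" + adv.getBatchIndex() + "/" + adv.getBatchSize() : base;
    }
}
```

The batched and non-batched cases go through the same code path, so the reader does not need to import or depend on a specific implementation class.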