syhily commented on code in PR #95:
URL: 
https://github.com/apache/flink-connector-pulsar/pull/95#discussion_r1739589754


##########
flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/CursorPosition.java:
##########
@@ -104,6 +105,19 @@ private MessageId getActualMessageId(MessageIdAdv messageIdImpl) {
         if (include) {
             return messageIdImpl;
         } else {
+            // if the message is batched, should return next single message in current batch.
+            if (messageIdImpl.getBatchIndex() >= 0
+                    && messageIdImpl.getBatchSize() > 0
+                    && messageIdImpl.getBatchIndex() != messageIdImpl.getBatchSize() - 1) {
+                return new BatchMessageIdImpl(
+                        messageIdImpl.getLedgerId(),
+                        messageIdImpl.getEntryId(),
+                        messageIdImpl.getPartitionIndex(),
+                        messageIdImpl.getBatchIndex() + 1,
+                        messageIdImpl.getBatchSize(),
+                        messageIdImpl.getAckSet());

Review Comment:
   1. The receive queue setting 
https://github.com/apache/flink-connector-pulsar/blob/b37a8b32f30683664ff25888d403c4de414043e1/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java#L282
 has already been exposed to the user, with a default value of `1000`.
   2. IIUC, support for the batch `AckSet` is implemented locally in the 
pulsar-client and takes effect once all the messages in the batch have been 
acked. (BTW, connector users and developers shouldn't touch it; that should be 
guaranteed by the pulsar-client developers.)
   3. Recovery starts from the `MessageId` saved in the checkpoint. I think the 
`AckSet` is controlled internally by the client.
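
For reference, a minimal sketch of how I read the exclusive-start rule in this hunk. `Position` and `nextAfter` are hypothetical stand-ins for the `MessageIdAdv` fields and for `getActualMessageId`, not connector or pulsar-client API, and the fall-through to the next entry is my assumption because that branch is not part of the diff shown here.

```java
// Sketch only: Position is a hypothetical stand-in for the MessageIdAdv fields used in the diff.
final class BatchCursorSketch {

    static final class Position {
        final long ledgerId;
        final long entryId;
        final int batchIndex; // -1 for a non-batched message
        final int batchSize;  // 0 for a non-batched message

        Position(long ledgerId, long entryId, int batchIndex, int batchSize) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
            this.batchIndex = batchIndex;
            this.batchSize = batchSize;
        }
    }

    /** Resolves an exclusive start position to the first message that should be read. */
    static Position nextAfter(Position p) {
        boolean batched = p.batchIndex >= 0 && p.batchSize > 0;
        boolean lastInBatch = p.batchIndex == p.batchSize - 1;
        if (batched && !lastInBatch) {
            // Same entry, next message inside the batch (the branch added by this diff).
            return new Position(p.ledgerId, p.entryId, p.batchIndex + 1, p.batchSize);
        }
        // Assumption: otherwise start from the next entry; this branch is not shown in the diff.
        return new Position(p.ledgerId, p.entryId + 1, -1, 0);
    }
}
```

The point of the sketch is only that a mid-batch position stays on the same entry and advances the batch index, while the last message of a batch behaves like a non-batched message.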



##########
flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java:
##########
@@ -196,7 +197,14 @@ public void handleSplitsChanges(SplitsChange<PulsarPartitionSplit> splitsChanges
         MessageId latestConsumedId = registeredSplit.getLatestConsumedId();
 
         if (latestConsumedId != null) {
-            LOG.info("Reset subscription position by the checkpoint {}", latestConsumedId);
+            if (latestConsumedId instanceof BatchMessageIdImpl) {

Review Comment:
   I can see that all the message ID implementations implement the 
`MessageIdAdv` interface, which contains all the information the client 
requires. I think it's better to use `MessageIdAdv` instead of `MessageId` here 
and throughout the connector.
   
   ```
   /**
    * The {@link MessageId} interface provided for advanced users.
    * <p>
    * All built-in MessageId implementations should be able to be cast to MessageIdAdv.
    * </p>
    */
   ```
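
A rough sketch of what I mean, assuming the cast promised by the Javadoc holds. `Id`, `IdAdv`, and `SimpleId` are hypothetical stand-ins for `MessageId`, `MessageIdAdv`, and a concrete implementation, so this is not the pulsar-client API itself. The batch check goes through the interface rather than through `instanceof` on a concrete class.

```java
// Sketch only: Id and IdAdv are hypothetical stand-ins for MessageId and MessageIdAdv.
final class MessageIdAdvSketch {

    interface Id {}

    interface IdAdv extends Id {
        long getLedgerId();
        long getEntryId();
        int getBatchIndex(); // negative when the message is not part of a batch
    }

    static final class SimpleId implements IdAdv {
        private final long ledgerId;
        private final long entryId;
        private final int batchIndex;

        SimpleId(long ledgerId, long entryId, int batchIndex) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
            this.batchIndex = batchIndex;
        }

        @Override public long getLedgerId() { return ledgerId; }
        @Override public long getEntryId() { return entryId; }
        @Override public int getBatchIndex() { return batchIndex; }
    }

    /** Formats a position without caring which concrete class carries it. */
    static String describe(Id id) {
        IdAdv adv = (IdAdv) id; // relies on the "castable to MessageIdAdv" promise
        String base = adv.getLedgerId() + ":" + adv.getEntryId();
        return adv.getBatchIndex() >= 0 ? base + ":" + adv.getBatchIndex() : base;
    }
}
```

With this shape the reader code would not need a separate `instanceof BatchMessageIdImpl` branch, since the batch information is available on every ID.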



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
