syhily commented on code in PR #24:
URL: https://github.com/apache/flink-connector-pulsar/pull/24#discussion_r1103520528
##########
flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java:
##########

@@ -231,7 +232,11 @@ public void handleSplitsChanges(SplitsChange<PulsarPartitionSplit> splitsChanges
         }

         // Create pulsar consumer.
-        this.pulsarConsumer = createPulsarConsumer(registeredSplit.getPartition());
+        try {
+            this.pulsarConsumer = createPulsarConsumer(registeredSplit.getPartition());
+        } catch (PulsarClientException e) {

Review Comment:
   This is due to a difference between the Kafka client and the Pulsar client. Kafka uses a polling API, and its client is created before the splits are handled. Pulsar shares consumers within a single client instance, and each consumer serves exactly one split, so we have to create the consumer here, and the checked exception has to be wrapped into a runtime exception. I think we should expose exceptions in `SplitReader.handleSplitsChanges` on the Flink side.
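   For illustration, a minimal sketch of the wrap-and-rethrow the comment describes, assuming `FlinkRuntimeException` (from `org.apache.flink.util`) as the unchecked wrapper; the error message and the use of `splitId()` are illustrative choices, not necessarily the PR's exact code:

       import org.apache.flink.util.FlinkRuntimeException;
       import org.apache.pulsar.client.api.PulsarClientException;

       // Inside handleSplitsChanges(...): the SplitReader interface declares no checked
       // exception for this method, so the checked PulsarClientException raised while
       // creating the per-split consumer is rethrown as an unchecked exception.
       try {
           this.pulsarConsumer = createPulsarConsumer(registeredSplit.getPartition());
       } catch (PulsarClientException e) {
           throw new FlinkRuntimeException(
                   "Failed to create the Pulsar consumer for split " + registeredSplit.splitId(), e);
       }

   Until `SplitReader.handleSplitsChanges` exposes checked exceptions on the Flink side, wrapping in an unchecked exception like this is how the consumer-creation failure gets surfaced.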