XComp commented on code in PR #24:
URL: https://github.com/apache/flink-connector-pulsar/pull/24#discussion_r1102525817


##########
flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java:
##########
@@ -80,7 +72,7 @@
 import static org.apache.pulsar.common.partition.PartitionedTopicMetadata.NON_PARTITIONED;
 
 /** A pulsar cluster operator used for operating pulsar instance. */
-public class PulsarRuntimeOperator implements Closeable {
+public class PulsarRuntimeOperator {

Review Comment:
   ```suggestion
   public class PulsarRuntimeOperator implements AutoCloseable {
   ```
   We're still implementing the `close` method. Using `AutoCloseable` here would let us use Java's try-with-resources statement.
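
   A minimal sketch of what that would enable. `FakeOperator` is a hypothetical stand-in for `PulsarRuntimeOperator` (its `createTopic` only records the call), so this shows the language feature, not the real operator:

```java
import java.util.ArrayList;
import java.util.List;

class AutoCloseableSketch {

    /** Hypothetical stand-in for PulsarRuntimeOperator; it only records what was called. */
    static class FakeOperator implements AutoCloseable {
        private final List<String> events;

        FakeOperator(List<String> events) {
            this.events = events;
        }

        void createTopic(String topic, int partitions) {
            events.add("create:" + topic + ":" + partitions);
        }

        // Narrowing close() to throw nothing means callers need no catch block.
        @Override
        public void close() {
            events.add("close");
        }
    }

    /** Uses the operator in try-with-resources and returns the recorded calls. */
    static List<String> run() {
        List<String> events = new ArrayList<>();
        try (FakeOperator operator = new FakeOperator(events)) {
            operator.createTopic("topic-0", 4);
        } // close() is invoked here automatically, even if the body throws.
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

   With this shape, test code no longer needs an explicit `finally { operator.close(); }`.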



##########
flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/sink/PulsarSinkTestContext.java:
##########
@@ -55,7 +56,11 @@ public Sink<String> createSink(TestingSinkSettings sinkSettings) {
         // Create the topic if it needs.
         if (creatTopic()) {
             for (String topic : topics) {
-                operator.createTopic(topic, 4);
+                try {
+                    operator.createTopic(topic, 4);
+                } catch (Exception e) {

Review Comment:
   I created FLINK-31014: It's annoying that we have to deal with these exceptions here. I don't see the point of handling them in the test code. The interfaces should allow exceptions to be forwarded.
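
   To illustrate what I mean by forwarding, here is a rough sketch. `SinkContext` and `createTopic` are hypothetical names made up for this example, not the actual Flink test-framework interfaces; the point is only that a `throws Exception` clause on the interface method removes the need for a try/catch in the implementation:

```java
class ForwardingSketch {

    /** Hypothetical test-context interface whose method is allowed to throw. */
    interface SinkContext {
        String createSink() throws Exception;
    }

    /** Hypothetical operator call that throws a checked exception on bad input. */
    static void createTopic(String topic, int partitions) throws Exception {
        if (partitions <= 0) {
            throw new Exception("invalid partition count for " + topic);
        }
    }

    /** The implementation simply forwards the exception; only the caller decides how to handle it. */
    static String tryCreate(int partitions) {
        SinkContext context = () -> {
            createTopic("topic-0", partitions); // no try/catch needed here
            return "sink";
        };
        try {
            return context.createSink();
        } catch (Exception e) {
            return "failed: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(tryCreate(4));
        System.out.println(tryCreate(0));
    }
}
```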



##########
flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java:
##########
@@ -231,7 +232,11 @@ public void handleSplitsChanges(SplitsChange<PulsarPartitionSplit> splitsChanges
         }
 
         // Create pulsar consumer.
-        this.pulsarConsumer = createPulsarConsumer(registeredSplit.getPartition());
+        try {
+            this.pulsarConsumer = createPulsarConsumer(registeredSplit.getPartition());
+        } catch (PulsarClientException e) {

Review Comment:
   It's odd that we have to transform the `PulsarClientException` here. I see two possible reasons:
   * Either we should expose exceptions in `SplitReader.handleSplitsChanges` on the Flink side
   * Or we're not implementing the interface properly on the Pulsar connector side

   I'm curious what your opinion is on that one because I'm not that familiar with the SDK part of the code.



##########
flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java:
##########
@@ -619,7 +619,7 @@ private void ensureSubscriberIsNull(String attemptingSubscribeMode) {
 
     private void ensureSchemaTypeIsValid(Schema<?> schema) {
         SchemaInfo info = schema.getSchemaInfo();
-        if (info.getType() == SchemaType.AUTO_CONSUME || info.getType() == SchemaType.AUTO) {
+        if (info.getType() == SchemaType.AUTO_CONSUME) {

Review Comment:
   How is this related to FLINK-30109?



##########
flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/cases/MultipleTopicsConsumingContext.java:
##########
@@ -56,7 +57,11 @@ protected String subscriptionName() {
     @Override
     protected String generatePartitionName() {
         String topic = topicPrefix + index;
-        operator.createTopic(topic, 1);
+        try {
+            operator.createTopic(topic, 1);
+        } catch (Exception e) {

Review Comment:
   Same here: Why don't we expose the exception in `generatePartitionName`? We're generating redundant code for transforming an `Exception` into a `FlinkRuntimeException` for a test context, if I see it right.



##########
flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java:
##########
@@ -220,11 +223,14 @@ private void createSubscription(List<TopicPartition> newPartitions) {
             CursorPosition position =
                     startCursor.position(partition.getTopic(), partition.getPartitionId());
 
-            if (sourceConfiguration.isResetSubscriptionCursor()) {
-                sneakyAdmin(() -> position.seekPosition(pulsarAdmin, topic, subscriptionName));
-            } else {
-                sneakyAdmin(
-                        () -> position.createInitialPosition(pulsarAdmin, topic, subscriptionName));
+            try {
+                if (sourceConfiguration.isResetSubscriptionCursor()) {
+                    position.seekPosition(pulsarAdmin, topic, subscriptionName);
+                } else {
+                    position.createInitialPosition(pulsarAdmin, topic, subscriptionName);
+                }
+            } catch (PulsarAdminException e) {
+                throw new FlinkRuntimeException(e);

Review Comment:
   Going up the call hierarchy, this exception would be exposed (and not caught) in a method that's used for error handling, which sounds strange. Or is it reasonable? :thinking:



##########
flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/utils/PulsarTransactionUtils.java:
##########
@@ -46,17 +45,18 @@ private PulsarTransactionUtils() {
     /** Create transaction with given timeout millis. */
     public static Transaction createTransaction(PulsarClient pulsarClient, long timeoutMs) {
         try {
-            CompletableFuture<Transaction> future =
-                    sneakyClient(pulsarClient::newTransaction)
-                            .withTransactionTimeout(timeoutMs, TimeUnit.MILLISECONDS)
-                            .build();
-
-            return future.get();
+            return pulsarClient
+                    .newTransaction()
+                    .withTransactionTimeout(timeoutMs, TimeUnit.MILLISECONDS)
+                    .build()
+                    .get();
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             throw new IllegalStateException(e);
         } catch (ExecutionException e) {
             throw new FlinkRuntimeException(unwrap(e));
+        } catch (PulsarClientException e) {

Review Comment:
   It looks like `PulsarClientException` could be exposed instead of being transformed into a `FlinkRuntimeException`. Eventually, it would be forwarded to `PulsarWriter.createMessageBuilder`, which exposes `PulsarClientException` again.
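
   Roughly, the two options look like this. Everything in the sketch is a simplified, hypothetical stand-in (`ClientException` for `PulsarClientException`, plain strings for transactions and builders), so it only illustrates the call-chain argument and makes no claim about the real signatures:

```java
class ExposeVsWrapSketch {

    /** Hypothetical checked exception standing in for PulsarClientException. */
    static class ClientException extends Exception {
        ClientException(String message) {
            super(message);
        }
    }

    /** Hypothetical client call that may fail with the checked exception. */
    static String newTransaction(boolean fail) throws ClientException {
        if (fail) {
            throw new ClientException("broker unavailable");
        }
        return "txn";
    }

    /** Variant A: wrap into an unchecked exception, as the diff does. */
    static String createTransactionWrapped(boolean fail) {
        try {
            return newTransaction(fail);
        } catch (ClientException e) {
            throw new RuntimeException(e);
        }
    }

    /** Variant B: expose the checked exception and let the caller deal with it. */
    static String createTransactionExposed(boolean fail) throws ClientException {
        return newTransaction(fail);
    }

    /** The caller already declares ClientException, so variant B needs no extra handling. */
    static String createMessageBuilder(boolean fail) throws ClientException {
        return "builder(" + createTransactionExposed(fail) + ")";
    }

    public static void main(String[] args) throws ClientException {
        System.out.println(createMessageBuilder(false));
    }
}
```

   With variant B the original exception type reaches the caller unchanged, whereas variant A forces callers to dig it out of `getCause()`.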



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
