XComp commented on code in PR #24:
URL: https://github.com/apache/flink-connector-pulsar/pull/24#discussion_r1102525817

##########
flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java:
##########
@@ -80,7 +72,7 @@ import static org.apache.pulsar.common.partition.PartitionedTopicMetadata.NON_PARTITIONED;
 /** A pulsar cluster operator used for operating pulsar instance. */
-public class PulsarRuntimeOperator implements Closeable {
+public class PulsarRuntimeOperator {

Review Comment:
   ```suggestion
   public class PulsarRuntimeOperator implements AutoCloseable {
   ```
   We're still implementing the `close` method. Using `AutoCloseable` here would let us use the operator in Java's try-with-resources statement.

##########
flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/sink/PulsarSinkTestContext.java:
##########
@@ -55,7 +56,11 @@ public Sink<String> createSink(TestingSinkSettings sinkSettings) {
         // Create the topic if it needs.
         if (creatTopic()) {
             for (String topic : topics) {
-                operator.createTopic(topic, 4);
+                try {
+                    operator.createTopic(topic, 4);
+                } catch (Exception e) {

Review Comment:
   I created FLINK-31014: It's annoying that we have to deal with these exceptions here. I don't see the point of worrying about them in test code. The interfaces should allow exceptions to be forwarded.

##########
flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java:
##########
@@ -231,7 +232,11 @@ public void handleSplitsChanges(SplitsChange<PulsarPartitionSplit> splitsChanges
         }
         // Create pulsar consumer.
-        this.pulsarConsumer = createPulsarConsumer(registeredSplit.getPartition());
+        try {
+            this.pulsarConsumer = createPulsarConsumer(registeredSplit.getPartition());
+        } catch (PulsarClientException e) {

Review Comment:
   It's odd that we have to transform the `PulsarClientException` here. I see two possible reasons:
   * Either we should expose exceptions in `SplitReader.handleSplitsChanges` on the Flink side
   * Or we're not implementing the interface properly on the Pulsar connector side

   I'm curious what your opinion is on this one, because I'm not that familiar with the SDK part of the code.

##########
flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java:
##########
@@ -619,7 +619,7 @@ private void ensureSubscriberIsNull(String attemptingSubscribeMode) {
     private void ensureSchemaTypeIsValid(Schema<?> schema) {
         SchemaInfo info = schema.getSchemaInfo();
-        if (info.getType() == SchemaType.AUTO_CONSUME || info.getType() == SchemaType.AUTO) {
+        if (info.getType() == SchemaType.AUTO_CONSUME) {

Review Comment:
   How is this related to FLINK-30109?

##########
flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/cases/MultipleTopicsConsumingContext.java:
##########
@@ -56,7 +57,11 @@ protected String subscriptionName() {
     @Override
     protected String generatePartitionName() {
         String topic = topicPrefix + index;
-        operator.createTopic(topic, 1);
+        try {
+            operator.createTopic(topic, 1);
+        } catch (Exception e) {

Review Comment:
   Same here: Why don't we expose the exception in `generatePartitionName`? If I see it right, we're generating redundant code that transforms an `Exception` into a `FlinkRuntimeException` for a test context.
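
   To illustrate the two test-side suggestions above (implementing `AutoCloseable` and forwarding exceptions instead of wrapping them), here is a minimal sketch. All names in it (`FakeOperator`, `FakeTestContext`, `ForwardExceptionSketch`) are hypothetical stand-ins and not the actual connector classes; it assumes the surrounding test interfaces would be allowed to declare `throws Exception`, which is not the case today.

```java
import java.util.ArrayList;
import java.util.List;

/** Entry point of the sketch; shows try-with-resources plus exception forwarding. */
class ForwardExceptionSketch {
    public static void main(String[] args) throws Exception {
        FakeOperator operator = new FakeOperator();
        // try-with-resources works because FakeOperator implements AutoCloseable.
        try (FakeOperator op = operator) {
            FakeTestContext context = new FakeTestContext(op);
            System.out.println(context.generatePartitionName());
        }
        System.out.println("closed = " + operator.closed);
    }
}

/** Hypothetical stand-in for a runtime operator; not the real PulsarRuntimeOperator. */
class FakeOperator implements AutoCloseable {
    final List<String> topics = new ArrayList<>();
    boolean closed = false;

    // The checked exception is declared instead of being wrapped in a runtime exception.
    void createTopic(String topic, int partitions) throws Exception {
        if (partitions <= 0) {
            throw new Exception("Invalid partition count: " + partitions);
        }
        topics.add(topic);
    }

    @Override
    public void close() {
        closed = true;
    }
}

/** Hypothetical test context whose method forwards the exception to the caller. */
class FakeTestContext {
    private final FakeOperator operator;
    private int index = 0;

    FakeTestContext(FakeOperator operator) {
        this.operator = operator;
    }

    // No try/catch here: the test that calls this method sees the original exception.
    String generatePartitionName() throws Exception {
        String topic = "topic-" + index++;
        operator.createTopic(topic, 1);
        return topic;
    }
}
```

   With this shape the test context contains no catch blocks at all, and a failing `createTopic` call surfaces in the test with its original type and stack trace.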

##########
flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java:
##########
@@ -220,11 +223,14 @@ private void createSubscription(List<TopicPartition> newPartitions) {
             CursorPosition position =
                     startCursor.position(partition.getTopic(), partition.getPartitionId());
-            if (sourceConfiguration.isResetSubscriptionCursor()) {
-                sneakyAdmin(() -> position.seekPosition(pulsarAdmin, topic, subscriptionName));
-            } else {
-                sneakyAdmin(
-                        () -> position.createInitialPosition(pulsarAdmin, topic, subscriptionName));
+            try {
+                if (sourceConfiguration.isResetSubscriptionCursor()) {
+                    position.seekPosition(pulsarAdmin, topic, subscriptionName);
+                } else {
+                    position.createInitialPosition(pulsarAdmin, topic, subscriptionName);
+                }
+            } catch (PulsarAdminException e) {
+                throw new FlinkRuntimeException(e);

Review Comment:
   Going up the call hierarchy, this method's exception would be exposed (and not caught) in a method that's used for error handling, which sounds strange. Or is it reasonable? :thinking:

##########
flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/utils/PulsarTransactionUtils.java:
##########
@@ -46,17 +45,18 @@ private PulsarTransactionUtils() {
     /** Create transaction with given timeout millis. */
     public static Transaction createTransaction(PulsarClient pulsarClient, long timeoutMs) {
         try {
-            CompletableFuture<Transaction> future =
-                    sneakyClient(pulsarClient::newTransaction)
-                            .withTransactionTimeout(timeoutMs, TimeUnit.MILLISECONDS)
-                            .build();
-
-            return future.get();
+            return pulsarClient
+                    .newTransaction()
+                    .withTransactionTimeout(timeoutMs, TimeUnit.MILLISECONDS)
+                    .build()
+                    .get();
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             throw new IllegalStateException(e);
         } catch (ExecutionException e) {
             throw new FlinkRuntimeException(unwrap(e));
+        } catch (PulsarClientException e) {

Review Comment:
   It looks like `PulsarClientException` could be exposed here instead of being transformed into a `FlinkRuntimeException`. Eventually, it would be forwarded to `PulsarWriter.createMessageBuilder`, which exposes `PulsarClientException` again.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org