syhily commented on a change in pull request #16900: URL: https://github.com/apache/flink/pull/16900#discussion_r694833397
########## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/PulsarSourceConfigUtils.java ########## @@ -234,25 +239,87 @@ public static void checkConfigurations(Configuration configuration) { PulsarClient client, Schema<T> schema, ConsumerConfigurationData<T> config) throws PulsarClientException { // ConsumerBuilder don't support using the given ConsumerConfigurationData directly. - ConsumerBuilder<T> consumerBuilder = client.newConsumer(schema).loadConf(configMap(config)); + ConsumerBuilder<T> builder = new ConsumerBuilderImpl<>((PulsarClientImpl) client, schema); - // Set some non-serializable fields. - if (config.getMessageListener() != null) { - consumerBuilder.messageListener(config.getMessageListener()); - } - if (config.getConsumerEventListener() != null) { - consumerBuilder.consumerEventListener(config.getConsumerEventListener()); - } - if (config.getCryptoKeyReader() != null) { - consumerBuilder.cryptoKeyReader(config.getCryptoKeyReader()); - } - if (config.getMessageCrypto() != null) { - consumerBuilder.messageCrypto(config.getMessageCrypto()); - } - if (config.getBatchReceivePolicy() != null) { - consumerBuilder.batchReceivePolicy(config.getBatchReceivePolicy()); + // Since pulsar don't expose the config constructor. + // We have to set the builder fields one by one. 
+ setConfig(config.getTopicNames(), topics -> builder.topics(new ArrayList<>(topics))); + setConfig(config.getTopicsPattern(), builder::topicsPattern); + setConfig(config.getSubscriptionName(), builder::subscriptionName); + setConfig( + config.getAckTimeoutMillis(), + millis -> builder.ackTimeout(millis, TimeUnit.MILLISECONDS)); + setConfig(config.isAckReceiptEnabled(), builder::isAckReceiptEnabled); + setConfig( + config.getTickDurationMillis(), + millis -> builder.ackTimeoutTickTime(millis, TimeUnit.MILLISECONDS)); + setConfig( + config.getNegativeAckRedeliveryDelayMicros(), + micros -> builder.negativeAckRedeliveryDelay(micros, TimeUnit.MICROSECONDS)); + setConfig(config.getSubscriptionType(), builder::subscriptionType); + setConfig(config.getSubscriptionMode(), builder::subscriptionMode); + setConfig(config.getMessageListener(), builder::messageListener); + setConfig(config.getConsumerEventListener(), builder::consumerEventListener); + setConfig(config.getCryptoKeyReader(), builder::cryptoKeyReader); + setConfig(config.getMessageCrypto(), builder::messageCrypto); + setConfig(config.getCryptoFailureAction(), builder::cryptoFailureAction); + setConfig(config.getReceiverQueueSize(), builder::receiverQueueSize); + setConfig( + config.getAcknowledgementsGroupTimeMicros(), + micros -> builder.acknowledgmentGroupTime(micros, TimeUnit.MICROSECONDS)); + setConfig(config.getConsumerName(), builder::consumerName); + setConfig(config.getPriorityLevel(), builder::priorityLevel); + setConfig(config.getMaxPendingChunkedMessage(), builder::maxPendingChunkedMessage); + setConfig( + config.isAutoAckOldestChunkedMessageOnQueueFull(), + builder::autoAckOldestChunkedMessageOnQueueFull); + setConfig(config.getProperties(), builder::properties); + setConfig( + config.getMaxTotalReceiverQueueSizeAcrossPartitions(), + builder::maxTotalReceiverQueueSizeAcrossPartitions); + setConfig(config.isReadCompacted(), builder::readCompacted); + setConfig(config.getPatternAutoDiscoveryPeriod(), 
builder::patternAutoDiscoveryPeriod); + setConfig(config.getPatternAutoDiscoveryPeriod(), builder::patternAutoDiscoveryPeriod); + setConfig(config.getSubscriptionInitialPosition(), builder::subscriptionInitialPosition); + setConfig(config.getRegexSubscriptionMode(), builder::subscriptionTopicsMode); + setConfig(config.isReplicateSubscriptionState(), builder::replicateSubscriptionState); + setConfig(config.getDeadLetterPolicy(), builder::deadLetterPolicy); + setConfig(config.isAutoUpdatePartitions(), builder::autoUpdatePartitions); + setConfig( + config.getAutoUpdatePartitionsIntervalSeconds(), + seconds -> + builder.autoUpdatePartitionsInterval( + Math.toIntExact(seconds), TimeUnit.SECONDS)); + if (config.isResetIncludeHead()) { + builder.startMessageIdInclusive(); } + setConfig(config.getBatchReceivePolicy(), builder::batchReceivePolicy); + setConfig(config.getKeySharedPolicy(), builder::keySharedPolicy); + setConfig(config.isRetryEnable(), builder::enableRetry); + setConfig(config.isBatchIndexAckEnabled(), builder::enableBatchIndexAcknowledgment); + setConfig( + config.getExpireTimeOfIncompleteChunkedMessageMillis(), + millis -> + builder.expireTimeOfIncompleteChunkedMessage( + millis, TimeUnit.MILLISECONDS)); + setConfig(config.isPoolMessages(), builder::poolMessages); - return consumerBuilder.subscribe(); + return builder.subscribe(); + } + + private static <T> void setConfig(T value, java.util.function.Consumer<T> consumer) { + if (value != null) { + if (value instanceof Collection) { + if (!((Collection<?>) value).isEmpty()) { + consumer.accept(value); + } + } else if (value instanceof Map) { + if (!((Map<?, ?>) value).isEmpty()) { + consumer.accept(value); + } + } else { + consumer.accept(value); + } Review comment: It seems we have to combine this mock server with the config refactor in one PR. The tests on the mock server need the removal of PulsarJsonUtils. I would divide them into two commits. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org