AHeise commented on a change in pull request #16900:
URL: https://github.com/apache/flink/pull/16900#discussion_r696344008



##########
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java
##########
@@ -429,21 +513,28 @@ private PulsarSourceOptions() {
     public static final ConfigOption<Integer> PULSAR_MAX_REDELIVER_COUNT =
             ConfigOptions.key(CONSUMER_CONFIG_PREFIX + 
"deadLetterPolicy.maxRedeliverCount")
                     .intType()
-                    .defaultValue(0);
+                    .defaultValue(0)
+                    .withDescription(
+                            "Maximum number of times that a message will be 
redelivered before being sent to the dead letter queue.");
 
     public static final ConfigOption<String> PULSAR_RETRY_LETTER_TOPIC =
             ConfigOptions.key(CONSUMER_CONFIG_PREFIX + 
"deadLetterPolicy.retryLetterTopic")
                     .stringType()
-                    .noDefaultValue();
+                    .noDefaultValue()
+                    .withDescription(
+                            "Name of the retry topic where the failing 
messages will be sent.");
     public static final ConfigOption<String> PULSAR_DEAD_LETTER_TOPIC =
             ConfigOptions.key(CONSUMER_CONFIG_PREFIX + 
"deadLetterPolicy.deadLetterTopic")
                     .stringType()
-                    .noDefaultValue();
+                    .noDefaultValue()
+                    .withDescription(
+                            "Name of the dead topic where the failing messages 
will be sent.");
 
     public static final ConfigOption<Boolean> PULSAR_RETRY_ENABLE =
             ConfigOptions.key(CONSUMER_CONFIG_PREFIX + "retryEnable")
                     .booleanType()
-                    .defaultValue(false);
+                    .defaultValue(false)
+                    .withDescription("If enabled, the consumer will auto retry 
message.");

Review comment:
       ```suggestion
                       .withDescription("If enabled, the consumer will auto 
retry messages.");
   ```

##########
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java
##########
@@ -360,64 +359,52 @@
 
     /**
      * Set an arbitrary property for the PulsarSource and PulsarConsumer. The 
valid keys can be
-     * found in {@link PulsarSourceOptions}.
+     * found in {@link PulsarSourceOptions} and {@link PulsarOptions}.
      *
-     * @param key the key of the property.
-     * @param value the value of the property.
-     * @return this PulsarSourceBuilder.
-     */
-    public PulsarSourceBuilder<OUT> setProperty(String key, String value) {
-        checkNotNull(key);
-        checkNotNull(value);
-        if (configuration.containsKey(key)) {
-            ConfigOption<String> rawOption = 
ConfigOptions.key(key).stringType().noDefaultValue();
-            LOG.warn(
-                    "Config option {} already has a value {}, override to new 
value {}",
-                    key,
-                    configuration.getString(rawOption),
-                    value);
-        }
-        configuration.setString(key, value);
-        return this;
-    }
-
-    /**
-     * Set an arbitrary property for the PulsarSource and PulsarConsumer. The 
valid keys can be
-     * found in {@link PulsarSourceOptions}.
+     * <p>Make sure the option could be set only once or with same value.
      *
      * @param key the key of the property.
      * @param value the value of the property.
      * @return this PulsarSourceBuilder.
      */
-    public <T> PulsarSourceBuilder<OUT> setProperty(ConfigOption<T> key, T 
value) {
+    public <T> PulsarSourceBuilder<OUT> setConfig(ConfigOption<T> key, T 
value) {
         checkNotNull(key);
         checkNotNull(value);
         if (configuration.contains(key)) {
             T oldValue = configuration.get(key);
-            LOG.warn(
-                    "Config option {} already has a value {}, override to new 
value {}",
-                    key,
-                    oldValue,
-                    value);
+            checkArgument(
+                    Objects.equals(oldValue, value),
+                    "This option %s has been set to value %s.",

Review comment:
       ```suggestion
   "This option %s has been already set to value %s.",
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to