wuchong commented on a change in pull request #12908:
URL: https://github.com/apache/flink/pull/12908#discussion_r464809980



##########
File path: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryBase.java
##########
@@ -72,26 +80,34 @@ public DynamicTableSource createDynamicTableSource(Context 
context) {
                FactoryUtil.TableFactoryHelper helper = 
FactoryUtil.createTableFactoryHelper(this, context);
 
                ReadableConfig tableOptions = helper.getOptions();
-
-               String topic = tableOptions.get(TOPIC);
                DecodingFormat<DeserializationSchema<RowData>> decodingFormat = 
helper.discoverDecodingFormat(
                                DeserializationFormatFactory.class,
                                FactoryUtil.FORMAT);
                // Validate the option data type.
                helper.validateExcept(PROPERTIES_PREFIX);
                // Validate the option values.
-               validateTableOptions(tableOptions);
+               validateTableSourceOptions(tableOptions);
 
                DataType producedDataType = 
context.getCatalogTable().getSchema().toPhysicalRowDataType();
-               final StartupOptions startupOptions = 
getStartupOptions(tableOptions, topic);
+
+               final StartupOptions startupOptions = 
getStartupOptions(tableOptions);
+               final Properties properties = 
getKafkaProperties(context.getCatalogTable().getOptions());
+               // add topic-partition discovery
+               
properties.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS,
+                       String.valueOf(tableOptions
+                               .getOptional(SCAN_TOPIC_PARTITION_DISCOVERY)
+                               .map(val -> val.toMillis())

Review comment:
       ```suggestion
                                .map(Duration::toMillis)
   ```

##########
File path: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaOptions.java
##########
@@ -160,12 +178,45 @@ private KafkaOptions() {}
        // Validation
        // 
--------------------------------------------------------------------------------------------
 
-       public static void validateTableOptions(ReadableConfig tableOptions) {
+       public static void validateTableSourceOptions(ReadableConfig 
tableOptions) {
+               validateSourceTopic(tableOptions);
                validateScanStartupMode(tableOptions);
+       }
+
+       public static void validateTableSinkOptions(ReadableConfig 
tableOptions) {
+               validateSinkTopic(tableOptions);
                validateSinkPartitioner(tableOptions);
                validateSinkSemantic(tableOptions);
        }
 
+       public static void validateSourceTopic(ReadableConfig tableOptions) {
+               Optional<String> topic = tableOptions.getOptional(TOPIC);
+               Optional<String> pattern = 
tableOptions.getOptional(TOPIC_PATTERN);
+
+               if (topic.isPresent() && pattern.isPresent()) {
+                       throw new ValidationException("Option 'topic' and 
'topic-pattern' shouldn't be set together.");
+               }
+
+               if (!topic.isPresent() && !pattern.isPresent()) {
+                       throw new ValidationException("Either 'topic' or 
'topic-pattern' must be set.");
+               }
+       }
+
+       public static void validateSinkTopic(ReadableConfig tableOptions) {
+               String errorMessageTemp = "Flink Kafka sink currently only 
supports single topic, but got %s: %s.";
+               if (!isSingleTopic(tableOptions)) {
+                       if 
(tableOptions.getOptional(TOPIC_PATTERN).isPresent()) {
+                               throw new ValidationException(String.format(
+                                       errorMessageTemp, "'topic-pattern'", 
tableOptions.get(TOPIC_PATTERN)
+                               ));
+                       } else {
+                               throw new ValidationException(String.format(
+                                       errorMessageTemp, "topic-list", 
tableOptions.get(TOPIC)

Review comment:
       "topic-list" -> "topic"? We don't have "topic-list" option.

##########
File path: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryBase.java
##########
@@ -72,26 +80,34 @@ public DynamicTableSource createDynamicTableSource(Context 
context) {
                FactoryUtil.TableFactoryHelper helper = 
FactoryUtil.createTableFactoryHelper(this, context);
 
                ReadableConfig tableOptions = helper.getOptions();
-
-               String topic = tableOptions.get(TOPIC);
                DecodingFormat<DeserializationSchema<RowData>> decodingFormat = 
helper.discoverDecodingFormat(
                                DeserializationFormatFactory.class,
                                FactoryUtil.FORMAT);
                // Validate the option data type.
                helper.validateExcept(PROPERTIES_PREFIX);
                // Validate the option values.
-               validateTableOptions(tableOptions);
+               validateTableSourceOptions(tableOptions);
 
                DataType producedDataType = 
context.getCatalogTable().getSchema().toPhysicalRowDataType();
-               final StartupOptions startupOptions = 
getStartupOptions(tableOptions, topic);
+
+               final StartupOptions startupOptions = 
getStartupOptions(tableOptions);
+               final Properties properties = 
getKafkaProperties(context.getCatalogTable().getOptions());
+               // add topic-partition discovery
+               
properties.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS,

Review comment:
       ???

##########
File path: 
flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTestBase.java
##########
@@ -139,10 +152,12 @@ public void testTableSource() {
                                Thread.currentThread().getContextClassLoader());
 
                // Test scan source equals
+               
KAFKA_SOURCE_PROPERTIES.setProperty("flink.partition-discovery.interval-millis",
 "1000");

Review comment:
       Is this still needed? It is already set in the static block.

##########
File path: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryBase.java
##########
@@ -168,10 +185,14 @@ protected abstract KafkaDynamicSinkBase 
createKafkaTableSink(
        @Override
        public Set<ConfigOption<?>> optionalOptions() {
                final Set<ConfigOption<?>> options = new HashSet<>();
+               options.add(TOPIC);
+               options.add(TOPIC_PATTERN);
                options.add(PROPS_GROUP_ID);
                options.add(SCAN_STARTUP_MODE);
                options.add(SCAN_STARTUP_SPECIFIC_OFFSETS);
+               options.add(SCAN_TOPIC_PARTITION_DISCOVERY);
                options.add(SCAN_STARTUP_TIMESTAMP_MILLIS);
+               options.add(SCAN_TOPIC_PARTITION_DISCOVERY);

Review comment:
       Duplicate: `SCAN_TOPIC_PARTITION_DISCOVERY` is already added two lines above.

##########
File path: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaOptions.java
##########
@@ -239,9 +293,25 @@ public static KafkaSinkSemantic 
getSinkSemantic(ReadableConfig tableOptions){
                }
        }
 
-       public static StartupOptions getStartupOptions(
-                       ReadableConfig tableOptions,
-                       String topic) {
+       public static List<String> getSourceTopics(ReadableConfig tableOptions) 
{
+               return tableOptions.getOptional(TOPIC).map(value ->
+                       Arrays
+                               .stream(value.split(","))
+                               .map(String::trim)
+                               .collect(Collectors.toList()))
+                       .orElse(null);
+       }
+
+       public static Pattern getSourceTopicPattern(ReadableConfig 
tableOptions) {
+               return tableOptions.getOptional(TOPIC_PATTERN).map(value -> 
Pattern.compile(value)).orElse(null);
+       }
+
+       private static boolean isSingleTopic(ReadableConfig tableOptions) {
+               // Option 'topic-pattern' is regarded as multi-topics.
+               return tableOptions.getOptional(TOPIC).isPresent() && 
tableOptions.get(TOPIC).split(",").length == 1;

Review comment:
       The community recommends using a List ConfigOption for list values; the 
framework will handle the parsing. This will also change the separator to `;`, 
but that is more aligned with other list options. You can declare a 
List ConfigOption like this:
   
   ```java
        public static final ConfigOption<List<String>> TOPIC = ConfigOptions
                        .key("topic")
                        .stringType()
                        .asList()
                        .noDefaultValue()
                        .withDescription("...");
   ```
   
   Then you can call `return tableOptions.getOptional(TOPIC).map(t -> t.size() 
== 1).orElse(false);` here. 
   
   
   Sorry for the late reminder.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to