wuchong commented on a change in pull request #12805:
URL: https://github.com/apache/flink/pull/12805#discussion_r450703944



##########
File path: flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka010DynamicTableFactory.java
##########
@@ -65,18 +74,31 @@ protected KafkaDynamicSinkBase createKafkaTableSink(
                        String topic,
                        Properties properties,
                        Optional<FlinkKafkaPartitioner<RowData>> partitioner,
-                       EncodingFormat<SerializationSchema<RowData>> encodingFormat) {
+                       EncodingFormat<SerializationSchema<RowData>> encodingFormat,
+                       String semantic) {
 
                return new Kafka010DynamicSink(
                        consumedDataType,
                        topic,
                        properties,
                        partitioner,
-                       encodingFormat);
+                       encodingFormat,
+                       semantic);
        }
 
        @Override
        public String factoryIdentifier() {
                return IDENTIFIER;
        }
+
+       @Override
+       public Set<ConfigOption<?>> optionalOptions() {
+               final Set<ConfigOption<?>> options = new HashSet<>();
+               options.add(PROPS_GROUP_ID);
+               options.add(SCAN_STARTUP_MODE);
+               options.add(SCAN_STARTUP_SPECIFIC_OFFSETS);
+               options.add(SCAN_STARTUP_TIMESTAMP_MILLIS);
+               options.add(SINK_PARTITIONER);
+               return options;

Review comment:
       Please remove the sink semantic option here, since the kafka-0.10 connector does not support it.

##########
File path: flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/table/Kafka010DynamicTableFactoryTest.java
##########
@@ -82,12 +87,47 @@ protected KafkaDynamicSinkBase getExpectedSink(
                        String topic,
                        Properties properties,
                        Optional<FlinkKafkaPartitioner<RowData>> partitioner,
-                       EncodingFormat<SerializationSchema<RowData>> encodingFormat) {
+                       EncodingFormat<SerializationSchema<RowData>> encodingFormat,
+                       String semantic) {
+               // Only the "at-least-once" semantic is supported for the kafka-0.10 connector.
+               // If a user sets `sink.semantic` for the kafka-0.10 connector,
+               // a validation error about the unsupported option is thrown.
                return new Kafka010DynamicSink(
                                consumedDataType,
                                topic,
                                properties,
                                partitioner,
-                               encodingFormat);
+                               encodingFormat,
+                               "AT_LEAST_ONCE");
+       }
+
+       @Override
+       protected Map<String, String> getFullSinkOptions(){
+               Map<String, String> options = super.getFullSinkOptions();
+               options.remove("sink.semantic");
+               return options;
+       }
+
+       @Override
+       public void testInvalidSinkSemantic() {
+               ObjectIdentifier objectIdentifier = ObjectIdentifier.of(
+                       "default",
+                       "default",
+                       "sinkTable");
+
+               final Map<String, String> modifiedOptions = getModifiedOptions(
+                       getFullSinkOptions(),
+                       options -> {
+                               options.put("sink.semantic", "exactly-once");
+                       });
+               final CatalogTable sinkTable = createKafkaSinkCatalogTable(modifiedOptions);
+
+               thrown.expect(ValidationException.class);

Review comment:
       Please check the exception message too. If the expected message is nested in the cause chain, you can use the `org.apache.flink.core.testutils.FlinkMatchers#containsCause` util.
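
       To illustrate the suggestion, here is a standalone sketch of what "the expected message is nested in the cause chain" means and how a matcher-style check can find it. This is not the Flink `FlinkMatchers` utility itself, just an illustration of the same idea; the exception messages below are made up for the example.

```java
// Sketch: verifying that an expected message appears somewhere in a
// throwable's cause chain, similar in spirit to
// org.apache.flink.core.testutils.FlinkMatchers#containsCause.
// Standalone illustration only; not Flink code, and the messages are invented.
public class NestedCauseCheck {

    // Walks the cause chain and returns true if any throwable in it
    // carries a message containing the expected snippet.
    static boolean causeChainContainsMessage(Throwable t, String expected) {
        for (Throwable cur = t; cur != null; cur = cur.getCause()) {
            if (cur.getMessage() != null && cur.getMessage().contains(expected)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Simulate a validation error that the factory layer wraps in a
        // higher-level exception, so the interesting message is nested.
        Throwable nested = new IllegalArgumentException(
                "Unsupported options:\n\nsink.semantic");
        Throwable top = new RuntimeException(
                "Unable to create a sink for writing the table", nested);

        // The snippet is only on the nested cause, not the top-level message.
        System.out.println(causeChainContainsMessage(top, "sink.semantic"));
        System.out.println(causeChainContainsMessage(top, "scan.startup.mode"));
    }
}
```

       With a plain `thrown.expectMessage(...)` only the top-level message is matched, which is why a cause-chain matcher is needed when the factory wraps the `ValidationException`.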



