wuchong commented on a change in pull request #12805:
URL: https://github.com/apache/flink/pull/12805#discussion_r449967255



##########
File path: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTestBase.java
##########
@@ -386,6 +411,7 @@ private CatalogTable createKafkaSinkCatalogTable(Map<String, String> options) {
                tableOptions.put("properties.group.id", "dummy");
                tableOptions.put("properties.bootstrap.servers", "dummy");
                tableOptions.put("sink.partitioner", KafkaOptions.SINK_PARTITIONER_VALUE_FIXED);
+               tableOptions.put("sink.semantic", KafkaOptions.SINK_SEMANTIC_VALUE_AT_LEAST_ONCE);

Review comment:
       Use exactly-once to verify the configuration works, because the default value is at-least-once.
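
   A minimal sketch of that test change, assuming `KafkaOptions` also defines an exactly-once constant mirroring the `SINK_SEMANTIC_VALUE_AT_LEAST_ONCE` used in the diff (the constant name here is an assumption, not taken from the PR):

   ```java
   // Exercise the non-default value so the test proves the option is forwarded,
   // rather than passing only because at-least-once is the default anyway.
   // SINK_SEMANTIC_VALUE_EXACTLY_ONCE is an assumed constant name.
   tableOptions.put("sink.semantic", KafkaOptions.SINK_SEMANTIC_VALUE_EXACTLY_ONCE);
   ```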

##########
File path: docs/dev/table/connectors/kafka.md
##########
@@ -165,6 +165,14 @@ Connector Options
       </ul>
       </td>
     </tr>
+    <tr>
+      <td><h5>sink.semantic</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">at-least-once</td>
+      <td>String</td>
+      <td>Optional semantic when commit. Valid enumerationns are ['at-lease-once', 'exactly-once', 'none'].
+          Only Kafka whose version greater than 1.0.0 support 'exactly-once' with checkpointing enabled.</td>

Review comment:
       ```suggestion
         <td>Defines the delivery semantic for the Kafka sink. Valid enumerations are <code>'at-least-once'</code>, <code>'exactly-once'</code> and <code>'none'</code>. <code>'kafka-0.10'</code> doesn't support this option. See <a href='#consistency-guarantees'>Consistency guarantees</a> for more details.</td>
   ```
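
   For doc readers, a minimal DDL sketch showing where this option would be set (table name, schema, topic, and broker address are illustrative, not from the PR):

   ```java
   import org.apache.flink.table.api.EnvironmentSettings;
   import org.apache.flink.table.api.TableEnvironment;

   public class SinkSemanticDdlExample {
       public static void main(String[] args) {
           TableEnvironment tEnv = TableEnvironment.create(
                   EnvironmentSettings.newInstance().inStreamingMode().build());

           // 'exactly-once' additionally requires Flink checkpointing to be
           // enabled and a transaction-capable Kafka (not the kafka-0.10 connector).
           tEnv.executeSql(
                   "CREATE TABLE kafka_sink (" +
                   "  user_id BIGINT," +
                   "  message STRING" +
                   ") WITH (" +
                   "  'connector' = 'kafka'," +
                   "  'topic' = 'output-topic'," +
                   "  'properties.bootstrap.servers' = 'localhost:9092'," +
                   "  'format' = 'json'," +
                   "  'sink.semantic' = 'exactly-once'" +
                   ")");
       }
   }
   ```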

##########
File path: docs/dev/table/connectors/kafka.md
##########
@@ -206,6 +214,8 @@ However, it will cause a lot of network connections between all the Flink instan
 ### Consistency guarantees
 
 By default, a Kafka sink ingests data with at-least-once guarantees into a Kafka topic if the query is executed with [checkpointing enabled]({% link dev/stream/state/checkpointing.md %}#enabling-and-configuring-checkpointing).
+With Kafka whose version is greater than 1.0.0, `sink.semantic` can provide exactly-once delivery guarantee. Whenever you write to Kafka using transactions, do not forget about setting the desired `isolation.level`
+(`read_committed` or `read_uncommitted` - latter one is the default value) for any application consuming records from Kafka.

Review comment:
       ```suggestion
   
   With Flink's checkpointing enabled, the `kafka` and `kafka-0.11` connectors can provide exactly-once delivery guarantees.
   
   Besides enabling Flink's checkpointing, you can also choose among three different modes of operation by passing the appropriate `sink.semantic` option:
   
    * `NONE`: Flink will not guarantee anything. Produced records can be lost or they can be duplicated.
    * `AT_LEAST_ONCE` (default setting): This guarantees that no records will be lost (although they can be duplicated).
    * `EXACTLY_ONCE`: Kafka transactions will be used to provide exactly-once semantics. Whenever you write to Kafka using transactions, do not forget about setting the desired `isolation.level` (`read_committed` or `read_uncommitted` - the latter one is the default value) for any application consuming records from Kafka.
   
   Please refer to [Kafka documentation]({% link dev/connectors/kafka.md %}#kafka-producers-and-fault-tolerance) for more caveats about delivery guarantees.
   ```
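
   To ground the `isolation.level` caveat in the suggested text, a minimal sketch of a downstream consumer that only reads committed records (topic, group id, and broker address are illustrative):

   ```java
   import java.time.Duration;
   import java.util.Collections;
   import java.util.Properties;

   import org.apache.kafka.clients.consumer.ConsumerRecords;
   import org.apache.kafka.clients.consumer.KafkaConsumer;

   public class ReadCommittedConsumer {
       public static void main(String[] args) {
           Properties props = new Properties();
           props.put("bootstrap.servers", "localhost:9092");
           props.put("group.id", "downstream-app");
           props.put("key.deserializer",
                   "org.apache.kafka.common.serialization.StringDeserializer");
           props.put("value.deserializer",
                   "org.apache.kafka.common.serialization.StringDeserializer");
           // With the default 'read_uncommitted', records from aborted
           // Flink transactions would also be visible to this application.
           props.put("isolation.level", "read_committed");

           try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
               consumer.subscribe(Collections.singletonList("output-topic"));
               ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
               records.forEach(r -> System.out.println(r.value()));
           }
       }
   }
   ```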

##########
File path: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka011DynamicSink.java
##########
@@ -42,26 +44,31 @@ public Kafka011DynamicSink(
                        String topic,
                        Properties properties,
                        Optional<FlinkKafkaPartitioner<RowData>> partitioner,
-                       EncodingFormat<SerializationSchema<RowData>> encodingFormat) {
+                       EncodingFormat<SerializationSchema<RowData>> encodingFormat,
+                       String semantic) {
                super(
                                consumedDataType,
                                topic,
                                properties,
                                partitioner,
-                               encodingFormat);
+                               encodingFormat,
+                               semantic);
        }
 
        @Override
        protected SinkFunction<RowData> createKafkaProducer(
                        String topic,
                        Properties properties,
                        SerializationSchema<RowData> serializationSchema,
-                       Optional<FlinkKafkaPartitioner<RowData>> partitioner) {
+                       Optional<FlinkKafkaPartitioner<RowData>> partitioner,
+                       String semantic) {
                return new FlinkKafkaProducer011<>(
                                topic,
-                               serializationSchema,
+                               new KeyedSerializationSchemaWrapper<>(serializationSchema),
                                properties,
-                               partitioner);
+                               partitioner,
+                               getSemantic(semantic),

Review comment:
       I suggest moving this logic into `KafkaOptions` to avoid duplicated code. You can have a table-level enum class `SinkSemantic` and simply call `FlinkKafkaProducer.Semantic.valueOf(semantic.name())` here.
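
   A minimal sketch of that refactoring for the 0.11 connector (class and method names other than `FlinkKafkaProducer011.Semantic` are illustrative, not from the PR):

   ```java
   import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;

   public class SinkSemanticSketch {

       /** Table-level enum that would live in KafkaOptions. */
       public enum SinkSemantic {
           AT_LEAST_ONCE,
           EXACTLY_ONCE,
           NONE
       }

       /** The per-connector mapping then collapses to a single valueOf call. */
       static FlinkKafkaProducer011.Semantic toProducerSemantic(SinkSemantic semantic) {
           // Works because the constant names match on both enums.
           return FlinkKafkaProducer011.Semantic.valueOf(semantic.name());
       }
   }
   ```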

##########
File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaOptions.java
##########
@@ -191,6 +208,21 @@ private static void validateSinkPartitioner(ReadableConfig tableOptions) {
                                });
        }
 
+       private static void validateSinkSemantic(ReadableConfig tableOptions, String kafkaVersion) {
+               tableOptions.getOptional(SINK_SEMANTIC).ifPresent(semantic -> {
+                       if (!SINK_SEMANTIC_ENUMS.contains(semantic)){
+                               throw new ValidationException(
+                                       String.format("Unsupported value '%s' for '%s'. Supported values are ['at-least-once', 'exactly-once', 'none'].",
+                                               semantic, SINK_SEMANTIC.key()));
+                       }
+
+                       if (kafkaVersion.equals("kafka-0.10") && (!SINK_SEMANTIC_VALUE_AT_LEAST_ONCE.equals(semantic))){

Review comment:
       This is indeed what we should avoid. A base class shouldn't depend on a specific connector implementation. We should move this special logic into `Kafka010DynamicTableFactory`.
   
   A simple way to disallow this option in kafka-0.10 is to override `optionalOptions()` and remove `SINK_SEMANTIC` from it.
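
   A minimal sketch of that override in `Kafka010DynamicTableFactory` (the defensive copy into a fresh set is an assumption about the mutability of the base factory's set, not verified against the PR):

   ```java
   import java.util.HashSet;
   import java.util.Set;

   import org.apache.flink.configuration.ConfigOption;

   // Inside Kafka010DynamicTableFactory:
   @Override
   public Set<ConfigOption<?>> optionalOptions() {
       // Drop 'sink.semantic' from the advertised options so the generic
       // option validation rejects it for kafka-0.10, instead of a
       // version check living in the shared base-class code.
       Set<ConfigOption<?>> options = new HashSet<>(super.optionalOptions());
       options.remove(KafkaOptions.SINK_SEMANTIC);
       return options;
   }
   ```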



