wuchong commented on a change in pull request #12805:
URL: https://github.com/apache/flink/pull/12805#discussion_r451951070



##########
File path: docs/dev/table/connectors/kafka.md
##########
@@ -207,6 +214,19 @@ However, it will cause a lot of network connections between all the Flink instan
 
 By default, a Kafka sink ingests data with at-least-once guarantees into a Kafka topic if the query is executed with [checkpointing enabled]({% link dev/stream/state/checkpointing.md %}#enabling-and-configuring-checkpointing).
 
+With Flink's checkpointing enabled, the `kafka` and `kafka-0.11` connectors can provide exactly-once delivery guarantees.
+
+Besides enabling Flink's checkpointing, you can also choose three different modes of operating chosen by passing appropriate `sink.semantic` option:
+
+ * `NONE`: Flink will not guarantee anything. Produced records can be lost or they can be duplicated.
+ * `AT_LEAST_ONCE` (default setting): This guarantees that no records will be lost (although they can be duplicated).

Review comment:
       ```suggestion
    * `at-least-once` (default setting): This guarantees that no records will be lost (although they can be duplicated).
   ```

##########
File path: docs/dev/table/connectors/kafka.md
##########
@@ -207,6 +214,19 @@ However, it will cause a lot of network connections between all the Flink instan
 
 By default, a Kafka sink ingests data with at-least-once guarantees into a Kafka topic if the query is executed with [checkpointing enabled]({% link dev/stream/state/checkpointing.md %}#enabling-and-configuring-checkpointing).
 
+With Flink's checkpointing enabled, the `kafka` and `kafka-0.11` connectors can provide exactly-once delivery guarantees.
+
+Besides enabling Flink's checkpointing, you can also choose three different modes of operating chosen by passing appropriate `sink.semantic` option:
+
+ * `NONE`: Flink will not guarantee anything. Produced records can be lost or they can be duplicated.
+ * `AT_LEAST_ONCE` (default setting): This guarantees that no records will be lost (although they can be duplicated).
+ * `EXACTLY_ONCE`: Kafka transactions will be used to provide exactly-once semantic. Whenever you write

Review comment:
       ```suggestion
    * `exactly-once`: Kafka transactions will be used to provide exactly-once semantic. Whenever you write
   ```

##########
File path: docs/dev/table/connectors/kafka.md
##########
@@ -207,6 +214,19 @@ However, it will cause a lot of network connections between all the Flink instan
 
 By default, a Kafka sink ingests data with at-least-once guarantees into a Kafka topic if the query is executed with [checkpointing enabled]({% link dev/stream/state/checkpointing.md %}#enabling-and-configuring-checkpointing).
 
+With Flink's checkpointing enabled, the `kafka` and `kafka-0.11` connectors can provide exactly-once delivery guarantees.
+
+Besides enabling Flink's checkpointing, you can also choose three different modes of operating chosen by passing appropriate `sink.semantic` option:
+
+ * `NONE`: Flink will not guarantee anything. Produced records can be lost or they can be duplicated.

Review comment:
       ```suggestion
    * `none`: Flink will not guarantee anything. Produced records can be lost or they can be duplicated.
   ```

##########
File path: flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka010DynamicTableFactory.java
##########
@@ -65,18 +69,27 @@ protected KafkaDynamicSinkBase createKafkaTableSink(
                        String topic,
                        Properties properties,
                        Optional<FlinkKafkaPartitioner<RowData>> partitioner,
-                       EncodingFormat<SerializationSchema<RowData>> encodingFormat) {
+                       EncodingFormat<SerializationSchema<RowData>> encodingFormat,
+                       KafkaSemantic semantic) {
 
                return new Kafka010DynamicSink(
                        consumedDataType,
                        topic,
                        properties,
                        partitioner,
-                       encodingFormat);
+                       encodingFormat,
+                       semantic);
        }
 
        @Override
        public String factoryIdentifier() {
                return IDENTIFIER;
        }
+
+       @Override
+       public Set<ConfigOption<?>> optionalOptions() {
+               final Set<ConfigOption<?>> options = super.optionalOptions();
+               options.remove(SINK_SEMANTIC);

Review comment:
       Please add a code comment here explaining why `SINK_SEMANTIC` is removed from the optional options for the 0.10 connector.
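
       A rough sketch of what such a comment could look like. The rationale in the comment is an assumption (Kafka transactions only arrived with Kafka 0.11, so the 0.10 producer cannot honor an exactly-once setting); please confirm it against the connector before using it. The classes below are hypothetical stand-ins that use plain strings instead of `ConfigOption`, so the snippet compiles on its own; they are not the real Flink factory classes.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for the shared Kafka table factory base class.
class BaseKafkaFactorySketch {
    static final String SINK_SEMANTIC = "sink.semantic";

    public Set<String> optionalOptions() {
        // Illustrative option keys only; the real factory returns ConfigOption objects.
        Set<String> options = new HashSet<>();
        options.add("sink.partitioner");
        options.add(SINK_SEMANTIC);
        return options;
    }
}

// Hypothetical stand-in for Kafka010DynamicTableFactory.
class Kafka010FactorySketch extends BaseKafkaFactorySketch {
    @Override
    public Set<String> optionalOptions() {
        final Set<String> options = super.optionalOptions();
        // Assumed rationale: the Kafka 0.10 producer has no transaction support,
        // so 'sink.semantic' is not exposed as an option for this connector.
        options.remove(SINK_SEMANTIC);
        return options;
    }
}
```

       With a comment along these lines, a reader can see that dropping the option is intentional rather than an oversight.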




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

