wuchong commented on a change in pull request #12805:
URL: https://github.com/apache/flink/pull/12805#discussion_r449967255
##########
File path: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTestBase.java
##########

@@ -386,6 +411,7 @@ private CatalogTable createKafkaSinkCatalogTable(Map<String, String> options) {
 		tableOptions.put("properties.group.id", "dummy");
 		tableOptions.put("properties.bootstrap.servers", "dummy");
 		tableOptions.put("sink.partitioner", KafkaOptions.SINK_PARTITIONER_VALUE_FIXED);
+		tableOptions.put("sink.semantic", KafkaOptions.SINK_SEMANTIC_VALUE_AT_LEAST_ONCE);

Review comment:
   Use exactly-once here to verify that the configuration takes effect, because at-least-once is already the default value.

##########
File path: docs/dev/table/connectors/kafka.md
##########

@@ -165,6 +165,14 @@ Connector Options
       </ul>
       </td>
     </tr>
+    <tr>
+      <td><h5>sink.semantic</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">at-least-once</td>
+      <td>String</td>
+      <td>Optional semantic when commit. Valid enumerationns are ['at-lease-once', 'exactly-once', 'none'].
+      Only Kafka whose version greater than 1.0.0 support 'exactly-once' with checkpointing enabled.</td>

Review comment:
```suggestion
      <td>Defines the delivery semantic for the Kafka sink. Valid enumerations are <code>'at-least-once'</code>, <code>'exactly-once'</code> and <code>'none'</code>. <code>'kafka-0.10'</code> doesn't support this option. See <a href='#consistency-guarantees'>Consistency guarantees</a> for more details.</td>
```

##########
File path: docs/dev/table/connectors/kafka.md
##########

@@ -206,6 +214,8 @@ However, it will cause a lot of network connections between all the Flink instan
 ### Consistency guarantees
 By default, a Kafka sink ingests data with at-least-once guarantees into a Kafka topic if the query is executed with [checkpointing enabled]({% link dev/stream/state/checkpointing.md %}#enabling-and-configuring-checkpointing).
+With Kafka whose version is greater than 1.0.0, `sink.semantic` can provide exactly-once delivery guarantee. Whenever you write to Kafka using transactions, do not forget about setting the desired `isolation.level`
+(`read_committed` or `read_uncommitted` - latter one is the default value) for any application consuming records from Kafka.

Review comment:
```suggestion
With Flink's checkpointing enabled, the `kafka` and `kafka-0.11` connectors can provide exactly-once delivery guarantees. Besides enabling Flink's checkpointing, you can also choose among three different modes of operation by passing the appropriate `sink.semantic` option:

 * `NONE`: Flink will not guarantee anything. Produced records can be lost or they can be duplicated.
 * `AT_LEAST_ONCE` (default setting): This guarantees that no records will be lost (although they can be duplicated).
 * `EXACTLY_ONCE`: Kafka transactions will be used to provide exactly-once semantics. Whenever you write to Kafka using transactions, do not forget about setting the desired `isolation.level` (`read_committed` or `read_uncommitted` - the latter one is the default value) for any application consuming records from Kafka.

Please refer to the [Kafka documentation]({% link dev/connectors/kafka.md %}#kafka-producers-and-fault-tolerance) for more caveats about delivery guarantees.
```

##########
File path: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka011DynamicSink.java
##########

@@ -42,26 +44,31 @@ public Kafka011DynamicSink(
 			String topic,
 			Properties properties,
 			Optional<FlinkKafkaPartitioner<RowData>> partitioner,
-			EncodingFormat<SerializationSchema<RowData>> encodingFormat) {
+			EncodingFormat<SerializationSchema<RowData>> encodingFormat,
+			String semantic) {
 		super(
 			consumedDataType,
 			topic,
 			properties,
 			partitioner,
-			encodingFormat);
+			encodingFormat,
+			semantic);
 	}

 	@Override
 	protected SinkFunction<RowData> createKafkaProducer(
 			String topic,
 			Properties properties,
 			SerializationSchema<RowData> serializationSchema,
-			Optional<FlinkKafkaPartitioner<RowData>> partitioner) {
+			Optional<FlinkKafkaPartitioner<RowData>> partitioner,
+			String semantic) {
 		return new FlinkKafkaProducer011<>(
 			topic,
-			serializationSchema,
+			new KeyedSerializationSchemaWrapper<>(serializationSchema),
 			properties,
-			partitioner);
+			partitioner,
+			getSemantic(semantic),

Review comment:
   I suggest moving this logic into `KafkaOptions` to avoid duplicated code. You can have a table-level enum class `SinkSemantic`, and simply call `FlinkKafkaProducer.Semantic.valueOf(semantic.name())` here.
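   For example, the table-level enum could look roughly like this (a sketch only - `SinkSemantic` and `fromOption` are illustrative names, not code from this PR; the constants deliberately mirror the producer's `Semantic` enum so that `valueOf(...)` works):

```java
// Sketch of a table-level sink semantic enum for KafkaOptions (illustrative, not from this PR).
// Constant names mirror FlinkKafkaProducer.Semantic, so connector-specific sinks can
// translate with FlinkKafkaProducer.Semantic.valueOf(sinkSemantic.name()).
public enum SinkSemantic {
	EXACTLY_ONCE,
	AT_LEAST_ONCE,
	NONE;

	// Maps the validated option value (e.g. "at-least-once") to the enum constant.
	public static SinkSemantic fromOption(String option) {
		switch (option) {
			case "exactly-once":
				return EXACTLY_ONCE;
			case "at-least-once":
				return AT_LEAST_ONCE;
			case "none":
				return NONE;
			default:
				throw new IllegalArgumentException("Unsupported sink semantic: " + option);
		}
	}
}
```

   Keeping the string-to-enum mapping in one place means each connector only does the one-line `valueOf` translation instead of re-implementing the parsing.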
##########
File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaOptions.java
##########

@@ -191,6 +208,21 @@ private static void validateSinkPartitioner(ReadableConfig tableOptions) {
 		});
 	}

+	private static void validateSinkSemantic(ReadableConfig tableOptions, String kafkaVersion) {
+		tableOptions.getOptional(SINK_SEMANTIC).ifPresent(semantic -> {
+			if (!SINK_SEMANTIC_ENUMS.contains(semantic)){
+				throw new ValidationException(
+						String.format("Unsupported value '%s' for '%s'. Supported values are ['at-least-once', 'exactly-once', 'none'].",
+								semantic, SINK_SEMANTIC.key()));
+			}
+
+			if (kafkaVersion.equals("kafka-0.10") && (!SINK_SEMANTIC_VALUE_AT_LEAST_ONCE.equals(semantic))){

Review comment:
   This is indeed what we should avoid: a base class shouldn't depend on a specific connector implementation. We should move this special logic into `Kafka010DynamicTableFactory`. A simple way to disallow this option in kafka-0.10 is to override `optionalOptions()` and remove `SINK_SEMANTIC` from it.
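   A rough sketch of that override (only the relevant method is shown; `SINK_SEMANTIC` is assumed to be the `ConfigOption` constant defined in `KafkaOptions`):

```java
// Sketch for Kafka010DynamicTableFactory (other members omitted; illustrative, not from this PR).
// Requires java.util.HashSet, java.util.Set and org.apache.flink.configuration.ConfigOption.
@Override
public Set<ConfigOption<?>> optionalOptions() {
	// Start from the base factory's options, copying defensively in case the
	// returned set is unmodifiable.
	Set<ConfigOption<?>> options = new HashSet<>(super.optionalOptions());
	// Without SINK_SEMANTIC in the supported options, specifying 'sink.semantic'
	// for kafka-0.10 is rejected by the factory's regular option validation.
	options.remove(KafkaOptions.SINK_SEMANTIC);
	return options;
}
```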