Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6387#discussion_r204342102 --- Diff: flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java --- @@ -58,14 +65,23 @@ public Kafka010JsonTableSink(String topic, Properties properties) { * @param topic topic in Kafka to which table is written * @param properties properties to connect to Kafka * @param partitioner Kafka partitioner + * @deprecated Use table descriptors instead of implementation-specific classes. */ + @Deprecated public Kafka010JsonTableSink(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner) { super(topic, properties, partitioner); } @Override protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, SerializationSchema<Row> serializationSchema, FlinkKafkaPartitioner<Row> partitioner) { - return new FlinkKafkaProducer010<>(topic, serializationSchema, properties, partitioner); + final FlinkKafkaProducerBase<Row> kafkaProducer = new FlinkKafkaProducer010<>( + topic, + serializationSchema, + properties, + partitioner); + // always enable flush on checkpoint to achieve at-least-once if query runs with checkpointing enabled. --- End diff -- 1. is this relevant to the pr/commit? 2. this value is set to true by default, thus please either drop this or add `checkState`.
---