chaiyongqiang created FLINK-14719:
-------------------------------------

             Summary: Make Semantic configurable in FlinkKafkaProducer to support exactly-once semantics in the Table API
                 Key: FLINK-14719
                 URL: https://issues.apache.org/jira/browse/FLINK-14719
             Project: Flink
          Issue Type: Improvement
          Components: Connectors / Kafka
    Affects Versions: 1.8.0
            Reporter: chaiyongqiang
Flink supports Kafka transactions through FlinkKafkaProducer and FlinkKafkaProducer011. With the DataStream API it is possible to achieve exactly-once semantics, but with the Table API things are different. The createKafkaProducer method in KafkaTableSink creates the FlinkKafkaProducer that sends messages to the Kafka brokers:

{code:java}
	protected SinkFunction<Row> createKafkaProducer(
			String topic,
			Properties properties,
			SerializationSchema<Row> serializationSchema,
			Optional<FlinkKafkaPartitioner<Row>> partitioner) {
		return new FlinkKafkaProducer<>(
			topic,
			new KeyedSerializationSchemaWrapper<>(serializationSchema),
			properties,
			partitioner);
	}
{code}

Looking at the FlinkKafkaProducer constructor that is called here, we can see that it always creates a producer with AT_LEAST_ONCE semantics:

{code:java}
	public FlinkKafkaProducer(
			String defaultTopicId,
			KeyedSerializationSchema<IN> serializationSchema,
			Properties producerConfig,
			Optional<FlinkKafkaPartitioner<IN>> customPartitioner) {
		this(
			defaultTopicId,
			serializationSchema,
			producerConfig,
			customPartitioner,
			FlinkKafkaProducer.Semantic.AT_LEAST_ONCE,
			DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
	}
{code}

As a result, users cannot achieve exactly-once semantics when using the Table API.
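A minimal sketch (not the actual Flink code) of what a configurable createKafkaProducer could look like: delegate to the existing constructor that takes an explicit Semantic (the one the hard-coded call above already delegates to). The {{semantic}} value below is a hypothetical field that KafkaTableSink would need to expose, e.g. populated from a connector property; DEFAULT_KAFKA_PRODUCERS_POOL_SIZE is assumed to be accessible on FlinkKafkaProducer as it is on FlinkKafkaProducer011.

{code:java}
	// Sketch only: pass the Semantic through instead of relying on the
	// constructor that hard-codes AT_LEAST_ONCE. "semantic" is a hypothetical
	// field/configuration option of KafkaTableSink.
	protected SinkFunction<Row> createKafkaProducer(
			String topic,
			Properties properties,
			SerializationSchema<Row> serializationSchema,
			Optional<FlinkKafkaPartitioner<Row>> partitioner) {
		return new FlinkKafkaProducer<>(
			topic,
			new KeyedSerializationSchemaWrapper<>(serializationSchema),
			properties,
			partitioner,
			semantic, // e.g. FlinkKafkaProducer.Semantic.EXACTLY_ONCE
			FlinkKafkaProducer.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
	}
{code}

With something along these lines, the Table API sink could be configured for EXACTLY_ONCE in the same way the DataStream API already allows.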