[ https://issues.apache.org/jira/browse/FLINK-14719?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16972191#comment-16972191 ]
chaiyongqiang commented on FLINK-14719:
---------------------------------------

When I checked the code in the Flink 1.9 branch and on master, I saw that the FlinkKafkaProducer constructor used in 1.8 is now marked @deprecated. In newer versions of Flink, we could modify the createKafkaProducer method in KafkaTableSinkBase, and in all the classes that extend KafkaTableSinkBase, to support exactly-once semantics in the Table API. +I could open a new issue to track this.+

For the Flink 1.8 branch, however, a lightweight approach would help. We could read the semantic from the producer config and pass it on in the constructor, as follows:

{code:java}
/**
 * Configuration key for disabling the metrics reporting.
 */
public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";

/**
 * Configuration key for setting the producer semantic.
 */
public static final String KEY_SEMANTIC = "flink.semantic";

public FlinkKafkaProducer(
        String defaultTopicId,
        KeyedSerializationSchema<IN> serializationSchema,
        Properties producerConfig,
        Optional<FlinkKafkaPartitioner<IN>> customPartitioner) {
    this(
        defaultTopicId,
        serializationSchema,
        producerConfig,
        customPartitioner,
        // Fall back to AT_LEAST_ONCE (the current default) when the key is not set.
        Semantic.valueOf(producerConfig.getProperty(KEY_SEMANTIC, Semantic.AT_LEAST_ONCE.name()).toUpperCase()),
        DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
}
{code}

Could someone offer me some advice? Many thanks.

> Making Semantic configurable in Flinkkafkaproducer to support exactly-once
> semantic in Table API
> -------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-14719
>                 URL: https://issues.apache.org/jira/browse/FLINK-14719
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Kafka
>    Affects Versions: 1.8.0
>            Reporter: chaiyongqiang
>            Priority: Major
>
> Flink supports Kafka transactions with FlinkKafkaProducer and
> FlinkKafkaProducer011. When we use the DataStream API, exactly-once
> semantics can be achieved. But when we use the Table API, things are different.
> The createKafkaProducer method in KafkaTableSink is used to create the
> FlinkKafkaProducer that sends messages to the Kafka server. It looks like this:
> {code:java}
> protected SinkFunction<Row> createKafkaProducer(
>         String topic,
>         Properties properties,
>         SerializationSchema<Row> serializationSchema,
>         Optional<FlinkKafkaPartitioner<Row>> partitioner) {
>     return new FlinkKafkaProducer<>(
>         topic,
>         new KeyedSerializationSchemaWrapper<>(serializationSchema),
>         properties,
>         partitioner);
> }
> {code}
> When we look at this constructor of FlinkKafkaProducer, we can see that it
> always produces an AT_LEAST_ONCE producer:
> {code:java}
> public FlinkKafkaProducer(
>         String defaultTopicId,
>         KeyedSerializationSchema<IN> serializationSchema,
>         Properties producerConfig,
>         Optional<FlinkKafkaPartitioner<IN>> customPartitioner) {
>     this(
>         defaultTopicId,
>         serializationSchema,
>         producerConfig,
>         customPartitioner,
>         FlinkKafkaProducer.Semantic.AT_LEAST_ONCE,
>         DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
> }
> {code}
> As a result, users cannot achieve exactly-once semantics when using the Table API.
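
For reference, the property lookup proposed in the comment can be tried out without a Flink dependency. The following is only a minimal sketch under stated assumptions: the class name SemanticConfigSketch and the method parseSemantic are hypothetical, the nested enum is a local stand-in that mirrors the constant names of FlinkKafkaProducer.Semantic, and the trimming, locale-independent upper-casing and custom error message are additions that are not part of the proposal.

```java
import java.util.Locale;
import java.util.Properties;

// Hypothetical standalone sketch; not part of Flink.
class SemanticConfigSketch {

    /** Local stand-in mirroring the constant names of FlinkKafkaProducer.Semantic. */
    enum Semantic { EXACTLY_ONCE, AT_LEAST_ONCE, NONE }

    /** Configuration key proposed in the comment. */
    static final String KEY_SEMANTIC = "flink.semantic";

    /**
     * Reads the semantic from the producer config.
     * Falls back to AT_LEAST_ONCE when the key is absent, matching the current default.
     */
    static Semantic parseSemantic(Properties producerConfig) {
        String raw = producerConfig.getProperty(KEY_SEMANTIC, Semantic.AT_LEAST_ONCE.name());
        // Assumption: tolerate surrounding whitespace and any letter case.
        String normalized = raw.trim().toUpperCase(Locale.ROOT);
        try {
            return Semantic.valueOf(normalized);
        } catch (IllegalArgumentException e) {
            // Enum.valueOf rejects unknown names; rethrow with the offending key and value.
            throw new IllegalArgumentException(
                "Unsupported value '" + raw + "' for " + KEY_SEMANTIC, e);
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        System.out.println(parseSemantic(props)); // key not set: falls back to the default

        props.setProperty(KEY_SEMANTIC, "exactly_once");
        System.out.println(parseSemantic(props)); // lower-case input is accepted
    }
}
```

Failing fast on an unknown value, rather than silently falling back, is a design choice in this sketch: a misspelled semantic would otherwise quietly weaken the delivery guarantee.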