[ 
https://issues.apache.org/jira/browse/FLINK-14719?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16972191#comment-16972191
 ] 

chaiyongqiang commented on FLINK-14719:
---------------------------------------

when i check the code in branch Flink 1.9 and the master, the constructor in 
FlinkKafkaProducer 1.8 becomes @deprecated. In the newer version of Flink, we 
could modify the createKafkaProducer method in KafkaTableSinkBase and all the 
classes which extend KafkaTableSinkBase to support exactly-once Semantic API in 
Flink.  +I could open a new issue to tracking this.+

But for branch flink 1.8 , a light weight method would help. We could achieve 
the semantic config and set it in the constructor in the following way.
{code:java}

/**
* Configuration key for disabling the metrics reporting.
*/
public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";
/**
* Configuration key for setting the producer semantic.
*/
public static final String KEY_SEMANTIC = "flink.semantic";

public FlinkKafkaProducer(
                String defaultTopicId,
                KeyedSerializationSchema<IN> serializationSchema,
                Properties producerConfig,
                Optional<FlinkKafkaPartitioner<IN>> customPartitioner) {
                this(
                        defaultTopicId,
                        serializationSchema,
                        producerConfig,
                        customPartitioner,
                        
Semantic.valueOf(producerConfig.getProperty(KEY_SEMANTIC, 
Semantic.AT_LEAST_ONCE.name()).toUpperCase()),
                        DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
        }
{code}

Could someone offer me some advice ? Many thans.

> Making  Semantic configurable in Flinkkafkaproducer to support exactly-once 
> semantic in Table API
> -------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-14719
>                 URL: https://issues.apache.org/jira/browse/FLINK-14719
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Kafka
>    Affects Versions: 1.8.0
>            Reporter: chaiyongqiang
>            Priority: Major
>
> Flink supports kafka transaction with FlinkKafkaProducer and 
> FlinkKafkaProducer011 . When we use Datastream API , it's able to realize 
> exactly once semantic .  But when we use Table API, things are different. 
> The createKafkaProducer method in KafkaTableSink is used to create 
> FlinkKafkaProducer to sending messages to Kafka server.  It's like :
> {code:java}
> protected SinkFunction<Row> createKafkaProducer(
>               String topic,
>               Properties properties,
>               SerializationSchema<Row> serializationSchema,
>               Optional<FlinkKafkaPartitioner<Row>> partitioner) {
>               return new FlinkKafkaProducer<>(
>                       topic,
>                       new 
> KeyedSerializationSchemaWrapper<>(serializationSchema),
>                       properties,
>                       partitioner);
>       }
> {code}
> when we get into the constructor of FlinkKafkaProducer we can see this will 
> lead to an at_least_once semantic producer :
> {code:java}
>       public FlinkKafkaProducer(
>               String defaultTopicId,
>               KeyedSerializationSchema<IN> serializationSchema,
>               Properties producerConfig,
>               Optional<FlinkKafkaPartitioner<IN>> customPartitioner) {
>               this(
>                       defaultTopicId,
>                       serializationSchema,
>                       producerConfig,
>                       customPartitioner,
>                       FlinkKafkaProducer.Semantic.AT_LEAST_ONCE,
>                       DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
>       }
> {code}
> This makes user could not achieve exactly-once semantic when using Table API. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to