chaiyongqiang created FLINK-14719:
-------------------------------------

             Summary: Making Semantic configurable in FlinkKafkaProducer to support exactly-once semantic in Table API
                 Key: FLINK-14719
                 URL: https://issues.apache.org/jira/browse/FLINK-14719
             Project: Flink
          Issue Type: Improvement
          Components: Connectors / Kafka
    Affects Versions: 1.8.0
            Reporter: chaiyongqiang


Flink supports Kafka transactions through FlinkKafkaProducer and
FlinkKafkaProducer011. With the DataStream API, this makes exactly-once
semantics achievable. With the Table API, however, things are different.

The createKafkaProducer method in KafkaTableSink creates the
FlinkKafkaProducer that sends messages to the Kafka server. It looks like this:


{code:java}
protected SinkFunction<Row> createKafkaProducer(
        String topic,
        Properties properties,
        SerializationSchema<Row> serializationSchema,
        Optional<FlinkKafkaPartitioner<Row>> partitioner) {
    // Uses the four-argument constructor, which does not take a Semantic
    return new FlinkKafkaProducer<>(
            topic,
            new KeyedSerializationSchemaWrapper<>(serializationSchema),
            properties,
            partitioner);
}
{code}

Looking into this FlinkKafkaProducer constructor, we can see that it always
creates a producer with AT_LEAST_ONCE semantic:


{code:java}
public FlinkKafkaProducer(
        String defaultTopicId,
        KeyedSerializationSchema<IN> serializationSchema,
        Properties producerConfig,
        Optional<FlinkKafkaPartitioner<IN>> customPartitioner) {
    // Delegates with a hard-coded AT_LEAST_ONCE semantic
    this(
            defaultTopicId,
            serializationSchema,
            producerConfig,
            customPartitioner,
            FlinkKafkaProducer.Semantic.AT_LEAST_ONCE,
            DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
}
{code}

As a result, users cannot achieve exactly-once semantics with the Table API:
the Semantic is fixed to AT_LEAST_ONCE and there is no way to configure it.
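A minimal sketch of the proposed improvement, under stated assumptions: the table sink would read the desired semantic from its properties and fall back to the current AT_LEAST_ONCE default when nothing is set. The property key `connector.sink-semantic` and its values are hypothetical (no such option exists in 1.8), and the enum is a local stand-in mirroring the constant names of FlinkKafkaProducer.Semantic so the example compiles without Flink. In KafkaTableSink, the resolved value would then be passed as the fifth argument of the six-argument constructor that the four-argument one delegates to above.

{code:java}
import java.util.Locale;
import java.util.Properties;

public class SemanticConfig {

    // Local stand-in mirroring FlinkKafkaProducer.Semantic constant names.
    enum Semantic { EXACTLY_ONCE, AT_LEAST_ONCE, NONE }

    // Hypothetical property key, used for illustration only.
    static final String SEMANTIC_KEY = "connector.sink-semantic";

    /** Resolves the semantic, keeping today's AT_LEAST_ONCE when unset. */
    static Semantic resolveSemantic(Properties props) {
        String value = props.getProperty(SEMANTIC_KEY);
        if (value == null || value.trim().isEmpty()) {
            return Semantic.AT_LEAST_ONCE; // unchanged default behavior
        }
        switch (value.trim().toLowerCase(Locale.ROOT)) {
            case "exactly-once":
                return Semantic.EXACTLY_ONCE;
            case "at-least-once":
                return Semantic.AT_LEAST_ONCE;
            case "none":
                return Semantic.NONE;
            default:
                // Fail fast on typos instead of silently downgrading
                throw new IllegalArgumentException(
                        "Unsupported value for " + SEMANTIC_KEY + ": " + value);
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        System.out.println(resolveSemantic(props)); // AT_LEAST_ONCE
        props.setProperty(SEMANTIC_KEY, "exactly-once");
        System.out.println(resolveSemantic(props)); // EXACTLY_ONCE
    }
}
{code}

Defaulting to AT_LEAST_ONCE keeps existing jobs unchanged, while rejecting unknown values avoids a misspelled setting quietly leaving a job at a weaker guarantee than intended.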




--
This message was sent by Atlassian Jira
(v8.3.4#803005)
