eric yu created FLINK-25126: ------------------------------- Summary: when SET 'execution.runtime-mode' = 'batch' and 'sink.delivery-guarantee' = 'exactly-once',kafka conncetor will commit fail Key: FLINK-25126 URL: https://issues.apache.org/jira/browse/FLINK-25126 Project: Flink Issue Type: Bug Components: Connectors / Kafka Affects Versions: 1.14.0 Environment: SET 'execution.runtime-mode' = 'batch';
CREATE TABLE ka15 ( name String, cnt bigint ) WITH ( 'connector' = 'kafka', 'topic' = 'shifang8888', 'properties.bootstrap.servers' = 'flinkx1:9092', 'properties.transaction.timeout.ms' = '800000', 'properties.max.block.ms' = '300000', 'value.format' = 'json', 'sink.parallelism' = '2', 'sink.delivery-guarantee' = 'exactly-once', 'sink.transactional-id-prefix' = 'dtstack9999'); insert into ka15 SELECT name, cnt FROM (VALUES ('Bob',100), ('Alice',100), ('Greg',100), ('Bob',100)) AS NameTable(name,cnt); Reporter: eric yu flinksql task submitted by sql client will failed: Caused by: java.lang.IllegalStateException at org.apache.flink.util.Preconditions.checkState(Preconditions.java:177) at org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer.setTransactionId(FlinkKafkaInternalProducer.java:164) at org.apache.flink.connector.kafka.sink.KafkaCommitter.getRecoveryProducer(KafkaCommitter.java:144) at org.apache.flink.connector.kafka.sink.KafkaCommitter.lambda$commit$0(KafkaCommitter.java:76) at java.util.Optional.orElseGet(Optional.java:267) at org.apache.flink.connector.kafka.sink.KafkaCommitter.commit(KafkaCommitter.java:76) ... 14 more i found the reason why kafka commit failed, when downstream operator CommitterOperator was commiting transaction, the upstream operator SinkOperator has closed , it will abort the transaction which is committing by CommitterOperator, this is occurs when execution.runtime-mode is batch -- This message was sent by Atlassian Jira (v8.20.1#820001)