eric yu created FLINK-25126:
-------------------------------

             Summary: when SET 'execution.runtime-mode' = 'batch' and 
'sink.delivery-guarantee' = 'exactly-once',kafka conncetor will commit fail
                 Key: FLINK-25126
                 URL: https://issues.apache.org/jira/browse/FLINK-25126
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Kafka
    Affects Versions: 1.14.0
         Environment: SET 'execution.runtime-mode' = 'batch';

 CREATE TABLE ka15 (
     name String,
     cnt bigint
 ) WITH (
   'connector' = 'kafka',
   'topic' = 'shifang8888',
  'properties.bootstrap.servers' = 'flinkx1:9092',
  'properties.transaction.timeout.ms' = '800000',
  'properties.max.block.ms' = '300000',
   'value.format' = 'json',
   'sink.parallelism' = '2',
   'sink.delivery-guarantee' = 'exactly-once',
   'sink.transactional-id-prefix' = 'dtstack9999');

 

insert into ka15 SELECT
  name,
  cnt
FROM
  (VALUES ('Bob',100), ('Alice',100), ('Greg',100), ('Bob',100)) AS 
NameTable(name,cnt);
            Reporter: eric yu


flinksql task submitted by sql client will failed:
 
Caused by: java.lang.IllegalStateException
at org.apache.flink.util.Preconditions.checkState(Preconditions.java:177)
at 
org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer.setTransactionId(FlinkKafkaInternalProducer.java:164)
at 
org.apache.flink.connector.kafka.sink.KafkaCommitter.getRecoveryProducer(KafkaCommitter.java:144)
at 
org.apache.flink.connector.kafka.sink.KafkaCommitter.lambda$commit$0(KafkaCommitter.java:76)
at java.util.Optional.orElseGet(Optional.java:267)
at 
org.apache.flink.connector.kafka.sink.KafkaCommitter.commit(KafkaCommitter.java:76)
... 14 more
 
 
i found the reason why  kafka commit failed, when downstream operator 
CommitterOperator was commiting transaction, the upstream  operator 
SinkOperator has closed , it will abort the transaction which  is committing by 
CommitterOperator, this is occurs when execution.runtime-mode is batch



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to