[ https://issues.apache.org/jira/browse/FLINK-21057?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Yun Tang updated FLINK-21057:
-----------------------------
    Component/s: Connectors / Kafka
                 (was: Runtime / Checkpointing)

Streaming checkpointing with small interval leads app to hang
--------------------------------------------------------------

                 Key: FLINK-21057
                 URL: https://issues.apache.org/jira/browse/FLINK-21057
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Kafka
    Affects Versions: 1.11.3
         Environment: * streaming app
                      * Flink cluster in standalone-job / application mode
                      * Flink version 1.11.3
                      * jobmanager --> 1 instance
                      * taskmanager --> 1 instance
                      * parallelism --> 2
            Reporter: Nazar Volynets
            Priority: Major
         Attachments: jobmanager.log, taskmanager.log

There is a simple streaming app with checkpointing enabled:
* state backend --> RocksDB
* mode --> EXACTLY_ONCE

Steps to reproduce (STRs):
1. Run the Flink cluster in standalone-job / application mode (with the embedded streaming app)
2. Get the error
3. Wait 1 min
4. Stop the Flink cluster
5. Repeat steps 1 to 4 until the following error appears:

{code:java|title=taskmanager}
org.apache.kafka.common.KafkaException: Unexpected error in InitProducerIdResponse; Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
flink-kafka-mirror-maker-jobmanager |     at org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1352) ~[?:?]
flink-kafka-mirror-maker-jobmanager |     at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1260) ~[?:?]
flink-kafka-mirror-maker-jobmanager |     at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109) ~[?:?]
flink-kafka-mirror-maker-jobmanager |     at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:572) ~[?:?]
flink-kafka-mirror-maker-jobmanager |     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:564) ~[?:?]
flink-kafka-mirror-maker-jobmanager |     at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:414) ~[?:?]
flink-kafka-mirror-maker-jobmanager |     at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:312) ~[?:?]
flink-kafka-mirror-maker-jobmanager |     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239) ~[?:?]
flink-kafka-mirror-maker-jobmanager |     at java.lang.Thread.run(Unknown Source) ~[?:?]
{code}

The error is evident in the attached logs.
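For context: with FlinkKafkaProducer.Semantic.EXACTLY_ONCE the sink writes into a Kafka transaction that is only committed when the next checkpoint completes, so the producer's transaction.timeout.ms has to outlive a checkpoint cycle plus whatever downtime occurs between stopping and restarting the cluster, and the broker-side transaction.max.timeout.ms (15 minutes by default) caps that value. The sketch below only illustrates this timing relationship, using the values from this report plus an assumed ~1 minute restart gap; it is not a verified diagnosis or fix for the error above.

{code:java|title=Timing sketch (illustrative, assumed values marked)}
import java.util.Properties;

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.producer.ProducerConfig;

public class TxnTimeoutSketch {

    public static void main(String[] args) {
        // Values taken from this report; the restart gap is an assumption (~1 min, per the STRs).
        long checkpointIntervalMs = 500;
        long minPauseBetweenCheckpointsMs = 1_000;
        long assumedRestartGapMs = 60_000;
        long transactionTimeoutMs = 15 * 60 * 1_000;

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(checkpointIntervalMs);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(minPauseBetweenCheckpointsMs);

        // The EXACTLY_ONCE Kafka sink keeps a transaction open until the next checkpoint
        // completes. The broker aborts any transaction open longer than transaction.timeout.ms,
        // and caps that value with its own transaction.max.timeout.ms (15 minutes by default).
        Properties producerProps = new Properties();
        producerProps.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,
                String.valueOf(transactionTimeoutMs));

        // Sanity check of the timing relationship: the timeout has to outlive a checkpoint
        // cycle plus the downtime between "Stop Flink cluster" and the restart; otherwise the
        // recovered producer finds its transaction already expired by the broker.
        // (Consumer/producer wiring omitted; see the App example below.)
        long worstCaseOpenTransactionMs =
                checkpointIntervalMs + minPauseBetweenCheckpointsMs + assumedRestartGapMs;
        if (transactionTimeoutMs <= worstCaseOpenTransactionMs) {
            throw new IllegalStateException(
                    "transaction.timeout.ms is too small to survive a checkpoint cycle plus a restart");
        }
    }
}
{code}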
Please find below:
* streaming app code base (example)
* attached logs
** jobmanager
** taskmanager

*Example*

+App+

{code:java|title=build.gradle (dependencies)}
...
ext {
  ...
  javaVersion = '11'
  flinkVersion = '1.12.0'
  scalaBinaryVersion = '2.11'
  ...
}

dependencies {
  ...
  implementation "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"
  implementation "org.apache.flink:flink-clients_${scalaBinaryVersion}:${flinkVersion}"
  implementation "org.apache.flink:flink-statebackend-rocksdb_${scalaBinaryVersion}:${flinkVersion}"
  // presumably also the Kafka connector (flink-connector-kafka_${scalaBinaryVersion}),
  // which the FlinkKafkaConsumer / FlinkKafkaProducer below require
  ...
}
{code}

{code:java|title=App}
public static void main(String[] args) {
  ...
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setParallelism(2);
  env.enableCheckpointing(500);
  env.setStateBackend(new RocksDBStateBackend("file:///xxx/config/checkpoints/rocksdb", true));
  env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
  env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
  env.getCheckpointConfig().setCheckpointTimeout(600000);
  env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

  FlinkKafkaConsumer<Record> consumer = createConsumer();
  FlinkKafkaProducer<Record> producer = createProducer();

  env
    .addSource(consumer)
    .uid("kafka-consumer")
    .addSink(producer)
    .uid("kafka-producer")
  ;

  env.execute();
}

public static FlinkKafkaConsumer<Record> createConsumer() {
  ...
  Properties props = new Properties();
  props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-source-1:9091");
  ... // nothing special
  props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");

  FlinkKafkaConsumer<Record> consumer = new FlinkKafkaConsumer<>("topic-1", new RecordKafkaDerSchema(), props);
  ... // RecordKafkaDerSchema --> custom schema is used to copy not only the message body but the message key too
  ... // SimpleStringSchema --> can be used instead to reproduce the issue
  consumer.setStartFromGroupOffsets();
  consumer.setCommitOffsetsOnCheckpoints(true);

  return consumer;
}

public static FlinkKafkaProducer<Record> createProducer() {
  ...
  Properties props = new Properties();
  ...
  props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-target-1:9094");
  props.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
  props.setProperty(ProducerConfig.ACKS_CONFIG, "all");
  props.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
  props.setProperty(ProducerConfig.LINGER_MS_CONFIG, "1000");
  props.setProperty(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "2000");
  props.setProperty(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "9000");
  props.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
  props.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "xxx"); // ignored due to expected behaviour - https://issues.apache.org/jira/browse/FLINK-17691
  props.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "" + (15 * 60 * 1000)); // decreased from 1 hour to 15 mins; app is going to be restarted quickly
  ...

  FlinkKafkaProducer<Record> producer = new FlinkKafkaProducer<>("topic-1", new RecordKafkaSerSchema(true), props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
  ... // RecordKafkaSerSchema --> custom schema is used to copy not only the message body but the message key too
  ... // SimpleStringSchema --> can be used instead to reproduce the issue
  return producer;
}
{code}
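RecordKafkaDerSchema and RecordKafkaSerSchema are referenced above but not included in the report. Purely as an illustration of "copy the message key as well as the body", a hypothetical pair along those lines could look like the sketch below; the Record type, field names, and constructor arguments are assumptions and not the reporter's actual code (SimpleStringSchema reportedly reproduces the issue just as well).

{code:java|title=Hypothetical key-preserving schemas (illustration only)}
// e.g. placed in Record.java
import java.io.Serializable;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;

/** Hypothetical record type carrying both the Kafka key and the value as raw bytes. */
public class Record implements Serializable {
    public byte[] key;
    public byte[] value;

    public Record() {}

    public Record(byte[] key, byte[] value) {
        this.key = key;
        this.value = value;
    }
}

/** Hypothetical stand-in for RecordKafkaDerSchema: keeps the key alongside the value. */
class RecordKafkaDerSchema implements KafkaDeserializationSchema<Record> {

    @Override
    public boolean isEndOfStream(Record nextElement) {
        return false; // the mirrored stream never ends
    }

    @Override
    public Record deserialize(ConsumerRecord<byte[], byte[]> record) {
        return new Record(record.key(), record.value());
    }

    @Override
    public TypeInformation<Record> getProducedType() {
        return TypeInformation.of(Record.class);
    }
}

/** Hypothetical stand-in for RecordKafkaSerSchema: writes the key and value back out. */
class RecordKafkaSerSchema implements KafkaSerializationSchema<Record> {

    private final String targetTopic;

    RecordKafkaSerSchema(String targetTopic) {
        this.targetTopic = targetTopic;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(Record element, Long timestamp) {
        return new ProducerRecord<>(targetTopic, element.key, element.value);
    }
}
{code}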