Hi Oleg,
this does indeed sound like abnormal behavior. Are you sure that these large
checkpoints are related to the Kafka consumer only? Are there other
operators in the pipeline? Internally, the state kept by a Kafka consumer
is quite minimal and relates only to Kafka partition and offset management
(see the sketch below).
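To give a rough sense of what that state amounts to: it is essentially one
(topic, partition, offset) entry per subscribed partition, so for 32 partitions
it should be in the kilobyte range. Below is a simplified, self-contained sketch
of that pattern; it is not the actual connector code, and the class and state
names are made up for illustration:

```java
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

import java.util.HashMap;
import java.util.Map;

/**
 * Simplified sketch (not the real FlinkKafkaConsumerBase): the consumer's
 * checkpointed state is conceptually just a (topic, partition) -> offset map,
 * stored as a list state of small tuples. With 32 partitions this is on the
 * order of kilobytes, nowhere near gigabytes.
 */
public class PartitionOffsetTrackingSketch implements CheckpointedFunction {

    // current offsets, updated while records are consumed
    private final Map<Tuple2<String, Integer>, Long> currentOffsets = new HashMap<>();

    // Flink-managed state that ends up in the checkpoint
    private transient ListState<Tuple2<Tuple2<String, Integer>, Long>> offsetState;

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        offsetState = context.getOperatorStateStore().getUnionListState(
                new ListStateDescriptor<>(
                        "topic-partition-offsets",
                        TypeInformation.of(new TypeHint<Tuple2<Tuple2<String, Integer>, Long>>() {})));

        if (context.isRestored()) {
            for (Tuple2<Tuple2<String, Integer>, Long> entry : offsetState.get()) {
                currentOffsets.put(entry.f0, entry.f1);
            }
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // one small tuple per partition -- this is all that should be checkpointed
        offsetState.clear();
        for (Map.Entry<Tuple2<String, Integer>, Long> entry : currentOffsets.entrySet()) {
            offsetState.add(Tuple2.of(entry.getKey(), entry.getValue()));
        }
    }
}
```

So unless something else ends up in that operator's state, 55 GB cannot come
from the offsets alone.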
If you are sure that the Kafka consumer really does produce such a state size,
I would recommend attaching a remote debugger and checking what is
checkpointed in the corresponding `FlinkKafkaConsumerBase#snapshotState`.
Regards,
Timo
On 15.04.20 03:37, Oleg Vysotsky wrote:
Hello,
Sometimes our Flink job starts creating large checkpoints in which the Kafka
source accounts for 55 GB (instead of 2 MB). Once the job has created the first
“abnormal” checkpoint, all subsequent checkpoints are “abnormal” as well. The
Flink job cannot be restored from such a checkpoint: the restore hangs or fails,
the Flink dashboard hangs, and the Flink cluster crashes while restoring from
the checkpoint. We did not catch a related error message, and we have not found
a clear way to reproduce the problem (i.e. to make the job create “abnormal”
checkpoints).
Configuration:
We are using Flink 1.8.1 on EMR (EMR 5.27).
Kafka: Confluent Kafka 5.4.1
Flink Kafka connector: org.apache.flink:flink-connector-kafka_2.11:1.8.1
(which brings in the org.apache.kafka:kafka-clients:2.0.1 dependency)
Our input Kafka topic has 32 partitions and the corresponding Flink source
runs with a parallelism of 32.
We use pretty much all the default Flink Kafka consumer settings; we only
specify
CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
ConsumerConfig.GROUP_ID_CONFIG, and
CommonClientConfigs.SECURITY_PROTOCOL_CONFIG.
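For reference, the consumer setup looks roughly like the sketch below (the
topic name, group id, deserialization schema, and security protocol value are
placeholders, not our actual values):

```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class KafkaSourceJobSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // only these three settings are set explicitly; everything else is left at its default
        Properties props = new Properties();
        props.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "broker-1:9092"); // placeholder
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "our-consumer-group");          // placeholder
        props.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");      // placeholder

        // "input-topic" and SimpleStringSchema stand in for the real topic and schema
        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("input-topic", new SimpleStringSchema(), props);

        // the topic has 32 partitions, so the source runs with parallelism 32
        DataStream<String> stream = env.addSource(consumer).setParallelism(32);

        stream.print(); // placeholder for the rest of the pipeline

        env.execute("kafka-source-job-sketch");
    }
}
```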
Thanks a lot in advance!
Oleg