Thank you for the information. I have a feeling this has more to do with 
EXACTLY_ONCE Kafka producers and transactions not playing nicely with 
checkpoints, so that a timeout occurs. The jobs seem to fail, restart, and fail 
again in a loop. Looking at the logs, the taskmanager logs grow very large with 
the same messages repeating over and over again. I've attached a file for this. 
The two lines that give me pause are:


Closing the Kafka producer with timeoutMillis = 0 ms.

Proceeding to force close the producer since pending requests could not be 
completed within timeout 0 ms.


I'm not really sure which timeout this is but it looks like there is a timeout 
loop happening here.
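
One way to check whether the repeating messages simply follow the checkpoint cycle is to measure the gap between successive "checkpoint N complete" lines in the taskmanager log. The following is a minimal sketch in plain Java, assuming the timestamp layout seen in the attached log; the class name CheckpointGaps, the method gapsMillis, and the abbreviated sample lines are made up for illustration.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: not part of Flink or Kafka.
class CheckpointGaps {
    // Captures the leading timestamp of lines that report a completed checkpoint.
    private static final Pattern COMPLETE = Pattern.compile(
        "^(\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2},\\d{3}) .*checkpoint \\d+ complete");
    private static final DateTimeFormatter TS =
        DateTimeFormatter.ofPattern("uuuu-MM-dd HH:mm:ss,SSS");

    // Returns the time in milliseconds between consecutive "checkpoint N complete" lines.
    static List<Long> gapsMillis(List<String> logLines) {
        List<Long> gaps = new ArrayList<>();
        LocalDateTime previous = null;
        for (String line : logLines) {
            Matcher m = COMPLETE.matcher(line);
            if (!m.find()) {
                continue; // not a checkpoint-complete line
            }
            LocalDateTime current = LocalDateTime.parse(m.group(1), TS);
            if (previous != null) {
                gaps.add(Duration.between(previous, current).toMillis());
            }
            previous = current;
        }
        return gaps;
    }

    public static void main(String[] args) {
        // Abbreviated lines in the shape of the attached log (logger names shortened).
        List<String> sample = Arrays.asList(
            "2021-04-05 07:38:17,977 INFO  Sink [] - FlinkKafkaProducer 3/10 - checkpoint 39 complete, committing transaction",
            "2021-04-05 07:38:17,978 INFO  Sink [] - Flushing new partitions",
            "2021-04-05 07:38:43,170 INFO  Sink [] - FlinkKafkaProducer 3/10 - checkpoint 40 complete, committing transaction");
        System.out.println(gapsMillis(sample));
    }
}
```

If the gaps stay close to the configured checkpoint interval, the close messages are probably repeating once per checkpoint rather than in a tight loop.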


The Kafka producer has been configured as such (the transaction timeout has 
been set on the kafka server to match the producer):


Properties kafkaProducerProps = new Properties();
kafkaProducerProps.setProperty("bootstrap.servers", brokerList);
kafkaProducerProps.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "3600000");
kafkaProducerProps.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
kafkaProducerProps.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, UUID.randomUUID().toString());
kafkaProducerProps.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
kafkaProducerProps.setProperty(ProducerConfig.LINGER_MS_CONFIG, "500");
kafkaProducerProps.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
kafkaProducerProps.setProperty(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");
kafkaProducerProps.setProperty(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000");
kafkaProducerProps.setProperty(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "120000");

FlinkKafkaProducer<String> myProducer =
    new FlinkKafkaProducer<>(
        producerTopic,
        (KafkaSerializationSchema<String>) (value, aLong) -> {
            return new ProducerRecord<>(producerTopic, value.getBytes());
        },
        kafkaProducerProps,
        Semantic.EXACTLY_ONCE,
        10);
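
The timeouts in a producer configuration like this one can be cross-checked with plain java.util.Properties before the job is submitted. This is a minimal sketch, relying on two rules from the Kafka producer documentation: delivery.timeout.ms should be at least linger.ms + request.timeout.ms, and a transaction.timeout.ms above the broker's transaction.max.timeout.ms is rejected. The class ProducerTimeoutCheck and the parameter brokerMaxTxnTimeoutMs are hypothetical names, and the broker value has to be looked up on the broker itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper: not part of Flink or Kafka.
class ProducerTimeoutCheck {
    // Reads a millisecond setting; throws NumberFormatException if the key is missing.
    static long ms(Properties p, String key) {
        return Long.parseLong(p.getProperty(key));
    }

    // Returns a description of every timeout relationship that does not hold.
    static List<String> check(Properties p, long brokerMaxTxnTimeoutMs) {
        List<String> problems = new ArrayList<>();
        long delivery = ms(p, "delivery.timeout.ms");
        long linger = ms(p, "linger.ms");
        long request = ms(p, "request.timeout.ms");
        long txn = ms(p, "transaction.timeout.ms");
        if (delivery < linger + request) {
            problems.add("delivery.timeout.ms is below linger.ms + request.timeout.ms");
        }
        if (txn > brokerMaxTxnTimeoutMs) {
            problems.add("transaction.timeout.ms exceeds the broker's transaction.max.timeout.ms");
        }
        return problems;
    }

    public static void main(String[] args) {
        Properties p = new Properties();
        p.setProperty("transaction.timeout.ms", "3600000");
        p.setProperty("linger.ms", "500");
        p.setProperty("request.timeout.ms", "30000");
        p.setProperty("delivery.timeout.ms", "120000");
        // Assumption: the broker limit was raised to one hour to match the producer.
        System.out.println(check(p, 3_600_000L));
        // Kafka's default broker limit is 15 minutes (900000 ms).
        System.out.println(check(p, 900_000L));
    }
}
```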


And checkpoints have been configured as such:


StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// configuring RocksDB state backend to use HDFS
String backupFolder = props.getProperty("hdfs.backupFolder");
StateBackend backend = new RocksDBStateBackend(backupFolder, true);
env.setStateBackend(backend);
// start a checkpoint based on supplied interval
env.enableCheckpointing(checkpointInterval);
// set mode to exactly-once (this is the default)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// make sure progress happens between checkpoints
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(checkpointInterval);
// checkpoints have to complete within 380 seconds (6 min 20 s), or are discarded
env.getCheckpointConfig().setCheckpointTimeout(380000);
//env.getCheckpointConfig().setTolerableCheckpointFailureNumber();
// no external services which could take some time to respond, therefore 1
// allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// enable externalized checkpoints which are deleted after job cancellation
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
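
To put these numbers side by side, the millisecond values can be turned into readable durations and compared with the Kafka transaction timeout. This is a minimal sketch, assuming (as I understand the Flink Kafka connector documentation) that transaction.timeout.ms should exceed the maximum checkpoint duration plus the maximum restart duration. The class CheckpointBudget, its method names, the 25-second interval, and the 60-second restart allowance are illustrative assumptions, not values from the job.

```java
import java.time.Duration;

// Hypothetical helper: not part of Flink or Kafka.
class CheckpointBudget {
    // Formats a millisecond value as whole minutes and remaining seconds.
    static String describe(long millis) {
        Duration d = Duration.ofMillis(millis);
        return d.toMinutes() + " min " + (d.getSeconds() % 60) + " s";
    }

    // True if the transaction timeout leaves room for one checkpoint interval,
    // a checkpoint that runs up to its timeout, and a restart allowance.
    static boolean transactionOutlivesCheckpoint(long txnTimeoutMs,
                                                 long checkpointIntervalMs,
                                                 long checkpointTimeoutMs,
                                                 long restartAllowanceMs) {
        return txnTimeoutMs > checkpointIntervalMs + checkpointTimeoutMs + restartAllowanceMs;
    }

    public static void main(String[] args) {
        long checkpointTimeoutMs = 380_000L;  // value passed to setCheckpointTimeout
        long txnTimeoutMs = 3_600_000L;       // transaction.timeout.ms from the producer config
        long checkpointIntervalMs = 25_000L;  // assumption for illustration only
        long restartAllowanceMs = 60_000L;    // assumption for illustration only

        System.out.println("checkpoint timeout: " + describe(checkpointTimeoutMs));
        System.out.println("transaction timeout: " + describe(txnTimeoutMs));
        System.out.println("enough headroom: " + transactionOutlivesCheckpoint(
            txnTimeoutMs, checkpointIntervalMs, checkpointTimeoutMs, restartAllowanceMs));
    }
}
```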


Additionally, each taskmanager has been configured with 4 GB of memory, there is 
a sliding window of 10 seconds with a slide of 1 second, and the cluster setup 
uses Flink native.


Any hints would be much appreciated!


Regards,

M.


________________________________
From: Guowei Ma <guowei....@gmail.com>
Sent: 01 April 2021 14:19
To: Geldenhuys, Morgan Karl
Cc: user
Subject: Re: Checkpoint timeouts at times of high load

Hi,
I think there are many reasons that could lead to a checkpoint timeout.
Could you share some detailed checkpoint information? For example, 
the detailed checkpoint information from the web UI [1]. And which Flink version 
do you use?

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/monitoring/checkpoint_monitoring.html

Best,
Guowei


On Thu, Apr 1, 2021 at 4:33 PM Geldenhuys, Morgan Karl 
<morgan.geldenh...@tu-berlin.de> wrote:

Hi Community,


I have a number of Flink jobs running inside my session cluster with varying 
checkpoint intervals plus a large amount of operator state, and in times of high 
load the jobs fail due to checkpoint timeouts (set to 6 minutes). I can only 
assume this is because the latency of saving checkpoints increases at these 
times of high load. I have a 30-node HDFS cluster for checkpoints; however, I 
see that only 4 of these nodes are being used for storage. Is there a way of 
ensuring the load is evenly spread? Could there be another reason for these 
checkpoint timeouts? Events are consumed from Kafka and written back to Kafka 
with EXACTLY_ONCE guarantees enabled.


Thank you very much!


M.
2021-04-05 07:38:16,437 INFO  org.apache.kafka.clients.producer.KafkaProducer              [] - [Producer clientId=producer-Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-20, transactionalId=Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-20] Instantiated a transactional producer.
2021-04-05 07:38:16,439 INFO  org.apache.kafka.clients.producer.KafkaProducer              [] - [Producer clientId=producer-Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-20, transactionalId=Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-20] Overriding the default retries config to the recommended value of 2147483647 since the idempotent producer is enabled.
2021-04-05 07:38:16,439 INFO  org.apache.kafka.clients.producer.KafkaProducer              [] - [Producer clientId=producer-Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-20, transactionalId=Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-20] Overriding the default acks to all since idempotence is enabled.
2021-04-05 07:38:16,440 INFO  org.apache.kafka.common.utils.AppInfoParser                  [] - Kafka version: 2.4.1
2021-04-05 07:38:16,440 INFO  org.apache.kafka.common.utils.AppInfoParser                  [] - Kafka commitId: c57222ae8cd7866b
2021-04-05 07:38:16,440 INFO  org.apache.kafka.common.utils.AppInfoParser                  [] - Kafka startTimeMs: 1617608296440
2021-04-05 07:38:16,440 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer [] - Starting FlinkKafkaInternalProducer (3/10) to produce into default topic iot-vehicles-notifications-1viyYcBkQo
2021-04-05 07:38:16,442 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=producer-Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-20, transactionalId=Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-20] ProducerId set to -1 with epoch -1
2021-04-05 07:38:16,443 INFO  org.apache.kafka.clients.Metadata                            [] - [Producer clientId=producer-Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-20, transactionalId=Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-20] Cluster ID: 2xzHxtHORb-ATUKF8FGQjQ
2021-04-05 07:38:16,546 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=producer-Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-20, transactionalId=Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-20] ProducerId set to 8625 with epoch 4
2021-04-05 07:38:17,977 INFO  org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction [] - FlinkKafkaProducer 3/10 - checkpoint 39 complete, committing transaction TransactionHolder{handle=KafkaTransactionState [transactionalId=Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-21, producerId=5615, epoch=4], transactionStartTime=1617608271462} from checkpoint 39
2021-04-05 07:38:17,978 INFO  org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer [] - Flushing new partitions
2021-04-05 07:38:17,978 INFO  org.apache.kafka.clients.producer.KafkaProducer              [] - [Producer clientId=producer-Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-21, transactionalId=Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-21] Closing the Kafka producer with timeoutMillis = 0 ms.
2021-04-05 07:38:17,978 INFO  org.apache.kafka.clients.producer.KafkaProducer              [] - [Producer clientId=producer-Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-21, transactionalId=Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-21] Proceeding to force close the producer since pending requests could not be completed within timeout 0 ms.
2021-04-05 07:38:40,287 INFO  org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer [] - Flushing new partitions
2021-04-05 07:38:40,288 INFO  org.apache.kafka.clients.producer.ProducerConfig             [] - ProducerConfig values: 
	acks = 1
	batch.size = 16384
	bootstrap.servers = [130.149.249.46:32690]
	buffer.memory = 33554432
	client.dns.lookup = default
	client.id = 
	compression.type = none
	connections.max.idle.ms = 540000
	delivery.timeout.ms = 120000
	enable.idempotence = true
	interceptor.classes = []
	key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
	linger.ms = 500
	max.block.ms = 60000
	max.in.flight.requests.per.connection = 5
	max.request.size = 1048576
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
	receive.buffer.bytes = 32768
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retries = 2147483647
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	security.providers = null
	send.buffer.bytes = 131072
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = https
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	transaction.timeout.ms = 3600000
	transactional.id = Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-29
	value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer

2021-04-05 07:38:40,289 INFO  org.apache.kafka.clients.producer.KafkaProducer              [] - [Producer clientId=producer-Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-29, transactionalId=Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-29] Instantiated a transactional producer.
2021-04-05 07:38:40,293 INFO  org.apache.kafka.clients.producer.KafkaProducer              [] - [Producer clientId=producer-Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-29, transactionalId=Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-29] Overriding the default retries config to the recommended value of 2147483647 since the idempotent producer is enabled.
2021-04-05 07:38:40,293 INFO  org.apache.kafka.clients.producer.KafkaProducer              [] - [Producer clientId=producer-Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-29, transactionalId=Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-29] Overriding the default acks to all since idempotence is enabled.
2021-04-05 07:38:40,294 INFO  org.apache.kafka.common.utils.AppInfoParser                  [] - Kafka version: 2.4.1
2021-04-05 07:38:40,294 INFO  org.apache.kafka.common.utils.AppInfoParser                  [] - Kafka commitId: c57222ae8cd7866b
2021-04-05 07:38:40,294 INFO  org.apache.kafka.common.utils.AppInfoParser                  [] - Kafka startTimeMs: 1617608320294
2021-04-05 07:38:40,294 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer [] - Starting FlinkKafkaInternalProducer (3/10) to produce into default topic iot-vehicles-notifications-1viyYcBkQo
2021-04-05 07:38:40,295 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=producer-Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-29, transactionalId=Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-29] ProducerId set to -1 with epoch -1
2021-04-05 07:38:40,297 INFO  org.apache.kafka.clients.Metadata                            [] - [Producer clientId=producer-Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-29, transactionalId=Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-29] Cluster ID: 2xzHxtHORb-ATUKF8FGQjQ
2021-04-05 07:38:40,399 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=producer-Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-29, transactionalId=Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-29] ProducerId set to 6452 with epoch 5
2021-04-05 07:38:43,170 INFO  org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction [] - FlinkKafkaProducer 3/10 - checkpoint 40 complete, committing transaction TransactionHolder{handle=KafkaTransactionState [transactionalId=Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-20, producerId=8625, epoch=4], transactionStartTime=1617608296546} from checkpoint 40
2021-04-05 07:38:43,170 INFO  org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer [] - Flushing new partitions
2021-04-05 07:38:43,170 INFO  org.apache.kafka.clients.producer.KafkaProducer              [] - [Producer clientId=producer-Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-20, transactionalId=Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-20] Closing the Kafka producer with timeoutMillis = 0 ms.
2021-04-05 07:38:43,171 INFO  org.apache.kafka.clients.producer.KafkaProducer              [] - [Producer clientId=producer-Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-20, transactionalId=Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-20] Proceeding to force close the producer since pending requests could not be completed within timeout 0 ms.
2021-04-05 07:39:05,403 INFO  org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer [] - Flushing new partitions
2021-04-05 07:39:05,404 INFO  org.apache.kafka.clients.producer.ProducerConfig             [] - ProducerConfig values: 
	acks = 1
	batch.size = 16384
	bootstrap.servers = [130.149.249.46:32690]
	buffer.memory = 33554432
	client.dns.lookup = default
	client.id = 
	compression.type = none
	connections.max.idle.ms = 540000
	delivery.timeout.ms = 120000
	enable.idempotence = true
	interceptor.classes = []
	key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
	linger.ms = 500
	max.block.ms = 60000
	max.in.flight.requests.per.connection = 5
	max.request.size = 1048576
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
	receive.buffer.bytes = 32768
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retries = 2147483647
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	security.providers = null
	send.buffer.bytes = 131072
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = https
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	transaction.timeout.ms = 3600000
	transactional.id = Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-28
	value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer

2021-04-05 07:39:05,405 INFO  org.apache.kafka.clients.producer.KafkaProducer              [] - [Producer clientId=producer-Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-28, transactionalId=Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-28] Instantiated a transactional producer.
2021-04-05 07:39:05,410 INFO  org.apache.kafka.clients.producer.KafkaProducer              [] - [Producer clientId=producer-Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-28, transactionalId=Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-28] Overriding the default retries config to the recommended value of 2147483647 since the idempotent producer is enabled.
2021-04-05 07:39:05,410 INFO  org.apache.kafka.clients.producer.KafkaProducer              [] - [Producer clientId=producer-Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-28, transactionalId=Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-28] Overriding the default acks to all since idempotence is enabled.
2021-04-05 07:39:05,411 INFO  org.apache.kafka.common.utils.AppInfoParser                  [] - Kafka version: 2.4.1
2021-04-05 07:39:05,411 INFO  org.apache.kafka.common.utils.AppInfoParser                  [] - Kafka commitId: c57222ae8cd7866b
2021-04-05 07:39:05,411 INFO  org.apache.kafka.common.utils.AppInfoParser                  [] - Kafka startTimeMs: 1617608345411
2021-04-05 07:39:05,411 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer [] - Starting FlinkKafkaInternalProducer (3/10) to produce into default topic iot-vehicles-notifications-1viyYcBkQo
2021-04-05 07:39:05,411 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=producer-Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-28, transactionalId=Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-28] ProducerId set to -1 with epoch -1
2021-04-05 07:39:05,415 INFO  org.apache.kafka.clients.Metadata                            [] - [Producer clientId=producer-Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-28, transactionalId=Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-28] Cluster ID: 2xzHxtHORb-ATUKF8FGQjQ
2021-04-05 07:39:05,518 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=producer-Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-28, transactionalId=Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-28] ProducerId set to 9475 with epoch 5
2021-04-05 07:39:08,426 INFO  org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction [] - FlinkKafkaProducer 3/10 - checkpoint 41 complete, committing transaction TransactionHolder{handle=KafkaTransactionState [transactionalId=Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-29, producerId=6452, epoch=5], transactionStartTime=1617608320399} from checkpoint 41
2021-04-05 07:39:08,426 INFO  org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer [] - Flushing new partitions
2021-04-05 07:39:08,427 INFO  org.apache.kafka.clients.producer.KafkaProducer              [] - [Producer clientId=producer-Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-29, transactionalId=Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-29] Closing the Kafka producer with timeoutMillis = 0 ms.
2021-04-05 07:39:08,427 INFO  org.apache.kafka.clients.producer.KafkaProducer              [] - [Producer clientId=producer-Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-29, transactionalId=Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-29] Proceeding to force close the producer since pending requests could not be completed within timeout 0 ms.
2021-04-05 07:39:30,663 INFO  org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer [] - Flushing new partitions
2021-04-05 07:39:30,664 INFO  org.apache.kafka.clients.producer.ProducerConfig             [] - ProducerConfig values: 
	acks = 1
	batch.size = 16384
	bootstrap.servers = [130.149.249.46:32690]
	buffer.memory = 33554432
	client.dns.lookup = default
	client.id = 
	compression.type = none
	connections.max.idle.ms = 540000
	delivery.timeout.ms = 120000
	enable.idempotence = true
	interceptor.classes = []
	key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
	linger.ms = 500
	max.block.ms = 60000
	max.in.flight.requests.per.connection = 5
	max.request.size = 1048576
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
	receive.buffer.bytes = 32768
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retries = 2147483647
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	security.providers = null
	send.buffer.bytes = 131072
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = https
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	transaction.timeout.ms = 3600000
	transactional.id = Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-27
	value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer

2021-04-05 07:39:30,690 INFO  org.apache.kafka.clients.producer.KafkaProducer              [] - [Producer clientId=producer-Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-27, transactionalId=Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-27] Instantiated a transactional producer.
2021-04-05 07:39:30,692 INFO  org.apache.kafka.clients.producer.KafkaProducer              [] - [Producer clientId=producer-Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-27, transactionalId=Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-27] Overriding the default retries config to the recommended value of 2147483647 since the idempotent producer is enabled.
2021-04-05 07:39:30,692 INFO  org.apache.kafka.clients.producer.KafkaProducer              [] - [Producer clientId=producer-Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-27, transactionalId=Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-27] Overriding the default acks to all since idempotence is enabled.
2021-04-05 07:39:30,693 INFO  org.apache.kafka.common.utils.AppInfoParser                  [] - Kafka version: 2.4.1
2021-04-05 07:39:30,693 INFO  org.apache.kafka.common.utils.AppInfoParser                  [] - Kafka commitId: c57222ae8cd7866b
2021-04-05 07:39:30,693 INFO  org.apache.kafka.common.utils.AppInfoParser                  [] - Kafka startTimeMs: 1617608370693
2021-04-05 07:39:30,694 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer [] - Starting FlinkKafkaInternalProducer (3/10) to produce into default topic iot-vehicles-notifications-1viyYcBkQo
2021-04-05 07:39:30,694 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=producer-Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-27, transactionalId=Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-27] ProducerId set to -1 with epoch -1
2021-04-05 07:39:30,696 INFO  org.apache.kafka.clients.Metadata                            [] - [Producer clientId=producer-Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-27, transactionalId=Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-27] Cluster ID: 2xzHxtHORb-ATUKF8FGQjQ
2021-04-05 07:39:30,799 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=producer-Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-27, transactionalId=Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-27] ProducerId set to 7513 with epoch 5
2021-04-05 07:39:32,270 INFO  org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction [] - FlinkKafkaProducer 3/10 - checkpoint 42 complete, committing transaction TransactionHolder{handle=KafkaTransactionState [transactionalId=Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-28, producerId=9475, epoch=5], transactionStartTime=1617608345518} from checkpoint 42
2021-04-05 07:39:32,270 INFO  org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer [] - Flushing new partitions
2021-04-05 07:39:32,271 INFO  org.apache.kafka.clients.producer.KafkaProducer              [] - [Producer clientId=producer-Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-28, transactionalId=Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-28] Closing the Kafka producer with timeoutMillis = 0 ms.
2021-04-05 07:39:32,271 INFO  org.apache.kafka.clients.producer.KafkaProducer              [] - [Producer clientId=producer-Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-28, transactionalId=Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-28] Proceeding to force close the producer since pending requests could not be completed within timeout 0 ms.
