Thank you for the information. I have a feeling this has more to do with EXACTLY_ONCE Kafka producers and transactions not playing nicely with checkpoints, so that a timeout occurs. The jobs seem to fail and then get stuck in a restart-and-fail loop. Looking at the logs, the taskmanager logs grow very large, with the same messages repeating over and over again. I've attached a file showing this. The two lines that give me pause are:

Closing the Kafka producer with timeoutMillis = 0 ms.
Proceeding to force close the producer since pending requests could not be completed within timeout 0 ms.

I'm not really sure which timeout this is, but it looks like there is a timeout loop happening here. The Kafka producer has been configured as follows (the transaction timeout has been set on the Kafka server to match the producer):

Properties kafkaProducerProps = new Properties();
kafkaProducerProps.setProperty("bootstrap.servers", brokerList);
kafkaProducerProps.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "3600000");
kafkaProducerProps.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
kafkaProducerProps.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, UUID.randomUUID().toString());
kafkaProducerProps.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
kafkaProducerProps.setProperty(ProducerConfig.LINGER_MS_CONFIG, "500");
kafkaProducerProps.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
kafkaProducerProps.setProperty(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");
kafkaProducerProps.setProperty(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000");
kafkaProducerProps.setProperty(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "120000");

FlinkKafkaProducer<String> myProducer =
    new FlinkKafkaProducer<>(
        producerTopic,
        (KafkaSerializationSchema<String>) (value, aLong) -> {
            return new ProducerRecord<>(producerTopic, value.getBytes());
        },
        kafkaProducerProps,
        Semantic.EXACTLY_ONCE,
        10);

And checkpoints have been configured as follows:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// configuring RocksDB state backend to use HDFS
String backupFolder = props.getProperty("hdfs.backupFolder");
StateBackend backend = new RocksDBStateBackend(backupFolder, true);
env.setStateBackend(backend);

// start a checkpoint based on supplied interval
env.enableCheckpointing(checkpointInterval);

// set mode to exactly-once (this is the default)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// make sure progress happens between checkpoints
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(checkpointInterval);

// checkpoints have to complete within 380000 ms (roughly 6 minutes), or they are discarded
env.getCheckpointConfig().setCheckpointTimeout(380000);
//env.getCheckpointConfig().setTolerableCheckpointFailureNumber();

// no external services which could take some time to respond, therefore 1
// allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

// enable externalized checkpoints which are deleted after job cancellation
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);

Additionally, each taskmanager has been configured with 4 GB of memory, there is a sliding window of 10 seconds with a slide of 1 second, and the cluster setup is using Flink native.

Any hints would be much appreciated!

Regards,
M.

________________________________
From: Guowei Ma <guowei....@gmail.com>
Sent: 01 April 2021 14:19
To: Geldenhuys, Morgan Karl
Cc: user
Subject: Re: Checkpoint timeouts at times of high load

Hi,
I think there are many reasons that could lead to the checkpoint timeout.
Would you like to share some detailed checkpoint information? For example, the detailed checkpoint information from the web UI [1]. And which Flink version do you use?

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/monitoring/checkpoint_monitoring.html

Best,
Guowei

On Thu, Apr 1, 2021 at 4:33 PM Geldenhuys, Morgan Karl <morgan.geldenh...@tu-berlin.de> wrote:

Hi Community,

I have a number of Flink jobs running inside my session cluster with varying checkpoint intervals plus a large amount of operator state, and in times of high load the jobs fail due to checkpoint timeouts (set to 6 minutes).
I can only assume this is because the latency of saving checkpoints increases at these times of high load. I have a 30-node HDFS cluster for checkpoints... however, I see that only 4 of these nodes are being used for storage. Is there a way of ensuring the load is evenly spread? Could there be another reason for these checkpoint timeouts? Events are consumed from Kafka and written to Kafka with EXACTLY_ONCE guarantees enabled.

Thank you very much!

M.
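
As a rough way to reason about which timeout could be involved, the relationship between the transaction timeout and the checkpoint settings quoted in this thread can be sketched in plain Java. This is only a minimal sketch, not Flink or Kafka API: the class `TimeoutBudget` and its method are hypothetical helpers. The 3600000 ms transaction timeout and the 380000 ms checkpoint timeout are taken from the configuration in this thread; the 25 s checkpoint interval (roughly the spacing between completed checkpoints in the attached log) and the 5 minute restart allowance are assumptions. The rule of thumb it encodes is that a transaction should be able to stay open for one checkpoint interval, plus the slowest allowed checkpoint, plus the time a restart might take.

```java
import java.time.Duration;

/** Hypothetical helper for illustration; not part of Flink or Kafka. */
final class TimeoutBudget {

    /**
     * Returns true if the Kafka transaction timeout is strictly larger than
     * checkpoint interval + checkpoint timeout + assumed restart time.
     */
    static boolean transactionOutlivesCheckpoint(Duration transactionTimeout,
                                                 Duration checkpointInterval,
                                                 Duration checkpointTimeout,
                                                 Duration assumedRestartTime) {
        Duration worstCase = checkpointInterval
                .plus(checkpointTimeout)
                .plus(assumedRestartTime);
        return transactionTimeout.compareTo(worstCase) > 0;
    }

    public static void main(String[] args) {
        Duration transactionTimeout = Duration.ofMillis(3_600_000); // transaction.timeout.ms from this thread
        Duration checkpointTimeout = Duration.ofMillis(380_000);    // setCheckpointTimeout(380000) from this thread
        Duration checkpointInterval = Duration.ofSeconds(25);       // ASSUMPTION: not stated in the thread
        Duration assumedRestartTime = Duration.ofMinutes(5);        // ASSUMPTION: not stated in the thread

        // 25 s + 380 s + 300 s = 705 s, which is well below 3600 s.
        System.out.println(transactionOutlivesCheckpoint(
                transactionTimeout, checkpointInterval, checkpointTimeout, assumedRestartTime)); // prints true
    }
}
```

Under these assumed numbers the one-hour transaction timeout leaves a wide margin, which would suggest looking at the other timeouts (checkpoint timeout, request and delivery timeouts) first; with a much longer checkpoint interval or restart time the result could differ.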
2021-04-05 07:38:16,437 INFO org.apache.kafka.clients.producer.KafkaProducer [] - [Producer clientId=producer-Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-20, transactionalId=Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-20] Instantiated a transactional producer.
2021-04-05 07:38:16,439 INFO org.apache.kafka.clients.producer.KafkaProducer [] - [Producer clientId=producer-Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-20, transactionalId=Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-20] Overriding the default retries config to the recommended value of 2147483647 since the idempotent producer is enabled.
2021-04-05 07:38:16,439 INFO org.apache.kafka.clients.producer.KafkaProducer [] - [Producer clientId=producer-Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-20, transactionalId=Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-20] Overriding the default acks to all since idempotence is enabled.
2021-04-05 07:38:16,440 INFO org.apache.kafka.common.utils.AppInfoParser [] - Kafka version: 2.4.1
2021-04-05 07:38:16,440 INFO org.apache.kafka.common.utils.AppInfoParser [] - Kafka commitId: c57222ae8cd7866b
2021-04-05 07:38:16,440 INFO org.apache.kafka.common.utils.AppInfoParser [] - Kafka startTimeMs: 1617608296440
2021-04-05 07:38:16,440 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer [] - Starting FlinkKafkaInternalProducer (3/10) to produce into default topic iot-vehicles-notifications-1viyYcBkQo
2021-04-05 07:38:16,442 INFO org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=producer-Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-20, transactionalId=Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-20] ProducerId set to -1 with epoch -1
2021-04-05 07:38:16,443 INFO org.apache.kafka.clients.Metadata [] - [Producer clientId=producer-Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-20, transactionalId=Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-20] Cluster ID: 2xzHxtHORb-ATUKF8FGQjQ
2021-04-05 07:38:16,546 INFO org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=producer-Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-20, transactionalId=Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-20] ProducerId set to 8625 with epoch 4
2021-04-05 07:38:17,977 INFO org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction [] - FlinkKafkaProducer 3/10 - checkpoint 39 complete, committing transaction TransactionHolder{handle=KafkaTransactionState [transactionalId=Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-21, producerId=5615, epoch=4], transactionStartTime=1617608271462} from checkpoint 39
2021-04-05 07:38:17,978 INFO org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer [] - Flushing new partitions
2021-04-05 07:38:17,978 INFO org.apache.kafka.clients.producer.KafkaProducer [] - [Producer clientId=producer-Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-21, transactionalId=Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-21] Closing the Kafka producer with timeoutMillis = 0 ms.
2021-04-05 07:38:17,978 INFO org.apache.kafka.clients.producer.KafkaProducer [] - [Producer clientId=producer-Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-21, transactionalId=Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-21] Proceeding to force close the producer since pending requests could not be completed within timeout 0 ms.
2021-04-05 07:38:40,287 INFO org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer [] - Flushing new partitions
2021-04-05 07:38:40,288 INFO org.apache.kafka.clients.producer.ProducerConfig [] - ProducerConfig values:
    acks = 1
    batch.size = 16384
    bootstrap.servers = [130.149.249.46:32690]
    buffer.memory = 33554432
    client.dns.lookup = default
    client.id =
    compression.type = none
    connections.max.idle.ms = 540000
    delivery.timeout.ms = 120000
    enable.idempotence = true
    interceptor.classes = []
    key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
    linger.ms = 500
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 2147483647
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    transaction.timeout.ms = 3600000
    transactional.id = Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-29
    value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
2021-04-05 07:38:40,289 INFO org.apache.kafka.clients.producer.KafkaProducer [] - [Producer clientId=producer-Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-29, transactionalId=Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-29] Instantiated a transactional producer.
2021-04-05 07:38:40,293 INFO org.apache.kafka.clients.producer.KafkaProducer [] - [Producer clientId=producer-Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-29, transactionalId=Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-29] Overriding the default retries config to the recommended value of 2147483647 since the idempotent producer is enabled.
2021-04-05 07:38:40,293 INFO org.apache.kafka.clients.producer.KafkaProducer [] - [Producer clientId=producer-Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-29, transactionalId=Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-29] Overriding the default acks to all since idempotence is enabled.
2021-04-05 07:38:40,294 INFO org.apache.kafka.common.utils.AppInfoParser [] - Kafka version: 2.4.1
2021-04-05 07:38:40,294 INFO org.apache.kafka.common.utils.AppInfoParser [] - Kafka commitId: c57222ae8cd7866b
2021-04-05 07:38:40,294 INFO org.apache.kafka.common.utils.AppInfoParser [] - Kafka startTimeMs: 1617608320294
2021-04-05 07:38:40,294 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer [] - Starting FlinkKafkaInternalProducer (3/10) to produce into default topic iot-vehicles-notifications-1viyYcBkQo
2021-04-05 07:38:40,295 INFO org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=producer-Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-29, transactionalId=Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-29] ProducerId set to -1 with epoch -1
2021-04-05 07:38:40,297 INFO org.apache.kafka.clients.Metadata [] - [Producer clientId=producer-Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-29, transactionalId=Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-29] Cluster ID: 2xzHxtHORb-ATUKF8FGQjQ
2021-04-05 07:38:40,399 INFO org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=producer-Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-29, transactionalId=Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-29] ProducerId set to 6452 with epoch 5
2021-04-05 07:38:43,170 INFO org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction [] - FlinkKafkaProducer 3/10 - checkpoint 40 complete, committing transaction TransactionHolder{handle=KafkaTransactionState [transactionalId=Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-20, producerId=8625, epoch=4], transactionStartTime=1617608296546} from checkpoint 40
2021-04-05 07:38:43,170 INFO org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer [] - Flushing new partitions
2021-04-05 07:38:43,170 INFO org.apache.kafka.clients.producer.KafkaProducer [] - [Producer clientId=producer-Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-20, transactionalId=Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-20] Closing the Kafka producer with timeoutMillis = 0 ms.
2021-04-05 07:38:43,171 INFO org.apache.kafka.clients.producer.KafkaProducer [] - [Producer clientId=producer-Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-20, transactionalId=Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-20] Proceeding to force close the producer since pending requests could not be completed within timeout 0 ms.
2021-04-05 07:39:05,403 INFO org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer [] - Flushing new partitions
2021-04-05 07:39:05,404 INFO org.apache.kafka.clients.producer.ProducerConfig [] - ProducerConfig values:
    acks = 1
    batch.size = 16384
    bootstrap.servers = [130.149.249.46:32690]
    buffer.memory = 33554432
    client.dns.lookup = default
    client.id =
    compression.type = none
    connections.max.idle.ms = 540000
    delivery.timeout.ms = 120000
    enable.idempotence = true
    interceptor.classes = []
    key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
    linger.ms = 500
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 2147483647
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    transaction.timeout.ms = 3600000
    transactional.id = Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-28
    value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
2021-04-05 07:39:05,405 INFO org.apache.kafka.clients.producer.KafkaProducer [] - [Producer clientId=producer-Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-28, transactionalId=Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-28] Instantiated a transactional producer.
2021-04-05 07:39:05,410 INFO org.apache.kafka.clients.producer.KafkaProducer [] - [Producer clientId=producer-Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-28, transactionalId=Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-28] Overriding the default retries config to the recommended value of 2147483647 since the idempotent producer is enabled.
2021-04-05 07:39:05,410 INFO org.apache.kafka.clients.producer.KafkaProducer [] - [Producer clientId=producer-Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-28, transactionalId=Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-28] Overriding the default acks to all since idempotence is enabled.
2021-04-05 07:39:05,411 INFO org.apache.kafka.common.utils.AppInfoParser [] - Kafka version: 2.4.1
2021-04-05 07:39:05,411 INFO org.apache.kafka.common.utils.AppInfoParser [] - Kafka commitId: c57222ae8cd7866b
2021-04-05 07:39:05,411 INFO org.apache.kafka.common.utils.AppInfoParser [] - Kafka startTimeMs: 1617608345411
2021-04-05 07:39:05,411 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer [] - Starting FlinkKafkaInternalProducer (3/10) to produce into default topic iot-vehicles-notifications-1viyYcBkQo
2021-04-05 07:39:05,411 INFO org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=producer-Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-28, transactionalId=Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-28] ProducerId set to -1 with epoch -1
2021-04-05 07:39:05,415 INFO org.apache.kafka.clients.Metadata [] - [Producer clientId=producer-Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-28, transactionalId=Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-28] Cluster ID: 2xzHxtHORb-ATUKF8FGQjQ
2021-04-05 07:39:05,518 INFO org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=producer-Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-28, transactionalId=Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-28] ProducerId set to 9475 with epoch 5
2021-04-05 07:39:08,426 INFO org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction [] - FlinkKafkaProducer 3/10 - checkpoint 41 complete, committing transaction TransactionHolder{handle=KafkaTransactionState [transactionalId=Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-29, producerId=6452, epoch=5], transactionStartTime=1617608320399} from checkpoint 41
2021-04-05 07:39:08,426 INFO org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer [] - Flushing new partitions
2021-04-05 07:39:08,427 INFO org.apache.kafka.clients.producer.KafkaProducer [] - [Producer clientId=producer-Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-29, transactionalId=Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-29] Closing the Kafka producer with timeoutMillis = 0 ms.
2021-04-05 07:39:08,427 INFO org.apache.kafka.clients.producer.KafkaProducer [] - [Producer clientId=producer-Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-29, transactionalId=Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-29] Proceeding to force close the producer since pending requests could not be completed within timeout 0 ms.
2021-04-05 07:39:30,663 INFO org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer [] - Flushing new partitions
2021-04-05 07:39:30,664 INFO org.apache.kafka.clients.producer.ProducerConfig [] - ProducerConfig values:
    acks = 1
    batch.size = 16384
    bootstrap.servers = [130.149.249.46:32690]
    buffer.memory = 33554432
    client.dns.lookup = default
    client.id =
    compression.type = none
    connections.max.idle.ms = 540000
    delivery.timeout.ms = 120000
    enable.idempotence = true
    interceptor.classes = []
    key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
    linger.ms = 500
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 2147483647
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    transaction.timeout.ms = 3600000
    transactional.id = Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-27
    value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
2021-04-05 07:39:30,690 INFO org.apache.kafka.clients.producer.KafkaProducer [] - [Producer clientId=producer-Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-27, transactionalId=Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-27] Instantiated a transactional producer.
2021-04-05 07:39:30,692 INFO org.apache.kafka.clients.producer.KafkaProducer [] - [Producer clientId=producer-Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-27, transactionalId=Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-27] Overriding the default retries config to the recommended value of 2147483647 since the idempotent producer is enabled.
2021-04-05 07:39:30,692 INFO org.apache.kafka.clients.producer.KafkaProducer [] - [Producer clientId=producer-Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-27, transactionalId=Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-27] Overriding the default acks to all since idempotence is enabled.
2021-04-05 07:39:30,693 INFO org.apache.kafka.common.utils.AppInfoParser [] - Kafka version: 2.4.1
2021-04-05 07:39:30,693 INFO org.apache.kafka.common.utils.AppInfoParser [] - Kafka commitId: c57222ae8cd7866b
2021-04-05 07:39:30,693 INFO org.apache.kafka.common.utils.AppInfoParser [] - Kafka startTimeMs: 1617608370693
2021-04-05 07:39:30,694 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer [] - Starting FlinkKafkaInternalProducer (3/10) to produce into default topic iot-vehicles-notifications-1viyYcBkQo
2021-04-05 07:39:30,694 INFO org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=producer-Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-27, transactionalId=Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-27] ProducerId set to -1 with epoch -1
2021-04-05 07:39:30,696 INFO org.apache.kafka.clients.Metadata [] - [Producer clientId=producer-Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-27, transactionalId=Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-27] Cluster ID: 2xzHxtHORb-ATUKF8FGQjQ
2021-04-05 07:39:30,799 INFO org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=producer-Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-27, transactionalId=Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-27] ProducerId set to 7513 with epoch 5
2021-04-05 07:39:32,270 INFO org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction [] - FlinkKafkaProducer 3/10 - checkpoint 42 complete, committing transaction TransactionHolder{handle=KafkaTransactionState [transactionalId=Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-28, producerId=9475, epoch=5], transactionStartTime=1617608345518} from checkpoint 42
2021-04-05 07:39:32,270 INFO org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer [] - Flushing new partitions
2021-04-05 07:39:32,271 INFO org.apache.kafka.clients.producer.KafkaProducer [] - [Producer clientId=producer-Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-28, transactionalId=Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-28] Closing the Kafka producer with timeoutMillis = 0 ms.
2021-04-05 07:39:32,271 INFO org.apache.kafka.clients.producer.KafkaProducer [] - [Producer clientId=producer-Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-28, transactionalId=Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-28] Proceeding to force close the producer since pending requests could not be completed within timeout 0 ms.
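
Since the log repeats in cycles, one way to see whether any timeout is close to being hit is to measure how long each transaction stays open before it is committed. The following is a minimal sketch in plain Java, assuming the log format shown above; the class `CommitLatency` and its method are hypothetical and not part of Flink. It also assumes the log timestamps are in UTC, which is consistent with the "Kafka startTimeMs" entries above (for example, 1617608296440 corresponds to 07:38:16,440 UTC).

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.OptionalLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical log helper for illustration; not part of Flink. */
final class CommitLatency {

    // Captures the log timestamp, the checkpoint id and the transactionStartTime
    // of a "checkpoint N complete, committing transaction" line.
    private static final Pattern COMMIT_LINE = Pattern.compile(
            "^(\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2},\\d{3}) .*checkpoint (\\d+) complete, "
                    + "committing transaction .*transactionStartTime=(\\d+)\\}");

    private static final DateTimeFormatter LOG_TIME =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS");

    /**
     * Returns the milliseconds between transactionStartTime and the commit log
     * timestamp, or empty if the line is not a commit line.
     * ASSUMPTION: log timestamps are UTC.
     */
    static OptionalLong openMillis(String logLine) {
        Matcher m = COMMIT_LINE.matcher(logLine);
        if (!m.find()) {
            return OptionalLong.empty();
        }
        long commitMillis = LocalDateTime.parse(m.group(1), LOG_TIME)
                .toInstant(ZoneOffset.UTC)
                .toEpochMilli();
        long startMillis = Long.parseLong(m.group(3));
        return OptionalLong.of(commitMillis - startMillis);
    }

    public static void main(String[] args) {
        // The checkpoint 40 commit line from the log above.
        String line = "2021-04-05 07:38:43,170 INFO org.apache.flink.streaming.api.functions.sink."
                + "TwoPhaseCommitSinkFunction [] - FlinkKafkaProducer 3/10 - checkpoint 40 complete, "
                + "committing transaction TransactionHolder{handle=KafkaTransactionState "
                + "[transactionalId=Sink: KafkaSink-T6TNkBpAGd-46f8730428df9ecd6d7318a02bdc405e-20, "
                + "producerId=8625, epoch=4], transactionStartTime=1617608296546} from checkpoint 40";
        System.out.println(openMillis(line).getAsLong()); // prints 26624
    }
}
```

For checkpoint 40 this gives about 26.6 seconds between the start of the transaction and its commit, far below the configured transaction.timeout.ms of 3600000. Running the same check over the full taskmanager log would show whether that gap grows under high load.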