According to the Flink 1.12 documentation (https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/connectors/kafka.html), the recommended way to consume from Kafka is FlinkKafkaConsumer.
However, I noticed that the newer API uses KafkaSource, built with KafkaSourceBuilder and OffsetsInitializer. Although I am on the Flink 1.12 codebase, I preemptively decided to use KafkaSource in order to take advantage of the more flexible offsets features. It worked, until I deployed the job to EMR and had to connect to Kafka on AWS MSK. The logs show a few suspicious things.

1) The ConsumerConfig logs these properties:

```
security.protocol = PLAINTEXT
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
```

but I actually passed the following:

```
security.protocol = SSL
ssl.truststore.location = /etc/alternatives/jre/lib/security/cacerts
ssl.truststore.password = changeit
ssl.truststore.type = JKS
```

2) The job fails and this exception is thrown:

```
2022-02-03 00:40:57,239 ERROR org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext [] - Exception while handling result from async call in SourceCoordinator-Source: kafka sensor tags -> Sink: s3 sink. Triggering job failover.
org.apache.flink.util.FlinkRuntimeException: Failed to handle partition splits change due to 
    at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.handlePartitionSplitChanges(KafkaSourceEnumerator.java:223) ~[feature-LUM-5531-offset--b7c3bfee.jar:?]
    at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$null$1(ExecutorNotifier.java:86) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
    at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40) [flink-dist_2.12-1.12.1.jar:1.12.1]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_312]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_312]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_312]
Caused by: java.lang.RuntimeException: Failed to get topic metadata.
    at org.apache.flink.connector.kafka.source.enumerator.subscriber.TopicListSubscriber.getPartitionChanges(TopicListSubscriber.java:59) ~[feature-LUM-5531-offset--b7c3bfee.jar:?]
    at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.discoverAndInitializePartitionSplit(KafkaSourceEnumerator.java:196) ~[feature-LUM-5531-offset--b7c3bfee.jar:?]
    at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$2(ExecutorNotifier.java:83) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_312]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_312]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) ~[?:1.8.0_312]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) ~[?:1.8.0_312]
    ... 3 more
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Call(callName=describeTopics, deadlineMs=1643848916823) timed out at 9223372036854775807 after 1 attempt(s)
    at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45) ~[feature-LUM-5531-offset--b7c3bfee.jar:?]
    at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32) ~[feature-LUM-5531-offset--b7c3bfee.jar:?]
    at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89) ~[feature-LUM-5531-offset--b7c3bfee.jar:?]
    at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260) ~[feature-LUM-5531-offset--b7c3bfee.jar:?]
    at org.apache.flink.connector.kafka.source.enumerator.subscriber.TopicListSubscriber.getPartitionChanges(TopicListSubscriber.java:57) ~[feature-LUM-5531-offset--b7c3bfee.jar:?]
    at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.discoverAndInitializePartitionSplit(KafkaSourceEnumerator.java:196) ~[feature-LUM-5531-offset--b7c3bfee.jar:?]
    at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$2(ExecutorNotifier.java:83) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_312]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_312]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) ~[?:1.8.0_312]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) ~[?:1.8.0_312]
    ... 3 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Call(callName=describeTopics, deadlineMs=1643848916823) timed out at 9223372036854775807 after 1 attempt(s)
Caused by: org.apache.kafka.common.errors.TimeoutException: The AdminClient thread has exited.
2022-02-03 00:40:57,246 ERROR org.apache.kafka.common.utils.KafkaThread [] - Uncaught exception in thread 'kafka-admin-client-thread | null-enumerator-admin-client':
java.lang.OutOfMemoryError: Java heap space
    at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57) ~[?:1.8.0_312]
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:335) ~[?:1.8.0_312]
    at org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30) ~[feature-LUM-5531-offset--b7c3bfee.jar:?]
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:113) ~[feature-LUM-5531-offset--b7c3bfee.jar:?]
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:448) ~[feature-LUM-5531-offset--b7c3bfee.jar:?]
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:398) ~[feature-LUM-5531-offset--b7c3bfee.jar:?]
    at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:678) ~[feature-LUM-5531-offset--b7c3bfee.jar:?]
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:580) ~[feature-LUM-5531-offset--b7c3bfee.jar:?]
    at org.apache.kafka.common.network.Selector.poll(Selector.java:485) ~[feature-LUM-5531-offset--b7c3bfee.jar:?]
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:549) ~[feature-LUM-5531-offset--b7c3bfee.jar:?]
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1272) ~[feature-LUM-5531-offset--b7c3bfee.jar:?]
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1203) ~[feature-LUM-5531-offset--b7c3bfee.jar:?]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_312]
```

According to many sources, that OutOfMemoryError is more indicative of an SSL error than of a genuine memory problem.

My code looks like this:

```java
private KafkaSource<SensorMessage> buildKafkaSource() {
    final KafkaSourceBuilder<SensorMessage> sourceBuilder = KafkaSource.<SensorMessage>builder()
            .setTopics(sensorKafkaTopic)
            .setDeserializer(new SensorMessageKafkaRecordDeserializer())
            .setBootstrapServers(String.join(",", kafkaBootstrapServers))
            .setGroupId(kafkaGroupId)
            .setProperty("zookeeper.connect", zookeeperConnect);
    if (kafkaRequiresSsl) {
        sourceBuilder.setProperty("ssl.truststore.location", kafkaSslTruststoreLocation);
        sourceBuilder.setProperty("ssl.truststore.password", kafkaSslTruststorePassword);
        sourceBuilder.setProperty("security.protocol", "SSL");
    }
    assignOffsetStrategy(sourceBuilder);
    sourceBuilder.setProperty("commit.offsets.on.checkpoint", Boolean.TRUE.toString());
    return sourceBuilder.build();
}

private void assignOffsetStrategy(KafkaSourceBuilder<SensorMessage> sourceBuilder) {
    if (kafkaOffset != null) {
        switch (kafkaOffset) {
            case committed:
                if (kafkaOffsetResetStrategy != null) {
                    switch (kafkaOffsetResetStrategy) {
                        case earliest:
                            sourceBuilder.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST));
                            break;
                        case latest:
                            sourceBuilder.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST));
                            break;
                    }
                } else {
                    sourceBuilder.setStartingOffsets(OffsetsInitializer.committedOffsets());
                }
                break;
            case timestamp:
                sourceBuilder.setStartingOffsets(OffsetsInitializer.timestamp(kafkaOffsetTimestamp));
                break;
            case earliest:
                sourceBuilder.setStartingOffsets(OffsetsInitializer.earliest());
                break;
            case latest:
                sourceBuilder.setStartingOffsets(OffsetsInitializer.latest());
                break;
        }
    }
}

public enum OffsetOption { committed, timestamp, earliest, latest }

public enum OffsetResetStrategyOption { earliest, latest }
```

So, did I adopt KafkaSource and KafkaSourceBuilder prematurely? Should I revert to FlinkKafkaConsumer? Any advice or insight would be very helpful. Thank you.

Marco A. Villalobos
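Update: to rule out a typo in the property keys themselves, here is a minimal, self-contained sketch of just the SSL settings I intend to reach the ConsumerConfig. The `buildSslProperties` helper and the literal values are only for illustration; my real job reads them from configuration, and I am assuming `KafkaSourceBuilder.setProperty` forwards these keys verbatim to the underlying clients.

```java
import java.util.Properties;

public class SslPropsSketch {

    // Hypothetical helper: builds exactly the keys Kafka's ConsumerConfig expects.
    static Properties buildSslProperties(String truststoreLocation, String truststorePassword) {
        Properties props = new Properties();
        props.setProperty("security.protocol", "SSL");
        props.setProperty("ssl.truststore.location", truststoreLocation);
        props.setProperty("ssl.truststore.password", truststorePassword);
        props.setProperty("ssl.truststore.type", "JKS");
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildSslProperties("/etc/alternatives/jre/lib/security/cacerts", "changeit");
        // These are the values I expect the ConsumerConfig dump to show,
        // yet on EMR it logs PLAINTEXT and null instead.
        System.out.println(props.getProperty("security.protocol"));
        System.out.println(props.getProperty("ssl.truststore.location"));
    }
}
```

If these keys are correct, then either the `if (kafkaRequiresSsl)` branch is not taken on EMR, or the properties are dropped somewhere between the builder and the enumerator's admin client.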
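Update 2: while searching, I found a common explanation for an `OutOfMemoryError` inside `NetworkReceive.readFrom`: a client speaking PLAINTEXT to a TLS listener reads the first bytes of the broker's TLS handshake as Kafka's 4-byte message-length prefix, which decodes to a few hundred megabytes and triggers a huge buffer allocation. I have not confirmed this is what happens in my case, but the arithmetic is easy to sketch (0x16 0x03 0x03 is a standard TLS 1.2 handshake record header; the interpretation as a size prefix is my assumption about the failure mode):

```java
import java.nio.ByteBuffer;

public class TlsLengthSketch {

    // Interpret the first four bytes of a response as a big-endian int32,
    // the way Kafka's wire format frames message sizes.
    static int asKafkaSizePrefix(byte[] firstFourBytes) {
        return ByteBuffer.wrap(firstFourBytes).getInt();
    }

    public static void main(String[] args) {
        // 0x16 = TLS "handshake" record type, 0x03 0x03 = TLS 1.2 version bytes.
        byte[] tlsRecordHeader = {0x16, 0x03, 0x03, 0x00};
        int bogusSize = asKafkaSizePrefix(tlsRecordHeader);
        // A plaintext client would now try to allocate a buffer of this many bytes.
        System.out.println(bogusSize); // 369296128, roughly 352 MB
    }
}
```

That would line up with the ConsumerConfig dump: if my SSL properties never reach the client, the enumerator's AdminClient would hit the MSK TLS port in plaintext and die with exactly this kind of heap-space error.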