Hello All,

I am using Stream DSL API just to create a GlobalKTable backed by a topic.
The topology is simple, just create a global table from a topic and that's
it (pasted below code snippet), when I run this service on K8S cluster
(container in a pod), the service gets OutOfMemoryError during
kafkaStreams.start() method call (exception trace pasted below). Note that
the topic is newly created so there is no data in the topic. POD memory was
set initially to 500MiB which I doubled to 1000MiB but no luck.
kafka-streams and kafka-clients jar at 2.3.1 version. Broker might be a
version ahead I think 2.4 but that should not be an issue. Any help would
be appreciated since I am blocked at this point.

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, DEFAULT_APPLICATION_ID);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, theKafkaServers);
StreamsBuilder streamsBuilder = new StreamsBuilder();
GlobalKTable<String, Map<String, String>> groupCacheTable =
    streamsBuilder.globalTable(GROUP_CACHE_TOPIC,
Consumed.with(Serdes.String(), GroupCacheSerdes.groupCache()),
Materialized.as(GROUP_CACHE_STORE_NAME));
Topology groupCacheTopology = streamsBuilder.build();
kafkaStreams = new KafkaStreams(groupCacheTopology, props);
kafkaStreams.start();

Runtime.getRuntime().addShutdownHook(new Thread(() -> {
LOG.info("Stopping the stream");
kafkaStreams.close();
}));

{"@timestamp":"2020-05-28T03:11:39.719+00:00","@version":"1","message":"stream-client
[DsiApplication-3db7d0ab-c84b-4e39-9038-61fa235786b9] State transition from
CREATED to
REBALANCING","logger_name":"org.apache.kafka.streams.KafkaStreams","thread_name":"main","level":"INFO","level_value":20000}
{"@timestamp":"2020-05-28T03:11:43.532+00:00","@version":"1","message":"Uncaught
exception in thread 'kafka-admin-client-thread |
DsiApplication-3db7d0ab-c84b-4e39-9038-61fa235786b9-admin':","logger_name":"org.apache.kafka.common.utils.KafkaThread","thread_name":"kafka-admin-client-thread
|
DsiApplication-3db7d0ab-c84b-4e39-9038-61fa235786b9-admin","level":"ERROR","level_value":40000,"stack_trace":"java.lang.OutOfMemoryError:
Java heap space\n\tat java.base/java.nio.HeapByteBuffer.<init>(Unknown
Source)\n\tat java.base/java.nio.ByteBuffer.allocate(Unknown Source)\n\tat
org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)\n\tat
org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:112)\n\tat
org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)\n\tat
org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)\n\tat
org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)\n\tat
org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)\n\tat
org.apache.kafka.common.network.Selector.poll(Selector.java:483)\n\tat
org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:539)\n\tat
org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1152)\n\tat
java.base/java.lang.Thread.run(Unknown Source)\n"}
{"@timestamp":"2020-05-28T03:11:44.641+00:00","@version":"1","message":"Uncaught
exception in thread 'kafka-producer-network-thread |
DsiApplication-3db7d0ab-c84b-4e39-9038-61fa235786b9-StreamThread-1-producer':","logger_name":"org.apache.kafka.common.utils.KafkaThread","thread_name":"kafka-producer-network-thread
|
DsiApplication-3db7d0ab-c84b-4e39-9038-61fa235786b9-StreamThread-1-producer","level":"ERROR","level_value":40000,"stack_trace":"java.lang.OutOfMemoryError:
Java heap space\n\tat java.base/java.nio.HeapByteBuffer.<init>(Unknown
Source)\n\tat java.base/java.nio.ByteBuffer.allocate(Unknown Source)\n\tat
org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)\n\tat
org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:112)\n\tat
org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)\n\tat
org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)\n\tat
org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)\n\tat
org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)\n\tat
org.apache.kafka.common.network.Selector.poll(Selector.java:483)\n\tat
org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:539)\n\tat
org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:335)\n\tat
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)\n\tat
java.base/java.lang.Thread.run(Unknown Source)\n"}
{"@timestamp":"2020-05-28T03:11:45.017+00:00","@version":"1","message":"Opening
store group-cache-store in regular
mode","logger_name":"org.apache.kafka.streams.state.internals.RocksDBTimestampedStore","thread_name":"DsiApplication-3db7d0ab-c84b-4e39-9038-61fa235786b9-GlobalStreamThread","level":"INFO","level_value":20000}
{"@timestamp":"2020-05-28T03:11:45.020+00:00","@version":"1","message":"global-stream-thread
[DsiApplication-3db7d0ab-c84b-4e39-9038-61fa235786b9-GlobalStreamThread]
Restoring state for global store
group-cache-store","logger_name":"org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl","thread_name":"DsiApplication-3db7d0ab-c84b-4e39-9038-61fa235786b9-GlobalStreamThread","level":"INFO","level_value":20000}

Reply via email to