Hello Pushkar,

I think the memory pressure may not come from consuming the topic data,
but from the RocksDB instance used to materialize the global table. Note
that RocksDB pre-allocates large chunks of memory for its memtables, page
cache, and reader cache under the default configs. You can find more
details in this KIP:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-607%3A+Add+Metrics+to+Kafka+Streams+to+Record+the+Memory+Used+by+RocksDB
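If RocksDB does turn out to be the culprit, its footprint can be bounded
by plugging in a custom RocksDBConfigSetter. A minimal configuration
sketch is below; the class name and the size limits are illustrative, not
recommendations, so tune them for your workload:

```java
import java.util.Map;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Options;
import org.apache.kafka.streams.state.RocksDBConfigSetter;

// Illustrative sketch: caps the per-store block cache and memtable sizes.
// The exact limits below are hypothetical; size them for your workload.
public class BoundedMemoryRocksDBConfig implements RocksDBConfigSetter {

    @Override
    public void setConfig(final String storeName, final Options options,
                          final Map<String, Object> configs) {
        final BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
        tableConfig.setBlockCacheSize(16 * 1024 * 1024L);  // 16 MiB block cache
        options.setTableFormatConfig(tableConfig);
        options.setWriteBufferSize(8 * 1024 * 1024L);      // 8 MiB per memtable
        options.setMaxWriteBufferNumber(2);                // at most two memtables
    }
}
```

You would then register it in the Streams config, e.g.
props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG,
BoundedMemoryRocksDBConfig.class). That said, since the OOM here fires in
the admin-client thread on an empty topic, the JVM heap sizing relative to
the pod limit is also worth checking.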


Guozhang


On Wed, May 27, 2020 at 8:44 PM Pushkar Deole <pdeole2...@gmail.com> wrote:

> Hello All,
>
> I am using the Streams DSL API just to create a GlobalKTable backed by a
> topic. The topology is simple: create a global table from a topic and
> that's it (code snippet pasted below). When I run this service on a K8S
> cluster (container in a pod), the service gets an OutOfMemoryError during
> the kafkaStreams.start() method call (exception trace pasted below). Note
> that the topic is newly created, so there is no data in it. The pod's
> memory limit was initially 500 MiB, which I doubled to 1000 MiB, with no
> luck. The kafka-streams and kafka-clients jars are at version 2.3.1. The
> broker might be one version ahead (I think 2.4), but that should not be an
> issue. Any help would be appreciated, since I am blocked at this point.
>
> Properties props = new Properties();
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, DEFAULT_APPLICATION_ID);
> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, theKafkaServers);
>
> StreamsBuilder streamsBuilder = new StreamsBuilder();
> GlobalKTable<String, Map<String, String>> groupCacheTable =
>     streamsBuilder.globalTable(GROUP_CACHE_TOPIC,
>         Consumed.with(Serdes.String(), GroupCacheSerdes.groupCache()),
>         Materialized.as(GROUP_CACHE_STORE_NAME));
> Topology groupCacheTopology = streamsBuilder.build();
>
> kafkaStreams = new KafkaStreams(groupCacheTopology, props);
> kafkaStreams.start();
>
> Runtime.getRuntime().addShutdownHook(new Thread(() -> {
>     LOG.info("Stopping the stream");
>     kafkaStreams.close();
> }));
>
>
> {"@timestamp":"2020-05-28T03:11:39.719+00:00","@version":"1","message":"stream-client [DsiApplication-3db7d0ab-c84b-4e39-9038-61fa235786b9] State transition from CREATED to REBALANCING","logger_name":"org.apache.kafka.streams.KafkaStreams","thread_name":"main","level":"INFO","level_value":20000}
>
> {"@timestamp":"2020-05-28T03:11:43.532+00:00","@version":"1","message":"Uncaught exception in thread 'kafka-admin-client-thread | DsiApplication-3db7d0ab-c84b-4e39-9038-61fa235786b9-admin':","logger_name":"org.apache.kafka.common.utils.KafkaThread","thread_name":"kafka-admin-client-thread | DsiApplication-3db7d0ab-c84b-4e39-9038-61fa235786b9-admin","level":"ERROR","level_value":40000,"stack_trace":"java.lang.OutOfMemoryError: Java heap space\n\tat java.base/java.nio.HeapByteBuffer.<init>(Unknown Source)\n\tat java.base/java.nio.ByteBuffer.allocate(Unknown Source)\n\tat org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)\n\tat org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:112)\n\tat org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)\n\tat org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)\n\tat org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)\n\tat org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)\n\tat org.apache.kafka.common.network.Selector.poll(Selector.java:483)\n\tat org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:539)\n\tat org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1152)\n\tat java.base/java.lang.Thread.run(Unknown Source)\n"}
>
> {"@timestamp":"2020-05-28T03:11:44.641+00:00","@version":"1","message":"Uncaught exception in thread 'kafka-producer-network-thread | DsiApplication-3db7d0ab-c84b-4e39-9038-61fa235786b9-StreamThread-1-producer':","logger_name":"org.apache.kafka.common.utils.KafkaThread","thread_name":"kafka-producer-network-thread | DsiApplication-3db7d0ab-c84b-4e39-9038-61fa235786b9-StreamThread-1-producer","level":"ERROR","level_value":40000,"stack_trace":"java.lang.OutOfMemoryError: Java heap space\n\tat java.base/java.nio.HeapByteBuffer.<init>(Unknown Source)\n\tat java.base/java.nio.ByteBuffer.allocate(Unknown Source)\n\tat org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)\n\tat org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:112)\n\tat org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)\n\tat org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)\n\tat org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)\n\tat org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)\n\tat org.apache.kafka.common.network.Selector.poll(Selector.java:483)\n\tat org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:539)\n\tat org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:335)\n\tat org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)\n\tat java.base/java.lang.Thread.run(Unknown Source)\n"}
>
> {"@timestamp":"2020-05-28T03:11:45.017+00:00","@version":"1","message":"Opening store group-cache-store in regular mode","logger_name":"org.apache.kafka.streams.state.internals.RocksDBTimestampedStore","thread_name":"DsiApplication-3db7d0ab-c84b-4e39-9038-61fa235786b9-GlobalStreamThread","level":"INFO","level_value":20000}
>
> {"@timestamp":"2020-05-28T03:11:45.020+00:00","@version":"1","message":"global-stream-thread [DsiApplication-3db7d0ab-c84b-4e39-9038-61fa235786b9-GlobalStreamThread] Restoring state for global store group-cache-store","logger_name":"org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl","thread_name":"DsiApplication-3db7d0ab-c84b-4e39-9038-61fa235786b9-GlobalStreamThread","level":"INFO","level_value":20000}
>

