Hi Ian,

It looks like you have a lot of partitions for the count store. Each RocksDB 
database uses off-heap memory (around 60-70MB in 0.10.2), which adds up when 
you have this many stores in one instance. That would explain the running out 
of memory part. One solution would be to scale out your streams application by 
running another Kafka Streams instance on another machine.
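
To put rough numbers on the off-heap point, here is a back-of-the-envelope 
sketch. It assumes all 96 content-count-store directories in your listing 
(1_0 to 1_95) are open in a single instance and takes the 60-70MB per-store 
figure at face value. The class and method names are made up for illustration.

```java
public class OffHeapEstimate {
    // Rough total: one RocksDB database per store, each assumed to use perStoreMb off heap.
    static long totalOffHeapMb(int storeCount, int perStoreMb) {
        return (long) storeCount * perStoreMb;
    }

    public static void main(String[] args) {
        int stores = 96; // task directories 1_0 .. 1_95 from the quoted listing
        long low = totalOffHeapMb(stores, 60);  // lower per-store estimate
        long high = totalOffHeapMb(stores, 70); // upper per-store estimate
        System.out.println("Estimated off-heap: " + low + "-" + high + " MB");
    }
}
```

That works out to somewhere between 5760MB and 6720MB off heap, which would 
sit on top of the heap you allow with -Xmx6144m.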

The locking part was supposed to have been fixed in 0.10.2, but it seems 
there are still cases where it happens. Could you confirm that you are 
using the latest release, 0.10.2 (which came out just last week)? Just 
double-checking before re-opening the JIRA. As a workaround, try setting 
num.stream.threads to 1 and running multiple instances/processes on the 
same machine instead.
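
A minimal sketch of what that workaround might look like in the properties 
for each process, reusing the values from your config. Giving each process 
its own state.dir is my assumption here, not something the workaround 
requires as far as I know, and the class name and directory paths are 
hypothetical.

```java
import java.util.Properties;

public class SingleThreadStreamsConfig {
    // Builds the properties for one process; stateDir is a per-process directory (assumption).
    static Properties build(String stateDir) {
        Properties props = new Properties();
        props.setProperty("application.id", "stream-processors-id"); // same id in every process
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("num.stream.threads", "1");                // was "4"
        props.setProperty("state.dir", stateDir);
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical usage: start each process with a different directory argument.
        String stateDir = args.length > 0 ? args[0] : "/tmp/kafka-streams";
        Properties props = build(stateDir);
        // These properties would then be handed to the KafkaStreams constructor.
        System.out.println("num.stream.threads=" + props.getProperty("num.stream.threads"));
        System.out.println("state.dir=" + props.getProperty("state.dir"));
    }
}
```

Because every process shares the same application.id, they join the same 
group and the tasks are spread across them.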


> On 27 Feb 2017, at 14:04, Ian Duffy <i...@ianduffy.ie> wrote:
> Hi All,
> I'm using Kafka Client 10.2 with Kafka Streams.
> I'm performing a groupByKey on a stream and seeing large files appear
> within my state directory. Is this expected?
> 90M 1_0/rocksdb/content-count-store
> 82M 1_1/rocksdb/content-count-store
> 102M 1_10/rocksdb/content-count-store
> 86M 1_11/rocksdb/content-count-store
> 87M 1_12/rocksdb/content-count-store
> 85M 1_13/rocksdb/content-count-store
> 93M 1_14/rocksdb/content-count-store
> 87M 1_15/rocksdb/content-count-store
> 92M 1_16/rocksdb/content-count-store
> 97M 1_17/rocksdb/content-count-store
> 91M 1_18/rocksdb/content-count-store
> 94M 1_19/rocksdb/content-count-store
> 89M 1_2/rocksdb/content-count-store
> 88M 1_20/rocksdb/content-count-store
> 92M 1_21/rocksdb/content-count-store
> 83M 1_22/rocksdb/content-count-store
> 82M 1_23/rocksdb/content-count-store
> 83M 1_24/rocksdb/content-count-store
> 89M 1_25/rocksdb/content-count-store
> 91M 1_26/rocksdb/content-count-store
> 84M 1_27/rocksdb/content-count-store
> 87M 1_28/rocksdb/content-count-store
> 93M 1_29/rocksdb/content-count-store
> 88M 1_3/rocksdb/content-count-store
> 77M 1_30/rocksdb/content-count-store
> 101M 1_31/rocksdb/content-count-store
> 73M 1_32/rocksdb/content-count-store
> 89M 1_33/rocksdb/content-count-store
> 89M 1_34/rocksdb/content-count-store
> 82M 1_35/rocksdb/content-count-store
> 88M 1_36/rocksdb/content-count-store
> 82M 1_37/rocksdb/content-count-store
> 83M 1_38/rocksdb/content-count-store
> 92M 1_39/rocksdb/content-count-store
> 99M 1_4/rocksdb/content-count-store
> 88M 1_40/rocksdb/content-count-store
> 89M 1_41/rocksdb/content-count-store
> 84M 1_42/rocksdb/content-count-store
> 88M 1_43/rocksdb/content-count-store
> 91M 1_44/rocksdb/content-count-store
> 90M 1_45/rocksdb/content-count-store
> 81M 1_46/rocksdb/content-count-store
> 89M 1_47/rocksdb/content-count-store
> 81M 1_48/rocksdb/content-count-store
> 81M 1_49/rocksdb/content-count-store
> 82M 1_5/rocksdb/content-count-store
> 88M 1_50/rocksdb/content-count-store
> 88M 1_51/rocksdb/content-count-store
> 75M 1_52/rocksdb/content-count-store
> 85M 1_53/rocksdb/content-count-store
> 72M 1_54/rocksdb/content-count-store
> 89M 1_55/rocksdb/content-count-store
> 86M 1_56/rocksdb/content-count-store
> 87M 1_57/rocksdb/content-count-store
> 87M 1_58/rocksdb/content-count-store
> 94M 1_59/rocksdb/content-count-store
> 83M 1_6/rocksdb/content-count-store
> 88M 1_60/rocksdb/content-count-store
> 87M 1_61/rocksdb/content-count-store
> 102M 1_62/rocksdb/content-count-store
> 86M 1_63/rocksdb/content-count-store
> 85M 1_64/rocksdb/content-count-store
> 91M 1_65/rocksdb/content-count-store
> 86M 1_66/rocksdb/content-count-store
> 82M 1_67/rocksdb/content-count-store
> 85M 1_68/rocksdb/content-count-store
> 85M 1_69/rocksdb/content-count-store
> 87M 1_7/rocksdb/content-count-store
> 83M 1_70/rocksdb/content-count-store
> 84M 1_71/rocksdb/content-count-store
> 89M 1_72/rocksdb/content-count-store
> 82M 1_73/rocksdb/content-count-store
> 84M 1_74/rocksdb/content-count-store
> 86M 1_75/rocksdb/content-count-store
> 92M 1_76/rocksdb/content-count-store
> 85M 1_77/rocksdb/content-count-store
> 92M 1_78/rocksdb/content-count-store
> 84M 1_79/rocksdb/content-count-store
> 109M 1_8/rocksdb/content-count-store
> 99M 1_80/rocksdb/content-count-store
> 88M 1_81/rocksdb/content-count-store
> 103M 1_82/rocksdb/content-count-store
> 95M 1_83/rocksdb/content-count-store
> 89M 1_84/rocksdb/content-count-store
> 93M 1_85/rocksdb/content-count-store
> 84M 1_86/rocksdb/content-count-store
> 89M 1_87/rocksdb/content-count-store
> 95M 1_88/rocksdb/content-count-store
> 87M 1_89/rocksdb/content-count-store
> 89M 1_9/rocksdb/content-count-store
> 87M 1_90/rocksdb/content-count-store
> 100M 1_91/rocksdb/content-count-store
> 93M 1_92/rocksdb/content-count-store
> 93M 1_93/rocksdb/content-count-store
> 88M 1_94/rocksdb/content-count-store
> 88M 1_95/rocksdb/content-count-store
> When the application starts up it goes through the joining process, which
> takes a couple of minutes and pulls down this 8GB of data into the state
> directory. After that, the application starts processing. Sometimes it
> gives an exception like
> org.apache.kafka.streams.errors.LockException: task [1_57] Failed to
> lock the state directory: /data/kafka-streams/stream-processors-id/1_57
> Other times it runs successfully and eventually throws a garbage collection
> error:
> 2017-02-27 11:57:55,894 - [ERROR] - [StreamThread-3] i.z.a.s.processors.RunProcessors - Unexpected Exception caught in thread [StreamThread-3]:
> java.lang.OutOfMemoryError: GC overhead limit exceeded
>     at java.util.Arrays.copyOf(Arrays.java:3332)
>     at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124)
>     at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448)
>     at java.lang.StringBuilder.append(StringBuilder.java:136)
>     at org.apache.kafka.common.metrics.JmxReporter.getMBeanName(JmxReporter.java:132)
>     at org.apache.kafka.common.metrics.JmxReporter.removeAttribute(JmxReporter.java:96)
>     at org.apache.kafka.common.metrics.JmxReporter.metricRemoval(JmxReporter.java:84)
>     at org.apache.kafka.common.metrics.Metrics.removeMetric(Metrics.java:417)
>     at org.apache.kafka.common.metrics.Metrics.removeSensor(Metrics.java:367)
>     at org.apache.kafka.common.metrics.Metrics.removeSensor(Metrics.java:375)
>     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.removeSensor(StreamsMetricsImpl.java:205)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode$NodeMetrics.removeAllSensors(ProcessorNode.java:199)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.close(ProcessorNode.java:123)
>     at org.apache.kafka.streams.processor.internals.StreamTask.closeTopology(StreamTask.java:355)
>     at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:376)
>     at org.apache.kafka.streams.processor.internals.StreamThread$5.apply(StreamThread.java:1046)
>     at org.apache.kafka.streams.processor.internals.StreamThread.performOnAllTasks(StreamThread.java:503)
>     at org.apache.kafka.streams.processor.internals.StreamThread.closeAllTasks(StreamThread.java:1042)
>     at org.apache.kafka.streams.processor.internals.StreamThread.shutdownTasksAndState(StreamThread.java:451)
>     at org.apache.kafka.streams.processor.internals.StreamThread.shutdown(StreamThread.java:398)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:379)
> I've set the following Java options to try to avoid this:
> "-Xms1024m"
> , "-Xmx6144m"
> , "-XX:+UseConcMarkSweepGC"
> , "-XX:+DisableExplicitGC"
> , "-XX:+CMSClassUnloadingEnabled"
> The streams are running with the following properties
> streams {
>  client.id = "stream-processors-id"
>  application.id = "stream-processors-id"
>  bootstrap.servers = "localhost:9092"
>  replication.factor = "3"
>  num.stream.threads = "4"
>  state.dir = "/tmp/kafka-streams"
>  buffered.records.per.partition = "100"
>  poll.ms = "100"
>  cache.max.bytes.buffering = 10485760
>  commit.interval.ms = "30000"
>  consumer {
>    group.id = "stream-processors-id"
>    auto.offset.reset = "earliest"
>    max.poll.records = "100"
>    session.timeout.ms = "90000"
>    heartbeat.interval.ms = "40000"
>    commit.interval.ms = "30000"
>    max.poll.interval.ms = "300000"
>    max.partition.fetch.bytes = "524288"
>  }
> }
> Thanks,
> Ian.
