Hi Mahendra,

Are you able to share the complete logs? It is pretty hard to tell what is happening just from a few snippets of information.
Thanks,
Damian

On Wed, 22 Mar 2017 at 12:16 Mahendra Kariya <mahendra.kar...@go-jek.com> wrote:

> To test Kafka Streams on 0.10.2.0, we set up a new Kafka cluster with the
> latest version and used MirrorMaker to replicate the data from the 0.10.0.0
> Kafka cluster. We pointed our streaming app to the newly created Kafka
> cluster.
>
> We have 5 nodes, each running the streaming app with 10 threads. In less
> than 10 minutes, the process on all 5 nodes died with different exceptions.
> Below are the different stack traces we got.
>
> Any help would be really appreciated.
>
> *Stacktrace # 1 (got on 3 of 5 nodes):*
>
> 18:58:00.349 [StreamThread-2] INFO o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-2] Stream thread shutdown complete
> 18:58:00.349 [StreamThread-2] WARN o.a.k.s.p.i.StreamThread - Unexpected state transition from RUNNING to NOT_RUNNING
> Exception in thread "StreamThread-2" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=1_396, processor=KSTREAM-SOURCE-0000000004, topic=topicname, partition=396, offset=66839
>         at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:641)
>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
> Caused by: org.apache.kafka.streams.errors.InvalidStateStoreException: store %s has closed
>         at org.apache.kafka.streams.state.internals.RocksDBStore$RocksDbIterator.hasNext(RocksDBStore.java:398)
>         at org.apache.kafka.streams.state.internals.RocksDBStore$RocksDBRangeIterator.hasNext(RocksDBStore.java:457)
>         at org.apache.kafka.streams.state.internals.WindowStoreKeySchema$1.hasNext(WindowStoreKeySchema.java:30)
>         at org.apache.kafka.streams.state.internals.SegmentIterator.hasNext(SegmentIterator.java:69)
>         at org.apache.kafka.streams.state.internals.MeteredSegmentedBytesStore$MeteredSegmentedBytesStoreIterator.hasNext(MeteredSegmentedBytesStore.java:131)
>         at org.apache.kafka.streams.state.internals.RocksDBWindowStore$TheWindowStoreIterator.hasNext(RocksDBWindowStore.java:131)
>         at org.apache.kafka.streams.state.internals.AbstractMergedSortedCacheStoreIterator.hasNext(AbstractMergedSortedCacheStoreIterator.java:74)
>         at org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate$KStreamWindowAggregateProcessor.process(KStreamWindowAggregate.java:97)
>         at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48)
>         at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>         at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134)
>         at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
>         at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:70)
>         at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197)
>         ... 2 more
>
> *Stacktrace # 2 (got on 1 node):*
>
> 18:57:44.692 [StreamThread-2] INFO o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-2] Stream thread shutdown complete
> 18:57:44.692 [StreamThread-2] WARN o.a.k.s.p.i.StreamThread - Unexpected state transition from ASSIGNING_PARTITIONS to NOT_RUNNING
> Exception in thread "StreamThread-2" org.apache.kafka.streams.errors.StreamsException: stream-thread [StreamThread-2] Failed to rebalance
>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:612)
>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
> Caused by: java.lang.IllegalArgumentException: A metric named 'MetricName [name=1_234-sometopic-hitRatio-avg, group=stream-record-cache-metrics, description=The current count of 1_234-sometopic hitRatio operation., tags={record-cache-id=1_234-sometopic}]' already exists, can't register another one.
>         at org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:433)
>         at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:249)
>         at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:234)
>         at org.apache.kafka.streams.state.internals.NamedCache$NamedCacheMetrics.<init>(NamedCache.java:388)
>         at org.apache.kafka.streams.state.internals.NamedCache.<init>(NamedCache.java:62)
>         at org.apache.kafka.streams.state.internals.ThreadCache.getOrCreateCache(ThreadCache.java:226)
>         at org.apache.kafka.streams.state.internals.ThreadCache.addDirtyEntryFlushListener(ThreadCache.java:87)
>         at org.apache.kafka.streams.state.internals.CachingWindowStore.initInternal(CachingWindowStore.java:74)
>         at org.apache.kafka.streams.state.internals.CachingWindowStore.init(CachingWindowStore.java:62)
>         at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86)
>         at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:141)
>         at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834)
>         at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207)
>         at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
>         at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937)
>         at org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69)
>         at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236)
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255)
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339)
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
>         ... 1 more
>
> *Stacktrace # 3 (got on 1 node):*
>
> 19:07:34.827 [StreamThread-1] WARN o.a.k.s.p.i.StreamThread - Could not create task 0_192. Will retry.
> org.apache.kafka.streams.errors.LockException: task [0_192] Failed to lock the state directory: /tmp/kafka-streams/streams_test_2/0_192
>         at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:102) ~[app-name-1.2.1.jar:na]
>         at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73) ~[app-name-1.2.1.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108) ~[app-name-1.2.1.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834) [app-name-1.2.1.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207) ~[app-name-1.2.1.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180) ~[app-name-1.2.1.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937) [app-name-1.2.1.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69) [app-name-1.2.1.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236) [app-name-1.2.1.jar:na]
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255) [app-name-1.2.1.jar:na]
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339) [app-name-1.2.1.jar:na]
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) [app-name-1.2.1.jar:na]
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286) [app-name-1.2.1.jar:na]
>         at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030) [app-name-1.2.1.jar:na]
>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) [app-name-1.2.1.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582) [app-name-1.2.1.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368) [app-name-1.2.1.jar:na]
>
> On Sat, Mar 18, 2017 at 5:58 AM, Mahendra Kariya <mahendra.kar...@go-jek.com> wrote:
>
>> Thanks for the heads up Guozhang!
>>
>> The problem is our brokers are on 0.10.0.x. So we will have to upgrade
>> them.
>>
>> On Sat, Mar 18, 2017 at 12:30 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>>
>>> Hi Mahendra,
>>>
>>> Just a kind reminder that upgrading Streams to 0.10.2 does not
>>> necessarily require you to upgrade brokers to 0.10.2 as well. In 0.10.2
>>> we added a new feature that allows newer versioned clients (producer,
>>> consumer, streams) to talk to older versioned brokers, and Streams
>>> specifically only requires brokers to be no older than 0.10.1.
>>>
>>> Guozhang
>>>
>>> On Mon, Mar 13, 2017 at 5:12 AM, Mahendra Kariya <mahendra.kar...@go-jek.com> wrote:
>>>
>>>> We are planning to migrate to the newer version of Kafka. But that's a
>>>> few weeks away.
>>>>
>>>> We will try setting the socket config and see how it turns out.
>>>>
>>>> Thanks a lot for your response!
>>>>
>>>> On Mon, Mar 13, 2017 at 3:21 PM, Eno Thereska <eno.there...@gmail.com> wrote:
>>>>
>>>>> Thanks,
>>>>>
>>>>> A couple of things:
>>>>>
>>>>> - I'd recommend moving to 0.10.2 (the latest release) if you can,
>>>>> since several improvements were made in the last two releases that
>>>>> make rebalancing and performance better.
>>>>> - When running in environments with large latency, on AWS at least
>>>>> (we haven't tried Google Cloud), one parameter we have found useful
>>>>> for increasing performance is the receive and send socket size for
>>>>> the consumer and producer in Streams. We'd recommend setting them to
>>>>> 1 MB, like this (where "props" is your own properties object when you
>>>>> start Streams):
>>>>>
>>>>> // the socket buffer needs to be large, especially when running in AWS
>>>>> // with high latency. if running locally the default is fine.
>>>>> props.put(ProducerConfig.SEND_BUFFER_CONFIG, 1024 * 1024);
>>>>> props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 1024 * 1024);
>>>>>
>>>>> Make sure the OS allows the larger socket size too.
>>>>>
>>>>> Thanks
>>>>> Eno
>>>>>
>>>>> On Mar 13, 2017, at 9:21 AM, Mahendra Kariya <mahendra.kar...@go-jek.com> wrote:
>>>>>
>>>>>> Hi Eno,
>>>>>>
>>>>>> Please find my answers inline.
>>>>>>
>>>>>> We are in the process of documenting capacity planning for streams,
>>>>>> stay tuned.
>>>>>>
>>>>>> This would be great! Looking forward to it.
>>>>>>
>>>>>> Could you send some more info on your problem? What Kafka version
>>>>>> are you using?
>>>>>>
>>>>>> We are using Kafka 0.10.0.0.
>>>>>>
>>>>>> Are the VMs on the same or different hosts?
>>>>>>
>>>>>> The VMs are on Google Cloud. Two of them are in asia-east1-a and one
>>>>>> is in asia-east1-c. All three are n1-standard-4 Ubuntu instances.
>>>>>>
>>>>>> Also what exactly do you mean by "the lag keeps fluctuating", what
>>>>>> metric are you looking at?
>>>>>>
>>>>>> We are looking at Kafka Manager for the time being.
>>>>>> By fluctuating, I mean the lag is a few thousand at one moment; we
>>>>>> refresh it the next second and it is a few lakh (hundreds of
>>>>>> thousands); we refresh again and it is back to a few thousand. I
>>>>>> understand this may not be very accurate. We will soon have more
>>>>>> accurate data once we start pushing the consumer lag metric to
>>>>>> Datadog.
>>>>>>
>>>>>> But on a separate note, the difference between lags on different
>>>>>> partitions is way too high. I have attached a tab-separated file
>>>>>> herewith which shows the consumer lag (from Kafka Manager) for the
>>>>>> first 50 partitions. As is clear, the lag on partition 2 is 530
>>>>>> while the lag on partition 18 is 23K. Note that the same VM is
>>>>>> pulling data from both partitions.
>>>>>>
>>>>>> <KafkaLags.tsv>
>>>
>>> --
>>> -- Guozhang
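For reference, here is Eno's socket-buffer suggestion from the thread above dropped into a minimal Streams properties setup. This is only a sketch: the application id and broker address are placeholders, while `SEND_BUFFER_CONFIG` / `RECEIVE_BUFFER_CONFIG` are the standard Kafka client config keys he quotes.

```java
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

public final class SocketBufferProps {
    public static Properties build() {
        Properties props = new Properties();
        // Placeholder application id and broker list; substitute your own.
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");

        // 1 MB socket buffers, per the suggestion above; helps on
        // high-latency links. Running locally, the defaults are fine.
        props.put(ProducerConfig.SEND_BUFFER_CONFIG, 1024 * 1024);
        props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 1024 * 1024);
        return props;
    }
}
```

On "make sure the OS allows the larger socket size": on Linux the kernel clamps socket buffers at `net.core.wmem_max` / `net.core.rmem_max`, so those sysctls may also need raising to at least 1 MB for the setting to take effect.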
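On the lag numbers discussed above: per-partition consumer lag is just the log-end offset minus the last committed offset, which is what a metric pushed to Datadog would report. A tiny self-contained sketch of that arithmetic (the offsets below are made up purely to reproduce the two lags quoted in the thread, 530 on partition 2 and 23K on partition 18):

```java
import java.util.Map;
import java.util.TreeMap;

public final class LagCheck {
    // lag per partition = log-end offset minus last committed offset
    static Map<Integer, Long> lag(Map<Integer, Long> endOffsets,
                                  Map<Integer, Long> committed) {
        Map<Integer, Long> lags = new TreeMap<>();
        for (Map.Entry<Integer, Long> e : endOffsets.entrySet()) {
            long consumed = committed.getOrDefault(e.getKey(), 0L);
            lags.put(e.getKey(), e.getValue() - consumed);
        }
        return lags;
    }

    public static void main(String[] args) {
        // Illustrative offsets only, not data from the attached file.
        Map<Integer, Long> ends = new TreeMap<>();
        ends.put(2, 100_530L);
        ends.put(18, 123_000L);
        Map<Integer, Long> committed = new TreeMap<>();
        committed.put(2, 100_000L);
        committed.put(18, 100_000L);
        System.out.println(lag(ends, committed)); // {2=530, 18=23000}
    }
}
```

In a live app the two inputs would come from the consumer itself, e.g. `KafkaConsumer.endOffsets(...)` (available since 0.10.1) and `KafkaConsumer.committed(...)`, rather than hard-coded maps.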