Hi Mahendra,

Are you able to share the complete logs? It is pretty hard to tell what is
happening just from a few snippets of information.

Thanks,
Damian

On Wed, 22 Mar 2017 at 12:16 Mahendra Kariya <mahendra.kar...@go-jek.com>
wrote:

> To test Kafka Streams on 0.10.2.0, we set up a new Kafka cluster running
> the latest version and used MirrorMaker to replicate the data from the
> 0.10.0.0 Kafka cluster. We pointed our streaming app at the newly created
> Kafka cluster.
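>
> For context, the app is bootstrapped roughly like this (a sketch, not our
> exact code: the bootstrap address is a placeholder, and the application id
> "streams_test_2" is the one visible in the state directory path in
> stacktrace #3 below):
>
> Properties props = new Properties();
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams_test_2");
> // placeholder address for the new 0.10.2.0 cluster
> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "new-kafka-cluster:9092");
> // 10 stream threads per app instance
> props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 10);
> KafkaStreams streams = new KafkaStreams(builder, props); // builder holds our topology
> streams.start();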
>
> We have 5 nodes, each running the streaming app with 10 threads. In less
> than 10 minutes, the processes on all 5 nodes died with different
> exceptions. Below are the stack traces we got.
>
> Any help would be really appreciated.
>
> *Stacktrace #1 (got on 3 of 5 nodes):*
>
> 18:58:00.349 [StreamThread-2] INFO o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-2] Stream thread shutdown complete
> 18:58:00.349 [StreamThread-2] WARN o.a.k.s.p.i.StreamThread - Unexpected state transition from RUNNING to NOT_RUNNING
> Exception in thread "StreamThread-2" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=1_396, processor=KSTREAM-SOURCE-0000000004, topic=topicname, partition=396, offset=66839
>         at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:641)
>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
> Caused by: org.apache.kafka.streams.errors.InvalidStateStoreException: store %s has closed
>         at org.apache.kafka.streams.state.internals.RocksDBStore$RocksDbIterator.hasNext(RocksDBStore.java:398)
>         at org.apache.kafka.streams.state.internals.RocksDBStore$RocksDBRangeIterator.hasNext(RocksDBStore.java:457)
>         at org.apache.kafka.streams.state.internals.WindowStoreKeySchema$1.hasNext(WindowStoreKeySchema.java:30)
>         at org.apache.kafka.streams.state.internals.SegmentIterator.hasNext(SegmentIterator.java:69)
>         at org.apache.kafka.streams.state.internals.MeteredSegmentedBytesStore$MeteredSegmentedBytesStoreIterator.hasNext(MeteredSegmentedBytesStore.java:131)
>         at org.apache.kafka.streams.state.internals.RocksDBWindowStore$TheWindowStoreIterator.hasNext(RocksDBWindowStore.java:131)
>         at org.apache.kafka.streams.state.internals.AbstractMergedSortedCacheStoreIterator.hasNext(AbstractMergedSortedCacheStoreIterator.java:74)
>         at org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate$KStreamWindowAggregateProcessor.process(KStreamWindowAggregate.java:97)
>         at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48)
>         at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>         at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134)
>         at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
>         at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:70)
>         at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197)
>         ... 2 more
>
>
> *Stacktrace #2 (got on 1 node):*
>
> 18:57:44.692 [StreamThread-2] INFO o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-2] Stream thread shutdown complete
> 18:57:44.692 [StreamThread-2] WARN o.a.k.s.p.i.StreamThread - Unexpected state transition from ASSIGNING_PARTITIONS to NOT_RUNNING
> Exception in thread "StreamThread-2" org.apache.kafka.streams.errors.StreamsException: stream-thread [StreamThread-2] Failed to rebalance
>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:612)
>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
> Caused by: java.lang.IllegalArgumentException: A metric named 'MetricName [name=1_234-sometopic-hitRatio-avg, group=stream-record-cache-metrics, description=The current count of 1_234-sometopic hitRatio operation., tags={record-cache-id=1_234-sometopic}]' already exists, can't register another one.
>         at org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:433)
>         at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:249)
>         at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:234)
>         at org.apache.kafka.streams.state.internals.NamedCache$NamedCacheMetrics.<init>(NamedCache.java:388)
>         at org.apache.kafka.streams.state.internals.NamedCache.<init>(NamedCache.java:62)
>         at org.apache.kafka.streams.state.internals.ThreadCache.getOrCreateCache(ThreadCache.java:226)
>         at org.apache.kafka.streams.state.internals.ThreadCache.addDirtyEntryFlushListener(ThreadCache.java:87)
>         at org.apache.kafka.streams.state.internals.CachingWindowStore.initInternal(CachingWindowStore.java:74)
>         at org.apache.kafka.streams.state.internals.CachingWindowStore.init(CachingWindowStore.java:62)
>         at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86)
>         at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:141)
>         at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834)
>         at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207)
>         at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
>         at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937)
>         at org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69)
>         at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236)
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255)
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339)
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
>         ... 1 more
>
> *Stacktrace #3 (got on 1 node):*
>
> 19:07:34.827 [StreamThread-1] WARN o.a.k.s.p.i.StreamThread - Could not create task 0_192. Will retry.
> org.apache.kafka.streams.errors.LockException: task [0_192] Failed to lock the state directory: /tmp/kafka-streams/streams_test_2/0_192
>         at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:102) ~[app-name-1.2.1.jar:na]
>         at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73) ~[app-name-1.2.1.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108) ~[app-name-1.2.1.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834) [app-name-1.2.1.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207) ~[app-name-1.2.1.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180) ~[app-name-1.2.1.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937) [app-name-1.2.1.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69) [app-name-1.2.1.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236) [app-name-1.2.1.jar:na]
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255) [app-name-1.2.1.jar:na]
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339) [app-name-1.2.1.jar:na]
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) [app-name-1.2.1.jar:na]
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286) [app-name-1.2.1.jar:na]
>         at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030) [app-name-1.2.1.jar:na]
>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) [app-name-1.2.1.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582) [app-name-1.2.1.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368) [app-name-1.2.1.jar:na]
>
>
>
> On Sat, Mar 18, 2017 at 5:58 AM, Mahendra Kariya
> <mahendra.kar...@go-jek.com> wrote:
>
> > Thanks for the heads-up, Guozhang!
> >
> > The problem is that our brokers are on 0.10.0.x, so we will have to
> > upgrade them.
> >
> > On Sat, Mar 18, 2017 at 12:30 AM, Guozhang Wang <wangg...@gmail.com>
> > wrote:
> >
> >> Hi Mahendra,
> >>
> >> Just a kind reminder that upgrading Streams to 0.10.2 does not
> >> necessarily require you to upgrade your brokers to 0.10.2 as well. As of
> >> 0.10.2 we have added a new feature that allows newer-versioned clients
> >> (producer, consumer, streams) to talk to older-versioned brokers;
> >> Streams specifically only requires brokers no older than 0.10.1.
> >>
> >>
> >> Guozhang
> >>
> >>
> >> On Mon, Mar 13, 2017 at 5:12 AM, Mahendra Kariya
> >> <mahendra.kar...@go-jek.com> wrote:
> >>
> >> > We are planning to migrate to the newer version of Kafka. But that's
> >> > a few weeks away.
> >> >
> >> > We will try setting the socket config and see how it turns out.
> >> >
> >> > Thanks a lot for your response!
> >> >
> >> >
> >> >
> >> > On Mon, Mar 13, 2017 at 3:21 PM, Eno Thereska
> >> > <eno.there...@gmail.com> wrote:
> >> >
> >> > > Thanks,
> >> > >
> >> > > A couple of things:
> >> > > - I’d recommend moving to 0.10.2 (the latest release) if you can,
> >> > > since several improvements made in the last two releases improve both
> >> > > rebalancing and performance.
> >> > >
> >> > > - When running in environments with large latency, on AWS at least
> >> > > (we haven’t tried Google Cloud), one parameter we have found useful
> >> > > for increasing performance is the receive and send socket buffer size
> >> > > for the consumer and producer in Streams. We’d recommend setting them
> >> > > to 1MB like this (where “props” is your own properties object when
> >> > > you start Streams):
> >> > >
> >> > > // the socket buffer needs to be large, especially when running in AWS
> >> > > // with high latency. if running locally the default is fine.
> >> > > props.put(ProducerConfig.SEND_BUFFER_CONFIG, 1024 * 1024);
> >> > > props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 1024 * 1024);
> >> > >
> >> > > Make sure the OS allows the larger socket size too.
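> >> > >
> >> > > A quick way to check whether the OS actually granted the 1MB is to ask
> >> > > a plain socket for it and read back the result (a sketch; on Linux the
> >> > > kernel caps the value at net.core.rmem_max / net.core.wmem_max, and
> >> > > may report it doubled for internal bookkeeping):
> >> > >
> >> > > try (java.net.Socket s = new java.net.Socket()) {
> >> > >     s.setReceiveBufferSize(1024 * 1024);
> >> > >     s.setSendBufferSize(1024 * 1024);
> >> > >     // if these print far less than 1MB, raise the OS limits
> >> > >     System.out.println("receive: " + s.getReceiveBufferSize());
> >> > >     System.out.println("send: " + s.getSendBufferSize());
> >> > > }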
> >> > >
> >> > > Thanks
> >> > > Eno
> >> > >
> >> > > > On Mar 13, 2017, at 9:21 AM, Mahendra Kariya
> >> > > > <mahendra.kar...@go-jek.com> wrote:
> >> > > >
> >> > > > Hi Eno,
> >> > > >
> >> > > > Please find my answers inline.
> >> > > >
> >> > > >
> >> > > > We are in the process of documenting capacity planning for streams,
> >> > > > stay tuned.
> >> > > >
> >> > > > This would be great! Looking forward to it.
> >> > > >
> >> > > > Could you send some more info on your problem? What Kafka version
> >> > > > are you using?
> >> > > >
> >> > > > We are using Kafka 0.10.0.0.
> >> > > >
> >> > > > Are the VMs on the same or different hosts?
> >> > > >
> >> > > > The VMs are on Google Cloud. Two of them are in asia-east1-a and
> >> > > > one is in asia-east1-c. All three are n1-standard-4 Ubuntu instances.
> >> > > >
> >> > > > Also, what exactly do you mean by “the lag keeps fluctuating”? What
> >> > > > metric are you looking at?
> >> > > >
> >> > > > We are looking at Kafka Manager for the time being. By fluctuating,
> >> > > > I mean the lag is a few thousand at one moment; we refresh it the
> >> > > > next second and it is in the lakhs (hundreds of thousands); we
> >> > > > refresh again and it is back to a few thousand. I understand this may
> >> > > > not be very accurate. We will soon have more accurate data once we
> >> > > > start pushing the consumer lag metric to Datadog.
> >> > > >
> >> > > > But on a separate note, the difference between lags on different
> >> > > > partitions is way too high. I have attached a tab-separated file
> >> > > > which shows the consumer lag (from Kafka Manager) for the first 50
> >> > > > partitions. As is clear, the lag on partition 2 is 530 while the lag
> >> > > > on partition 18 is 23K. Note that the same VM is pulling data from
> >> > > > both partitions.
> >> > > >
> >> > > > <KafkaLags.tsv>
> >> > >
> >> > >
> >> >
> >>
> >>
> >>
> >> --
> >> -- Guozhang
> >>
> >
> >
>
