Hi Damian,

The rest of the logs were INFO messages about offsets being committed.

Anyway, the problem is resolved for now, after we increased max.poll.interval.ms. For anyone else facing a similar problem, please refer to this thread:
https://groups.google.com/forum/#!topic/confluent-platform/wgCSuwIJo5g
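In case it helps anyone searching for this later, below is a rough sketch of what the change looks like in code. The application id, bootstrap servers, and the exact timeout value are placeholders (not our actual config); the point is only that max.poll.interval.ms is a consumer config and can be set on the same Properties object you pass to KafkaStreams:

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");    // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-1:9092");  // placeholder
// max.poll.interval.ms defaults to 300000 (5 minutes). If processing one batch of
// polled records can take longer than this, the consumer is considered dead and a
// rebalance is triggered, so pick a value comfortably above your worst-case
// processing time. Streams passes consumer configs set here through to the
// consumers it creates internally.
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600000);       // placeholder value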

On Wed, Mar 22, 2017 at 7:11 PM, Damian Guy <damian....@gmail.com> wrote:

> Hi Mahendra,
>
> Are you able to share the complete logs? It is pretty hard to tell what is
> happening just from a few snippets of information.
>
> Thanks,
> Damian
>
> On Wed, 22 Mar 2017 at 12:16 Mahendra Kariya <mahendra.kar...@go-jek.com> wrote:
>
> > To test Kafka Streams on 0.10.2.0, we set up a new Kafka cluster with the
> > latest version and used MirrorMaker to replicate the data from the
> > 0.10.0.0 Kafka cluster. We pointed our streaming app to the newly created
> > Kafka cluster.
> >
> > We have 5 nodes, each running the streaming app with 10 threads. In less
> > than 10 minutes, the process on all 5 nodes died with different
> > exceptions. Below are the different stack traces we got.
> >
> > Any help would be really appreciated.
> >
> > *Stacktrace # 1 (got on 3 of 5 nodes):*
> >
> > 18:58:00.349 [StreamThread-2] INFO o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-2] Stream thread shutdown complete
> > 18:58:00.349 [StreamThread-2] WARN o.a.k.s.p.i.StreamThread - Unexpected state transition from RUNNING to NOT_RUNNING
> > Exception in thread "StreamThread-2" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=1_396, processor=KSTREAM-SOURCE-0000000004, topic=topicname, partition=396, offset=66839
> >     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
> >     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:641)
> >     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
> > Caused by: org.apache.kafka.streams.errors.InvalidStateStoreException: store %s has closed
> >     at org.apache.kafka.streams.state.internals.RocksDBStore$RocksDbIterator.hasNext(RocksDBStore.java:398)
> >     at org.apache.kafka.streams.state.internals.RocksDBStore$RocksDBRangeIterator.hasNext(RocksDBStore.java:457)
> >     at org.apache.kafka.streams.state.internals.WindowStoreKeySchema$1.hasNext(WindowStoreKeySchema.java:30)
> >     at org.apache.kafka.streams.state.internals.SegmentIterator.hasNext(SegmentIterator.java:69)
> >     at org.apache.kafka.streams.state.internals.MeteredSegmentedBytesStore$MeteredSegmentedBytesStoreIterator.hasNext(MeteredSegmentedBytesStore.java:131)
> >     at org.apache.kafka.streams.state.internals.RocksDBWindowStore$TheWindowStoreIterator.hasNext(RocksDBWindowStore.java:131)
> >     at org.apache.kafka.streams.state.internals.AbstractMergedSortedCacheStoreIterator.hasNext(AbstractMergedSortedCacheStoreIterator.java:74)
> >     at org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate$KStreamWindowAggregateProcessor.process(KStreamWindowAggregate.java:97)
> >     at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48)
> >     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
> >     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134)
> >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
> >     at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:70)
> >     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197)
> >     ... 2 more
> >
> >
> > *Stacktrace # 2 (got on 1 node):*
> >
> > 18:57:44.692 [StreamThread-2] INFO o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-2] Stream thread shutdown complete
> > 18:57:44.692 [StreamThread-2] WARN o.a.k.s.p.i.StreamThread - Unexpected state transition from ASSIGNING_PARTITIONS to NOT_RUNNING
> > Exception in thread "StreamThread-2" org.apache.kafka.streams.errors.StreamsException: stream-thread [StreamThread-2] Failed to rebalance
> >     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:612)
> >     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
> > Caused by: java.lang.IllegalArgumentException: A metric named 'MetricName [name=1_234-sometopic-hitRatio-avg, group=stream-record-cache-metrics, description=The current count of 1_234-sometopic hitRatio operation., tags={record-cache-id=1_234-sometopic}]' already exists, can't register another one.
> >     at org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:433)
> >     at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:249)
> >     at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:234)
> >     at org.apache.kafka.streams.state.internals.NamedCache$NamedCacheMetrics.<init>(NamedCache.java:388)
> >     at org.apache.kafka.streams.state.internals.NamedCache.<init>(NamedCache.java:62)
> >     at org.apache.kafka.streams.state.internals.ThreadCache.getOrCreateCache(ThreadCache.java:226)
> >     at org.apache.kafka.streams.state.internals.ThreadCache.addDirtyEntryFlushListener(ThreadCache.java:87)
> >     at org.apache.kafka.streams.state.internals.CachingWindowStore.initInternal(CachingWindowStore.java:74)
> >     at org.apache.kafka.streams.state.internals.CachingWindowStore.init(CachingWindowStore.java:62)
> >     at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86)
> >     at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:141)
> >     at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834)
> >     at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207)
> >     at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
> >     at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937)
> >     at org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69)
> >     at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236)
> >     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255)
> >     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339)
> >     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
> >     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
> >     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
> >     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
> >     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
> >     ... 1 more
> >
> > *Stacktrace # 3 (got on 1 node):*
> >
> > 19:07:34.827 [StreamThread-1] WARN o.a.k.s.p.i.StreamThread - Could not create task 0_192. Will retry.
> > org.apache.kafka.streams.errors.LockException: task [0_192] Failed to lock the state directory: /tmp/kafka-streams/streams_test_2/0_192
> >     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:102) ~[app-name-1.2.1.jar:na]
> >     at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73) ~[app-name-1.2.1.jar:na]
> >     at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108) ~[app-name-1.2.1.jar:na]
> >     at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834) [app-name-1.2.1.jar:na]
> >     at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207) ~[app-name-1.2.1.jar:na]
> >     at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180) ~[app-name-1.2.1.jar:na]
> >     at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937) [app-name-1.2.1.jar:na]
> >     at org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69) [app-name-1.2.1.jar:na]
> >     at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236) [app-name-1.2.1.jar:na]
> >     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255) [app-name-1.2.1.jar:na]
> >     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339) [app-name-1.2.1.jar:na]
> >     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) [app-name-1.2.1.jar:na]
> >     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286) [app-name-1.2.1.jar:na]
> >     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030) [app-name-1.2.1.jar:na]
> >     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) [app-name-1.2.1.jar:na]
> >     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582) [app-name-1.2.1.jar:na]
> >     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368) [app-name-1.2.1.jar:na]
> >
> >
> > On Sat, Mar 18, 2017 at 5:58 AM, Mahendra Kariya <mahendra.kar...@go-jek.com> wrote:
> >
> > > Thanks for the heads up Guozhang!
> > >
> > > The problem is our brokers are on 0.10.0.x. So we will have to upgrade them.
> > >
> > > On Sat, Mar 18, 2017 at 12:30 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> > >
> > >> Hi Mahendra,
> > >>
> > >> Just a kind reminder that upgrading Streams to 0.10.2 does not necessarily
> > >> require you to upgrade brokers to 0.10.2 as well. We have added a new
> > >> feature in 0.10.2 that allows newer versioned clients (producer, consumer,
> > >> streams) to talk to older versioned brokers, and for Streams specifically
> > >> it only requires brokers to be no older than 0.10.1.
> > >>
> > >> Guozhang
> > >>
> > >> On Mon, Mar 13, 2017 at 5:12 AM, Mahendra Kariya <mahendra.kar...@go-jek.com> wrote:
> > >>
> > >> > We are planning to migrate to the newer version of Kafka. But that's a few
> > >> > weeks away.
> > >> >
> > >> > We will try setting the socket config and see how it turns out.
> > >> >
> > >> > Thanks a lot for your response!
> > >> >
> > >> > On Mon, Mar 13, 2017 at 3:21 PM, Eno Thereska <eno.there...@gmail.com> wrote:
> > >> >
> > >> > > Thanks,
> > >> > >
> > >> > > A couple of things:
> > >> > >
> > >> > > - I’d recommend moving to 0.10.2 (latest release) if you can, since several
> > >> > > improvements were made in the last two releases that make rebalancing and
> > >> > > performance better.
> > >> > >
> > >> > > - When running in environments with large latency, on AWS at least (we haven’t
> > >> > > tried Google Cloud), one parameter we have found useful for increasing
> > >> > > performance is the receive and send socket size for the consumer and
> > >> > > producer in streams. We’d recommend setting them to 1MB like this (where
> > >> > > “props” is your own properties object when you start streams):
> > >> > >
> > >> > > // the socket buffer needs to be large, especially when running in AWS with
> > >> > > // high latency. if running locally the default is fine.
> > >> > > props.put(ProducerConfig.SEND_BUFFER_CONFIG, 1024 * 1024);
> > >> > > props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 1024 * 1024);
> > >> > >
> > >> > > Make sure the OS allows the larger socket size too.
> > >> > >
> > >> > > Thanks
> > >> > > Eno
> > >> > >
> > >> > > > On Mar 13, 2017, at 9:21 AM, Mahendra Kariya <mahendra.kar...@go-jek.com> wrote:
> > >> > > >
> > >> > > > Hi Eno,
> > >> > > >
> > >> > > > Please find my answers inline.
> > >> > > >
> > >> > > > > We are in the process of documenting capacity planning for streams, stay tuned.
> > >> > > >
> > >> > > > This would be great! Looking forward to it.
> > >> > > >
> > >> > > > > Could you send some more info on your problem? What Kafka version are you using?
> > >> > > >
> > >> > > > We are using Kafka 0.10.0.0.
> > >> > > >
> > >> > > > > Are the VMs on the same or different hosts?
> > >> > > >
> > >> > > > The VMs are on Google Cloud. Two of them are in asia-east1-a and one is
> > >> > > > in asia-east1-c. All three are n1-standard-4 Ubuntu instances.
> > >> > > >
> > >> > > > > Also what exactly do you mean by “the lag keeps fluctuating”, what metric are you looking at?
> > >> > > >
> > >> > > > We are looking at Kafka Manager for the time being. By fluctuating, I
> > >> > > > mean the lag is a few thousand at one time, we refresh it the next second,
> > >> > > > it is a few lakhs, and we refresh it again and it is a few thousand. I
> > >> > > > understand this may not be very accurate. We will soon have more accurate
> > >> > > > data once we start pushing the consumer lag metric to Datadog.
> > >> > > >
> > >> > > > But on a separate note, the difference between lags on different
> > >> > > > partitions is way too high. I have attached a tab-separated file herewith
> > >> > > > which shows the consumer lag (from Kafka Manager) for the first 50
> > >> > > > partitions. As is clear, the lag on partition 2 is 530 while the lag on
> > >> > > > partition 18 is 23K. Note that the same VM is pulling data from both
> > >> > > > partitions.
> > >> > > >
> > >> > > > <KafkaLags.tsv>
> > >>
> > >> --
> > >> -- Guozhang