I assume that you are hitting a known bug. KIP-91 (which proposes a delivery.timeout.ms producer parameter to bound the total time a record may spend in the send path) addresses this issue.

The recommended workaround is to increase the request.timeout.ms producer parameter. Its default of 30000 ms matches the "30011 ms has passed since last append" in your stack trace: before KIP-91, batches sitting in the producer's accumulator are expired based on this timeout.
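
In Kafka Streams you can forward producer settings through the StreamsConfig by prefixing them with StreamsConfig.producerPrefix(). A minimal sketch, assuming Java; the application id, bootstrap servers, and the concrete values are placeholders:

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class StreamsTimeoutConfig {
    public static Properties streamsProps() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");  // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");  // placeholder

        // Forward the setting to the internally created producers. Streams has
        // no separate changelog producer; the regular producers also write the
        // changelog topics, so this covers the failing changelog writes.
        props.put(StreamsConfig.producerPrefix(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG), 120000);

        // Producer retries can be forwarded the same way:
        props.put(StreamsConfig.producerPrefix(ProducerConfig.RETRIES_CONFIG), 10);

        return props;
    }
}

Pass these properties to the KafkaStreams constructor as usual. Note that because changelog records go through the same producers as regular output, you cannot set a different timeout for the changelog writes alone.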


-Matthias

On 1/25/18 10:04 AM, Dan Bress wrote:
> Hi,
> 
> I'm running an app on Kafka Streams 1.0.0, and in the past day a lot of
> nodes have been failing, and I see this in the log.
> 
> These appear to be failures when attempting to update the changelog. Any
> ideas on what I should do to work around this? Should I configure separate
> retry and timeout settings for the changelog producer? If so, how do I do that?
> 
> 
> org.apache.kafka.streams.errors.StreamsException: stream-thread [dp-insightstrack-staging1-9cc75371-000c-420a-b9d7-b46b2b4bab5c-StreamThread-3] Failed to rebalance.
>     at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:860)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:808)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
> Caused by: org.apache.kafka.streams.errors.StreamsException: stream-thread [dp-insightstrack-staging1-9cc75371-000c-420a-b9d7-b46b2b4bab5c-StreamThread-3] failed to suspend stream tasks
>     at org.apache.kafka.streams.processor.internals.TaskManager.suspendTasksAndState(TaskManager.java:204)
>     at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsRevoked(StreamThread.java:291)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:418)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:359)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:295)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1138)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1103)
>     at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:851)
>     ... 3 common frames omitted
> Caused by: org.apache.kafka.streams.errors.ProcessorStateException: task [0_18] Failed to flush state store summarykey-to-summary
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:248)
>     at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:196)
>     at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:324)
>     at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:304)
>     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
>     at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:299)
>     at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:414)
>     at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:396)
>     at org.apache.kafka.streams.processor.internals.AssignedTasks.suspendTasks(AssignedTasks.java:219)
>     at org.apache.kafka.streams.processor.internals.AssignedTasks.suspend(AssignedTasks.java:184)
>     at org.apache.kafka.streams.processor.internals.TaskManager.suspendTasksAndState(TaskManager.java:197)
>     ... 11 common frames omitted
> Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_18] Abort sending since an error caught with a previous record (key ComparableSummaryKey(SummaryKey(GnipStream(172819),RuleId(951574687410094106),Some(TweetId(954107387677433858)))) value TweetSummaryCountsWindow(SummaryKey(GnipStream(172819),RuleId(951574687410094106),Some(TweetId(954107387677433858))),168,1516311902337,WindowCounts(1516742462337,LongHyperLogLogPlus(com.clearspring.analytics.stream.cardinality.HyperLogLogPlus@2e76a045),LongHyperLogLogPlus(com.clearspring.analytics.stream.cardinality.HyperLogLogPlus@8610f558),None,-1,false,MetricMap(true,[I@74cc9b69),com.twitter.datainsights.insightstrack.domain.InteractionSourceSummary@5653a508),None,false) timestamp 1516742492445) to topic dp-insightstrack-staging1-summarykey-to-summary-changelog due to org.apache.kafka.common.errors.TimeoutException: Expiring 7 record(s) for dp-insightstrack-staging1-summarykey-to-summary-changelog-18: 30011 ms has passed since last append.
>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:118)
>     at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204)
>     at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187)
>     at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:627)
>     at org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:287)
>     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:238)
>     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 7 record(s) for dp-insightstrack-staging1-summarykey-to-summary-changelog-18: 30011 ms has passed since last append
> 
