[ https://issues.apache.org/jira/browse/KAFKA-4799?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Guozhang Wang resolved KAFKA-4799.
----------------------------------
Resolution: Fixed
Fix Version/s: 0.11.0.1

> session timeout during event processing shuts down stream
> ---------------------------------------------------------
>
> Key: KAFKA-4799
> URL: https://issues.apache.org/jira/browse/KAFKA-4799
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 0.10.1.1
> Environment: Kafka Streams client running on OS X, with Docker Machine running the broker
> Reporter: Jacob Gur
> Priority: Critical
> Fix For: 0.11.0.1
>
>
> I have a simple stream application like this:
> {code:title=Part of my class|borderStyle=solid}
>     private <T> IConsumerSubscription buildSubscriptionStream(
>             Class<T> clazz, Consumer<T> consumer, String group,
>             Function<KStreamBuilder, KStream<String, String>> topicStreamFunc)
>     {
>         KStreamBuilder builder = new KStreamBuilder();
>         KStream<String, String> stream = topicStreamFunc.apply(builder);
>         stream.foreach((k, v) -> {
>             try {
>                 T value = _jsonObjectMapper.mapFromJsonString(v, clazz);
>                 consumer.accept(value);
>                 Logger.trace("Consumed message {}", value);
>             } catch (Throwable th) {
>                 Logger.warn("Error while consuming message", th);
>             }
>         });
>         final KafkaStreams streams = new KafkaStreams(builder, constructProperties(group));
>         streams.start();
>         return streams::close;
>     }
> {code}
> There is just one client running this application stream.
> If I run the client in a debugger with a breakpoint on the event processor
> (i.e., inside the foreach lambda), with the debugger suspending all threads
> for roughly 10 seconds or more, then when I resume the application:
> Actual behavior: the stream shuts down.
> Expected behavior: the stream should recover; it may be temporarily removed
> from the partition, but it should then be re-added and resume processing.
> It looks like what happens is this:
> 1) The Kafka client session times out.
> 2) The partition is revoked.
> 3) The Streams library's rebalance listener tries to commit offsets, but the
> commit fails due to a rebalance exception.
> 4) The stream shuts down.
> Steps 3 and 4 occur in StreamThread's rebalance listener.
> It seems that it should be more resilient and recover just as a regular
> KafkaConsumer would: its partition would be revoked, then reassigned, and it
> would resume processing at the last committed offset.
> Is the current behavior expected and I'm misunderstanding the intention? Or is
> this a bug?
> Thanks!
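The reported trigger is a pause of about 10 seconds, which matches the consumer's default `session.timeout.ms` of 10000 ms in the 0.10.x clients. In those clients heartbeats are sent from a background thread, so an ordinary slow `foreach` would not expire the session, but a debugger that suspends all threads also stops heartbeating and the coordinator evicts the member. As a debugging aid only (it does not address the shutdown itself, which was fixed in 0.11.0.1), a sketch of a variant of the reporter's `constructProperties(group)` helper could raise that timeout. The class name `DebugStreamsProperties`, the bootstrap address, and the 30000 ms value are hypothetical placeholders; the broker's `group.max.session.timeout.ms` must also permit the chosen value.

```java
import java.util.Properties;

// Hypothetical variant of the reporter's constructProperties(group) helper,
// intended only for local debugging sessions.
class DebugStreamsProperties {

    // Placeholder address; with Docker Machine this is usually the VM's IP, not localhost.
    static final String BOOTSTRAP_SERVERS = "localhost:9092";

    static Properties constructProperties(String group) {
        Properties props = new Properties();
        // Kafka Streams uses application.id as the consumer group id.
        props.put("application.id", group);
        props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
        // Consumer configs without a prefix are passed through to the Streams consumer.
        // The 0.10.x default is 10000 ms; a longer value tolerates longer
        // debugger pauses before the member is evicted from the group.
        props.put("session.timeout.ms", "30000");
        return props;
    }
}
```

A much larger session timeout also delays detection of genuinely dead instances, so a setting like this is best confined to a development profile.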
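The failure sequence in the report (steps 1 to 4) and the recovery the reporter expected can be contrasted with a small toy model. This is a hypothetical simulation, not Kafka's coordinator or StreamThread code: `SessionTimeoutModel`, `Coordinator`, and `CommitFailed` are invented names, and `CommitFailed` only loosely stands in for the client's `CommitFailedException`. It assumes a single member, one partition, and a 10000 ms session timeout, and treats a pause as a period with no heartbeats at all.

```java
import java.util.ArrayList;
import java.util.List;

class SessionTimeoutModel {

    // Hypothetical stand-in, loosely analogous to the client's CommitFailedException.
    static final class CommitFailed extends RuntimeException {
        CommitFailed(String msg) { super(msg); }
    }

    // Hypothetical coordinator tracking a single member and its committed offset.
    static final class Coordinator {
        final long sessionTimeoutMs;
        long lastHeartbeatMs = 0;
        boolean memberAlive = true;
        long committedOffset = 0;

        Coordinator(long sessionTimeoutMs) { this.sessionTimeoutMs = sessionTimeoutMs; }

        void heartbeat(long nowMs) {
            expireIfNeeded(nowMs);
            if (memberAlive) lastHeartbeatMs = nowMs;
        }

        // Steps 1 and 2: no heartbeat within the session timeout, so the member
        // is evicted and its partition is revoked.
        void expireIfNeeded(long nowMs) {
            if (memberAlive && nowMs - lastHeartbeatMs > sessionTimeoutMs) {
                memberAlive = false;
            }
        }

        // Step 3: a commit from an evicted member is rejected.
        void commit(long nextOffset, long nowMs) {
            expireIfNeeded(nowMs);
            if (!memberAlive) {
                throw new CommitFailed("member evicted; group is rebalancing");
            }
            committedOffset = nextOffset;
        }

        // Rejoin: the partition is reassigned and processing resumes
        // from the last committed offset.
        long rejoin(long nowMs) {
            memberAlive = true;
            lastHeartbeatMs = nowMs;
            return committedOffset;
        }
    }

    // Simulates one pause of pauseMs with all threads suspended (no heartbeats).
    // recover == false mirrors the reported behavior; true mirrors the expected one.
    static List<String> run(long pauseMs, boolean recover) {
        List<String> events = new ArrayList<>();
        Coordinator coordinator = new Coordinator(10_000);
        coordinator.heartbeat(0);
        coordinator.commit(5, 0); // records 0..4 processed; next offset to read is 5
        // Records 5..7 are processed, then the debugger pauses for pauseMs.
        try {
            coordinator.commit(8, pauseMs);
            events.add("committed 8");
        } catch (CommitFailed e) {
            if (!recover) {
                events.add("shutdown"); // step 4
                return events;
            }
            long resumeFrom = coordinator.rejoin(pauseMs);
            events.add("rejoined at " + resumeFrom);
        }
        return events;
    }
}
```

With a 3-second pause the second commit succeeds. With a 15-second pause it fails, and the model either shuts down (the reported behavior) or rejoins and resumes from committed offset 5, reprocessing records 5 to 7, which is the at-least-once recovery the reporter describes for a plain KafkaConsumer.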