[ 
https://issues.apache.org/jira/browse/KAFKA-4799?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-4799.
----------------------------------
       Resolution: Fixed
    Fix Version/s: 0.11.0.1

> session timeout during event processing shuts down stream
> ---------------------------------------------------------
>
>                 Key: KAFKA-4799
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4799
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.1.1
>         Environment: kafka streams client running on os x, with docker 
> machine running broker
>            Reporter: Jacob Gur
>            Priority: Critical
>             Fix For: 0.11.0.1
>
>
> I have a simple stream application like this:
> {code:title=Part of my class|borderStyle=solid}
>       private <T> IConsumerSubscription buildSubscriptionStream(
>                       Class<T> clazz, Consumer<T> consumer, String group,
>                       Function<KStreamBuilder, KStream<String, String>> 
> topicStreamFunc)
>       {
>               KStreamBuilder builder = new KStreamBuilder();
>               KStream<String, String> stream = topicStreamFunc.apply(builder);
>               stream.foreach((k, v) -> {
>                       try {
>                               T value = 
> _jsonObjectMapper.mapFromJsonString(v, clazz);
>                               consumer.accept(value);
>                               Logger.trace("Consumed message {}", value);
>                       } catch (Throwable th) {
>                               Logger.warn("Error while consuming message", 
> th);
>                       }
>               });
>               final KafkaStreams streams = new KafkaStreams(builder, 
> constructProperties(group));
>               streams.start();
>               return streams::close;
>       }
> {code}
> There is just one client running this application stream.
> If I run the client in a debugger with a breakpoint on the event processor 
> (i.e., inside the foreach lambda) with debugger suspending all threads for 
> perhaps more than 10 seconds, then when I resume the application:
> Actual behavior - the stream shuts down
> Expected behavior - the stream should recover, perhaps temporarily removed 
> from partition but then re-added and recovered.
> It looks like what happens is this:
> 1) The kafka client session times out.
> 2) The partition is revoked
> 3) The streams library has a rebalance listener that tries to commit offsets, 
> but that commit fails due to a rebalance exception.
> 4) Stream shuts down.
> Steps 3 and 4 occur in StreamThread's rebalance listener.
> It seems that it should be more resilient and recover just like a regular 
> KafkaConsumer would. Its partition would be revoked, and then it would get it 
> back again and resume processing at the last offset.
> Is current behavior expected and I'm not understanding the intention? Or is 
> this a bug?
> Thanks!



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to