Great. Thanks a lot for confirming! :)

-Matthias

On 6/26/17 4:58 AM, Tom Dearman wrote:
> Hi Matthias,
> 
> This problem seems to be fixed in the 0.11.0.0 client.
> 
> Thanks,
> 
> Tom
>> On 17 Jun 2017, at 01:11, Matthias J. Sax <matth...@confluent.io> wrote:
>>
>> Hi Tom,
>>
>> Thanks a lot for reporting this. We dug into it. It's easy to reproduce
>> (thanks a lot for describing a simple way to do that) and it seems to be a
>> bug in Streams. I opened a JIRA:
>> https://issues.apache.org/jira/browse/KAFKA-5464
>>
>> If you are using Streams 0.10.2.1, there is nothing we can advise at the
>> moment, except not to increase `POLL_MS_CONFIG`.
>>
>> The good news is that Kafka 0.11 will be released soon, and we reworked
>> the main poll loop in Streams (cf.
>> https://issues.apache.org/jira/browse/KAFKA-4843). This should resolve
>> the issue without the need to increase `POLL_MS_CONFIG` in the first place.
>>
>> It would be great if you could verify this. :)
>>
>>
>> -Matthias
>>
>>
>> On 6/8/17 7:18 AM, Tom Dearman wrote:
>>> We wanted less frequent polling in Kafka Streams (mostly because we
>>> noticed quite a lot of object creation when polling a queue with no new
>>> messages on it), so we set the poll interval to 10 seconds. On startup,
>>> when the first rebalance happens, onPartitionsAssigned of the
>>> ConsumerRebalanceListener keeps triggering a new rebalance because it
>>> takes too long and the heartbeat thread hasn’t started yet. I applied the
>>> relevant configuration to the WordCountProcessor demo class from Kafka
>>> Streams and it behaves the same way. The additional configuration I apply
>>> to the demo class is:
>>>
>>>        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
>>>        props.put(StreamsConfig.POLL_MS_CONFIG, 10000);
>>>        props.put(StreamsConfig.CONSUMER_PREFIX + ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 10000);
>>>        props.put(StreamsConfig.CONSUMER_PREFIX + ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
>>>        props.put(StreamsConfig.CONSUMER_PREFIX + ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000);
>>>
>>>
>>> This causes the following logging:
>>>
>>> 10630 [StreamThread-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - (Re-)joining group streams-wordcount-processor
>>> 10632 [StreamThread-1] DEBUG org.apache.kafka.streams.processor.internals.StreamPartitionAssignor - stream-thread [StreamThread-1] found [streams-file-input] topics possibly matching regex
>>> 10640 [StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamPartitionAssignor - stream-thread [StreamThread-1] Constructed client metadata {0f988636-b4f7-42af-b84b-249c9feb5fcb=ClientMetadata{hostInfo=null, consumers=[streams-wordcount-processor-0f988636-b4f7-42af-b84b-249c9feb5fcb-StreamThread-1-consumer-774aa883-3900-4d50-b4f8-d9846486f1fe], state=[activeTasks: ([]) assignedTasks: ([]) prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 1.0 cost: 0.0]}} from the member subscriptions.
>>> 10643 [StreamThread-1] DEBUG org.apache.kafka.streams.processor.internals.StreamPartitionAssignor - stream-thread [StreamThread-1] Starting to validate internal topics in partition assignor.
>>> 10643 [StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamPartitionAssignor - stream-thread [StreamThread-1] Completed validating internal topics in partition assignor
>>> 10643 [StreamThread-1] DEBUG org.apache.kafka.streams.processor.internals.StreamPartitionAssignor - stream-thread [StreamThread-1] Created repartition topics [] from the parsed topology.
>>> 10644 [StreamThread-1] DEBUG org.apache.kafka.streams.processor.internals.StreamPartitionAssignor - stream-thread [StreamThread-1] Starting to validate internal topics in partition assignor.
>>> 40664 [StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamPartitionAssignor - stream-thread [StreamThread-1] Completed validating internal topics in partition assignor
>>> 40664 [StreamThread-1] DEBUG org.apache.kafka.streams.processor.internals.StreamPartitionAssignor - stream-thread [StreamThread-1] Created state changelog topics {streams-wordcount-processor-Counts-changelog=org.apache.kafka.streams.processor.internals.StreamPartitionAssignor$InternalTopicMetadata@a3de218} from the parsed topology.
>>> 40664 [StreamThread-1] DEBUG org.apache.kafka.streams.processor.internals.StreamPartitionAssignor - stream-thread [StreamThread-1] Assigning tasks [0_0] to clients {0f988636-b4f7-42af-b84b-249c9feb5fcb=[activeTasks: ([]) assignedTasks: ([]) prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 1.0 cost: 0.0]} with number of replicas 0
>>> 40666 [StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamPartitionAssignor - stream-thread [StreamThread-1] Assigned tasks to clients as {0f988636-b4f7-42af-b84b-249c9feb5fcb=[activeTasks: ([0_0]) assignedTasks: ([0_0]) prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 1.0 cost: 0.5]}.
>>> 40670 [StreamThread-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - (Re-)joining group streams-wordcount-processor
>>> 40670 [StreamThread-1] DEBUG org.apache.kafka.streams.processor.internals.StreamPartitionAssignor - stream-thread [StreamThread-1] found [streams-file-input] topics possibly matching regex
>>> 40672 [StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamPartitionAssignor - stream-thread [StreamThread-1] Constructed client metadata {0f988636-b4f7-42af-b84b-249c9feb5fcb=ClientMetadata{hostInfo=null, consumers=[streams-wordcount-processor-0f988636-b4f7-42af-b84b-249c9feb5fcb-StreamThread-1-consumer-618818cd-e6dc-4ece-af8c-1a80b41de1c8], state=[activeTasks: ([]) assignedTasks: ([]) prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 1.0 cost: 0.0]}} from the member subscriptions.
>>> 40672 [StreamThread-1] DEBUG org.apache.kafka.streams.processor.internals.StreamPartitionAssignor - stream-thread [StreamThread-1] Starting to validate internal topics in partition assignor.
>>> 40672 [StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamPartitionAssignor - stream-thread [StreamThread-1] Completed validating internal topics in partition assignor
>>> 40672 [StreamThread-1] DEBUG org.apache.kafka.streams.processor.internals.StreamPartitionAssignor - stream-thread [StreamThread-1] Created repartition topics [] from the parsed topology.
>>> 40672 [StreamThread-1] DEBUG org.apache.kafka.streams.processor.internals.StreamPartitionAssignor - stream-thread [StreamThread-1] Starting to validate internal topics in partition assignor.
>>> 70685 [StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamPartitionAssignor - stream-thread [StreamThread-1] Completed validating internal topics in partition assignor
>>>
>>>
>>> As far as I know, the only configuration requirement is that the heartbeat
>>> interval is at most 1/3 of the session timeout. We could reduce the poll
>>> interval to 5000 ms (we don’t want to increase the session timeout beyond
>>> 30000 ms), but would prefer the higher value. Is this an expected
>>> configuration limitation, and is a fix possible? We are using the 0.10.2.0
>>> client and server; with the 0.10.2.1 client (but 0.10.2.0 server) this
>>> still happens.
>>>
>>> Thanks,
>>>
>>> Tom
>>>
>>
> 
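For reference, the settings from the original report can be written out with the literal config keys that the `StreamsConfig`/`ConsumerConfig` constants expand to (`"consumer."` for `CONSUMER_PREFIX`, `"poll.ms"` for `POLL_MS_CONFIG`, and so on), together with a check of the "heartbeat at most 1/3 of session timeout" rule mentioned in the thread. This is a minimal sketch, assuming plain string keys instead of the Kafka constants so it compiles without the Kafka jars; the class `PollConfigCheck` and its methods are hypothetical names for illustration only. Note that the reported settings pass this check exactly (3 x 10000 = 30000), so the rule alone would not have flagged the problem, which turned out to be a Streams bug (KAFKA-5464).

```java
import java.util.Properties;

/**
 * Hypothetical helper: reproduces the reporter's Streams settings using the
 * literal keys behind the StreamsConfig/ConsumerConfig constants, and checks
 * that heartbeat.interval.ms is at most one third of session.timeout.ms.
 */
public class PollConfigCheck {

    // Value of StreamsConfig.CONSUMER_PREFIX; routes a setting to the embedded consumer.
    static final String CONSUMER_PREFIX = "consumer.";

    static Properties reporterConfig() {
        Properties props = new Properties();
        props.put("num.stream.threads", 1);                          // NUM_STREAM_THREADS_CONFIG
        props.put("poll.ms", 10000);                                 // POLL_MS_CONFIG
        props.put(CONSUMER_PREFIX + "fetch.max.wait.ms", 10000);     // FETCH_MAX_WAIT_MS_CONFIG
        props.put(CONSUMER_PREFIX + "session.timeout.ms", 30000);    // SESSION_TIMEOUT_MS_CONFIG
        props.put(CONSUMER_PREFIX + "heartbeat.interval.ms", 10000); // HEARTBEAT_INTERVAL_MS_CONFIG
        return props;
    }

    /** True if the consumer heartbeat interval is at most 1/3 of the session timeout. */
    static boolean heartbeatWithinThird(Properties props) {
        int session = (Integer) props.get(CONSUMER_PREFIX + "session.timeout.ms");
        int heartbeat = (Integer) props.get(CONSUMER_PREFIX + "heartbeat.interval.ms");
        // Use long arithmetic so large values cannot overflow.
        return 3L * heartbeat <= session;
    }

    public static void main(String[] args) {
        Properties props = reporterConfig();
        System.out.println("heartbeat <= session/3: " + heartbeatWithinThird(props));
    }
}
```

Running `main` prints `heartbeat <= session/3: true` for the reported values; raising `consumer.heartbeat.interval.ms` above 10000 with a 30000 ms session timeout makes the check return `false`.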
