We wanted to have less frequent polling in kafka streams (mostly because we 
have noticed quite a lot of object creation when polling a queue with no new 
messages on it), so we have set polling to 10 seconds.  On start up when 
rebalance first happens, the onPartitionsAssigned of ConsumerRebalanceListener 
keeps causing a rebalance as it takes too long and the heart beat thread hasn’t 
started up. I have applied the relevant configuration to the WordCountProcessor 
demo class from kafka streams and it does the same. The additional config I 
apply to the demo class is:

        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG,1);
        props.put(StreamsConfig.POLL_MS_CONFIG, 10000);
        props.put(StreamsConfig.CONSUMER_PREFIX + 
ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 10000);
        props.put(StreamsConfig.CONSUMER_PREFIX + 
ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
        props.put(StreamsConfig.CONSUMER_PREFIX + 
ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000);


This causes the following logging:

10630 [StreamThread-1] INFO 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator - (Re-)joining 
group streams-wordcount-processor
10632 [StreamThread-1] DEBUG 
org.apache.kafka.streams.processor.internals.StreamPartitionAssignor - 
stream-thread [StreamThread-1] found [streams-file-input] topics possibly 
matching regex
10640 [StreamThread-1] INFO 
org.apache.kafka.streams.processor.internals.StreamPartitionAssignor - 
stream-thread [StreamThread-1] Constructed client metadata 
{0f988636-b4f7-42af-b84b-249c9feb5fcb=ClientMetadata{hostInfo=null, 
consumers=[streams-wordcount-processor-0f988636-b4f7-42af-b84b-249c9feb5fcb-StreamThread-1-consumer-774aa883-3900-4d50-b4f8-d9846486f1fe],
 state=[activeTasks: ([]) assignedTasks: ([]) prevActiveTasks: ([]) 
prevAssignedTasks: ([]) capacity: 1.0 cost: 0.0]}} from the member 
subscriptions.
10643 [StreamThread-1] DEBUG 
org.apache.kafka.streams.processor.internals.StreamPartitionAssignor - 
stream-thread [StreamThread-1] Starting to validate internal topics in 
partition assignor.
10643 [StreamThread-1] INFO 
org.apache.kafka.streams.processor.internals.StreamPartitionAssignor - 
stream-thread [StreamThread-1] Completed validating internal topics in 
partition assignor
10643 [StreamThread-1] DEBUG 
org.apache.kafka.streams.processor.internals.StreamPartitionAssignor - 
stream-thread [StreamThread-1] Created repartition topics [] from the parsed 
topology.
10644 [StreamThread-1] DEBUG 
org.apache.kafka.streams.processor.internals.StreamPartitionAssignor - 
stream-thread [StreamThread-1] Starting to validate internal topics in 
partition assignor.
40664 [StreamThread-1] INFO 
org.apache.kafka.streams.processor.internals.StreamPartitionAssignor - 
stream-thread [StreamThread-1] Completed validating internal topics in 
partition assignor
40664 [StreamThread-1] DEBUG 
org.apache.kafka.streams.processor.internals.StreamPartitionAssignor - 
stream-thread [StreamThread-1] Created state changelog topics 
{streams-wordcount-processor-Counts-changelog=org.apache.kafka.streams.processor.internals.StreamPartitionAssignor$InternalTopicMetadata@a3de218}
 from the parsed topology.
40664 [StreamThread-1] DEBUG 
org.apache.kafka.streams.processor.internals.StreamPartitionAssignor - 
stream-thread [StreamThread-1] Assigning tasks [0_0] to clients 
{0f988636-b4f7-42af-b84b-249c9feb5fcb=[activeTasks: ([]) assignedTasks: ([]) 
prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 1.0 cost: 0.0]} with 
number of replicas 0
40666 [StreamThread-1] INFO 
org.apache.kafka.streams.processor.internals.StreamPartitionAssignor - 
stream-thread [StreamThread-1] Assigned tasks to clients as 
{0f988636-b4f7-42af-b84b-249c9feb5fcb=[activeTasks: ([0_0]) assignedTasks: 
([0_0]) prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 1.0 cost: 0.5]}.
40670 [StreamThread-1] INFO 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator - (Re-)joining 
group streams-wordcount-processor
40670 [StreamThread-1] DEBUG 
org.apache.kafka.streams.processor.internals.StreamPartitionAssignor - 
stream-thread [StreamThread-1] found [streams-file-input] topics possibly 
matching regex
40672 [StreamThread-1] INFO 
org.apache.kafka.streams.processor.internals.StreamPartitionAssignor - 
stream-thread [StreamThread-1] Constructed client metadata 
{0f988636-b4f7-42af-b84b-249c9feb5fcb=ClientMetadata{hostInfo=null, 
consumers=[streams-wordcount-processor-0f988636-b4f7-42af-b84b-249c9feb5fcb-StreamThread-1-consumer-618818cd-e6dc-4ece-af8c-1a80b41de1c8],
 state=[activeTasks: ([]) assignedTasks: ([]) prevActiveTasks: ([]) 
prevAssignedTasks: ([]) capacity: 1.0 cost: 0.0]}} from the member 
subscriptions.
40672 [StreamThread-1] DEBUG 
org.apache.kafka.streams.processor.internals.StreamPartitionAssignor - 
stream-thread [StreamThread-1] Starting to validate internal topics in 
partition assignor.
40672 [StreamThread-1] INFO 
org.apache.kafka.streams.processor.internals.StreamPartitionAssignor - 
stream-thread [StreamThread-1] Completed validating internal topics in 
partition assignor
40672 [StreamThread-1] DEBUG 
org.apache.kafka.streams.processor.internals.StreamPartitionAssignor - 
stream-thread [StreamThread-1] Created repartition topics [] from the parsed 
topology.
40672 [StreamThread-1] DEBUG 
org.apache.kafka.streams.processor.internals.StreamPartitionAssignor - 
stream-thread [StreamThread-1] Starting to validate internal topics in 
partition assignor.
70685 [StreamThread-1] INFO 
org.apache.kafka.streams.processor.internals.StreamPartitionAssignor - 
stream-thread [StreamThread-1] Completed validating internal topics in 
partition assignor


As far as I know the only configuration requirements are that heart beat is 1/3 
or less of session timeout.  We can reduce the polling to 5000ms (we don’t want 
to increase the session timeout beyond 30000ms), but would prefer this higher 
value.  Any suggestions as to whether this is an expected configuration 
limitation, and/or whether a fix is possible.  We are using 0.10.2.0 client and 
server, with 0.10.2.1 client (but 0.10.2.0 server) this still  happens.

Thanks,

Tom

Reply via email to