We wanted to poll less frequently in Kafka Streams (mainly because we noticed a lot of object creation when polling a queue with no new messages on it), so we set the poll interval to 10 seconds. On startup, when the first rebalance happens, onPartitionsAssigned of the ConsumerRebalanceListener keeps triggering another rebalance, because it takes too long and the heartbeat thread hasn't started yet. I applied the same configuration to the WordCountProcessor demo class from Kafka Streams and it behaves the same way. The additional config I apply to the demo class is:

props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
props.put(StreamsConfig.POLL_MS_CONFIG, 10000);
props.put(StreamsConfig.CONSUMER_PREFIX + ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 10000);
props.put(StreamsConfig.CONSUMER_PREFIX + ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
props.put(StreamsConfig.CONSUMER_PREFIX + ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000);

This causes the following logging:

10630 [StreamThread-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - (Re-)joining group streams-wordcount-processor
10632 [StreamThread-1] DEBUG org.apache.kafka.streams.processor.internals.StreamPartitionAssignor - stream-thread [StreamThread-1] found [streams-file-input] topics possibly matching regex
10640 [StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamPartitionAssignor - stream-thread [StreamThread-1] Constructed client metadata {0f988636-b4f7-42af-b84b-249c9feb5fcb=ClientMetadata{hostInfo=null, consumers=[streams-wordcount-processor-0f988636-b4f7-42af-b84b-249c9feb5fcb-StreamThread-1-consumer-774aa883-3900-4d50-b4f8-d9846486f1fe], state=[activeTasks: ([]) assignedTasks: ([]) prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 1.0 cost: 0.0]}} from the member subscriptions.
10643 [StreamThread-1] DEBUG org.apache.kafka.streams.processor.internals.StreamPartitionAssignor - stream-thread [StreamThread-1] Starting to validate internal topics in partition assignor.
10643 [StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamPartitionAssignor - stream-thread [StreamThread-1] Completed validating internal topics in partition assignor
10643 [StreamThread-1] DEBUG org.apache.kafka.streams.processor.internals.StreamPartitionAssignor - stream-thread [StreamThread-1] Created repartition topics [] from the parsed topology.
10644 [StreamThread-1] DEBUG org.apache.kafka.streams.processor.internals.StreamPartitionAssignor - stream-thread [StreamThread-1] Starting to validate internal topics in partition assignor.
40664 [StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamPartitionAssignor - stream-thread [StreamThread-1] Completed validating internal topics in partition assignor
40664 [StreamThread-1] DEBUG org.apache.kafka.streams.processor.internals.StreamPartitionAssignor - stream-thread [StreamThread-1] Created state changelog topics {streams-wordcount-processor-Counts-changelog=org.apache.kafka.streams.processor.internals.StreamPartitionAssignor$InternalTopicMetadata@a3de218} from the parsed topology.
40664 [StreamThread-1] DEBUG org.apache.kafka.streams.processor.internals.StreamPartitionAssignor - stream-thread [StreamThread-1] Assigning tasks [0_0] to clients {0f988636-b4f7-42af-b84b-249c9feb5fcb=[activeTasks: ([]) assignedTasks: ([]) prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 1.0 cost: 0.0]} with number of replicas 0
40666 [StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamPartitionAssignor - stream-thread [StreamThread-1] Assigned tasks to clients as {0f988636-b4f7-42af-b84b-249c9feb5fcb=[activeTasks: ([0_0]) assignedTasks: ([0_0]) prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 1.0 cost: 0.5]}.
40670 [StreamThread-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - (Re-)joining group streams-wordcount-processor
40670 [StreamThread-1] DEBUG org.apache.kafka.streams.processor.internals.StreamPartitionAssignor - stream-thread [StreamThread-1] found [streams-file-input] topics possibly matching regex
40672 [StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamPartitionAssignor - stream-thread [StreamThread-1] Constructed client metadata {0f988636-b4f7-42af-b84b-249c9feb5fcb=ClientMetadata{hostInfo=null, consumers=[streams-wordcount-processor-0f988636-b4f7-42af-b84b-249c9feb5fcb-StreamThread-1-consumer-618818cd-e6dc-4ece-af8c-1a80b41de1c8], state=[activeTasks: ([]) assignedTasks: ([]) prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 1.0 cost: 0.0]}} from the member subscriptions.
40672 [StreamThread-1] DEBUG org.apache.kafka.streams.processor.internals.StreamPartitionAssignor - stream-thread [StreamThread-1] Starting to validate internal topics in partition assignor.
40672 [StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamPartitionAssignor - stream-thread [StreamThread-1] Completed validating internal topics in partition assignor
40672 [StreamThread-1] DEBUG org.apache.kafka.streams.processor.internals.StreamPartitionAssignor - stream-thread [StreamThread-1] Created repartition topics [] from the parsed topology.
40672 [StreamThread-1] DEBUG org.apache.kafka.streams.processor.internals.StreamPartitionAssignor - stream-thread [StreamThread-1] Starting to validate internal topics in partition assignor.
70685 [StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamPartitionAssignor - stream-thread [StreamThread-1] Completed validating internal topics in partition assignor

As far as I know, the only configuration requirement is that the heartbeat interval be one third of the session timeout or less.
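For reference, the consumer documentation for heartbeat.interval.ms says it must be lower than session.timeout.ms and typically no higher than one third of it; 10000 ms against 30000 ms sits exactly on that limit. Separately, in the log the second "Starting to validate internal topics" step of each rebalance takes about 30 s to complete, which happens to be close to the configured 30000 ms session timeout. The sketch below just makes both checks explicit. ConsumerTimingCheck and its methods are hypothetical helpers, not part of the Kafka API, and the arithmetic says nothing about the underlying cause.

```java
/**
 * Hypothetical helper (not part of Kafka): checks the heartbeat/session
 * guideline and measures gaps between timestamped log lines from the post.
 */
public class ConsumerTimingCheck {

    /** True if heartbeat < session timeout and heartbeat <= session timeout / 3. */
    static boolean heartbeatWithinGuideline(long heartbeatMs, long sessionTimeoutMs) {
        return heartbeatMs < sessionTimeoutMs && heartbeatMs * 3 <= sessionTimeoutMs;
    }

    /** Parses the leading elapsed-milliseconds field, e.g. "10644 [StreamThread-1] ...". */
    static long elapsedMs(String logLine) {
        return Long.parseLong(logLine.trim().split("\\s+", 2)[0]);
    }

    /** Milliseconds between an earlier and a later log line. */
    static long gapMs(String earlier, String later) {
        return elapsedMs(later) - elapsedMs(earlier);
    }

    public static void main(String[] args) {
        long sessionTimeoutMs = 30000; // consumer session.timeout.ms from the config above
        long heartbeatMs = 10000;      // consumer heartbeat.interval.ms from the config above
        System.out.println("heartbeat within guideline: "
            + heartbeatWithinGuideline(heartbeatMs, sessionTimeoutMs));

        // Abridged start/finish lines of the slow internal-topic validation in each rebalance
        long firstGap = gapMs(
            "10644 [StreamThread-1] DEBUG ... Starting to validate internal topics in partition assignor.",
            "40664 [StreamThread-1] INFO ... Completed validating internal topics in partition assignor");
        long secondGap = gapMs(
            "40672 [StreamThread-1] DEBUG ... Starting to validate internal topics in partition assignor.",
            "70685 [StreamThread-1] INFO ... Completed validating internal topics in partition assignor");
        System.out.println("validation gaps (ms): " + firstGap + ", " + secondGap
            + " vs session.timeout.ms " + sessionTimeoutMs);
    }
}
```

Running it prints "heartbeat within guideline: true" followed by "validation gaps (ms): 30020, 30013 vs session.timeout.ms 30000".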
We could reduce the poll interval to 5000 ms (we don't want to increase the session timeout beyond 30000 ms), but we would prefer the higher value. Does anyone know whether this is an expected configuration limitation, and/or whether a fix is possible? We are using the 0.10.2.0 client and server; with the 0.10.2.1 client (but a 0.10.2.0 server) this still happens.

Thanks,
Tom