Great. Thanks a lot for confirming! :)

-Matthias

On 6/26/17 4:58 AM, Tom Dearman wrote:
> Hi Matthias,
>
> This problem seems to be fixed in the 0.11.0.0 client.
>
> Thanks,
>
> Tom
>> On 17 Jun 2017, at 01:11, Matthias J. Sax <matth...@confluent.io> wrote:
>>
>> Hi Tom,
>>
>> Thanks a lot for reporting this. We dug into it. It's easy to reproduce
>> (thanks a lot for describing a simple way to do that) and it seems to be a
>> bug in Streams... I opened a JIRA:
>> https://issues.apache.org/jira/browse/KAFKA-5464
>>
>> For Streams 0.10.2.1, there is nothing we can advise atm -- only
>> not to increase `POLL_MS_CONFIG`.
>>
>> The good news is that Kafka 0.11 will be released soon, and we did some
>> rework of the main poll-loop in Streams (cf.
>> https://issues.apache.org/jira/browse/KAFKA-4843). This should resolve
>> the issue without the need to increase `POLL_MS_CONFIG` in the first place.
>>
>> It would be great if you could verify this. :)
>>
>>
>> -Matthias
>>
>>
>> On 6/8/17 7:18 AM, Tom Dearman wrote:
>>> We wanted to have less frequent polling in kafka streams (mostly because we
>>> have noticed quite a lot of object creation when polling a queue with no
>>> new messages on it), so we have set polling to 10 seconds. On start up,
>>> when the rebalance first happens, the onPartitionsAssigned of
>>> ConsumerRebalanceListener keeps causing a rebalance as it takes too long
>>> and the heart beat thread hasn’t started up. I have applied the relevant
>>> configuration to the WordCountProcessor demo class from kafka streams and
>>> it does the same.
>>> The additional config I apply to the demo class is:
>>>
>>> props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
>>> props.put(StreamsConfig.POLL_MS_CONFIG, 10000);
>>> props.put(StreamsConfig.CONSUMER_PREFIX + ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 10000);
>>> props.put(StreamsConfig.CONSUMER_PREFIX + ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
>>> props.put(StreamsConfig.CONSUMER_PREFIX + ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000);
>>>
>>>
>>> This causes the following logging:
>>>
>>> 10630 [StreamThread-1] INFO
>>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
>>> (Re-)joining group streams-wordcount-processor
>>> 10632 [StreamThread-1] DEBUG
>>> org.apache.kafka.streams.processor.internals.StreamPartitionAssignor -
>>> stream-thread [StreamThread-1] found [streams-file-input] topics possibly
>>> matching regex
>>> 10640 [StreamThread-1] INFO
>>> org.apache.kafka.streams.processor.internals.StreamPartitionAssignor -
>>> stream-thread [StreamThread-1] Constructed client metadata
>>> {0f988636-b4f7-42af-b84b-249c9feb5fcb=ClientMetadata{hostInfo=null,
>>> consumers=[streams-wordcount-processor-0f988636-b4f7-42af-b84b-249c9feb5fcb-StreamThread-1-consumer-774aa883-3900-4d50-b4f8-d9846486f1fe],
>>> state=[activeTasks: ([]) assignedTasks: ([]) prevActiveTasks: ([])
>>> prevAssignedTasks: ([]) capacity: 1.0 cost: 0.0]}} from the member
>>> subscriptions.
>>> 10643 [StreamThread-1] DEBUG
>>> org.apache.kafka.streams.processor.internals.StreamPartitionAssignor -
>>> stream-thread [StreamThread-1] Starting to validate internal topics in
>>> partition assignor.
>>> 10643 [StreamThread-1] INFO
>>> org.apache.kafka.streams.processor.internals.StreamPartitionAssignor -
>>> stream-thread [StreamThread-1] Completed validating internal topics in
>>> partition assignor
>>> 10643 [StreamThread-1] DEBUG
>>> org.apache.kafka.streams.processor.internals.StreamPartitionAssignor -
>>> stream-thread [StreamThread-1] Created repartition topics [] from the
>>> parsed topology.
>>> 10644 [StreamThread-1] DEBUG
>>> org.apache.kafka.streams.processor.internals.StreamPartitionAssignor -
>>> stream-thread [StreamThread-1] Starting to validate internal topics in
>>> partition assignor.
>>> 40664 [StreamThread-1] INFO
>>> org.apache.kafka.streams.processor.internals.StreamPartitionAssignor -
>>> stream-thread [StreamThread-1] Completed validating internal topics in
>>> partition assignor
>>> 40664 [StreamThread-1] DEBUG
>>> org.apache.kafka.streams.processor.internals.StreamPartitionAssignor -
>>> stream-thread [StreamThread-1] Created state changelog topics
>>> {streams-wordcount-processor-Counts-changelog=org.apache.kafka.streams.processor.internals.StreamPartitionAssignor$InternalTopicMetadata@a3de218}
>>> from the parsed topology.
>>> 40664 [StreamThread-1] DEBUG
>>> org.apache.kafka.streams.processor.internals.StreamPartitionAssignor -
>>> stream-thread [StreamThread-1] Assigning tasks [0_0] to clients
>>> {0f988636-b4f7-42af-b84b-249c9feb5fcb=[activeTasks: ([]) assignedTasks:
>>> ([]) prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 1.0 cost:
>>> 0.0]} with number of replicas 0
>>> 40666 [StreamThread-1] INFO
>>> org.apache.kafka.streams.processor.internals.StreamPartitionAssignor -
>>> stream-thread [StreamThread-1] Assigned tasks to clients as
>>> {0f988636-b4f7-42af-b84b-249c9feb5fcb=[activeTasks: ([0_0]) assignedTasks:
>>> ([0_0]) prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 1.0 cost:
>>> 0.5]}.
>>> 40670 [StreamThread-1] INFO
>>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
>>> (Re-)joining group streams-wordcount-processor
>>> 40670 [StreamThread-1] DEBUG
>>> org.apache.kafka.streams.processor.internals.StreamPartitionAssignor -
>>> stream-thread [StreamThread-1] found [streams-file-input] topics possibly
>>> matching regex
>>> 40672 [StreamThread-1] INFO
>>> org.apache.kafka.streams.processor.internals.StreamPartitionAssignor -
>>> stream-thread [StreamThread-1] Constructed client metadata
>>> {0f988636-b4f7-42af-b84b-249c9feb5fcb=ClientMetadata{hostInfo=null,
>>> consumers=[streams-wordcount-processor-0f988636-b4f7-42af-b84b-249c9feb5fcb-StreamThread-1-consumer-618818cd-e6dc-4ece-af8c-1a80b41de1c8],
>>> state=[activeTasks: ([]) assignedTasks: ([]) prevActiveTasks: ([])
>>> prevAssignedTasks: ([]) capacity: 1.0 cost: 0.0]}} from the member
>>> subscriptions.
>>> 40672 [StreamThread-1] DEBUG
>>> org.apache.kafka.streams.processor.internals.StreamPartitionAssignor -
>>> stream-thread [StreamThread-1] Starting to validate internal topics in
>>> partition assignor.
>>> 40672 [StreamThread-1] INFO
>>> org.apache.kafka.streams.processor.internals.StreamPartitionAssignor -
>>> stream-thread [StreamThread-1] Completed validating internal topics in
>>> partition assignor
>>> 40672 [StreamThread-1] DEBUG
>>> org.apache.kafka.streams.processor.internals.StreamPartitionAssignor -
>>> stream-thread [StreamThread-1] Created repartition topics [] from the
>>> parsed topology.
>>> 40672 [StreamThread-1] DEBUG
>>> org.apache.kafka.streams.processor.internals.StreamPartitionAssignor -
>>> stream-thread [StreamThread-1] Starting to validate internal topics in
>>> partition assignor.
>>> 70685 [StreamThread-1] INFO
>>> org.apache.kafka.streams.processor.internals.StreamPartitionAssignor -
>>> stream-thread [StreamThread-1] Completed validating internal topics in
>>> partition assignor
>>>
>>>
>>> As far as I know, the only configuration requirement is that the heart beat
>>> interval is 1/3 or less of the session timeout. We can reduce the polling to
>>> 5000ms (we don’t want to increase the session timeout beyond 30000ms), but
>>> would prefer this higher value. Is this an expected configuration
>>> limitation, and/or is a fix possible? We are using the 0.10.2.0 client and
>>> server; with the 0.10.2.1 client (but 0.10.2.0 server) this still happens.
>>>
>>> Thanks,
>>>
>>> Tom
>>>
>>
>
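
For anyone reading the thread later, here is a minimal sketch of the reported overrides plus the "heart beat is 1/3 or less of session timeout" rule of thumb mentioned in the original report. It uses plain `java.util.Properties` with the literal key strings behind the constants quoted above, so it runs without the Kafka jars; the class name `PollConfigSketch` and the helper `heartbeatWithinThird` are made up for illustration and are not part of Kafka.

```java
import java.util.Properties;

// Hypothetical helper class for illustration only; not part of Kafka.
public class PollConfigSketch {

    // The overrides from the original report, keyed by the literal strings
    // that the StreamsConfig/ConsumerConfig constants resolve to
    // ("consumer." is the value of StreamsConfig.CONSUMER_PREFIX).
    public static Properties reportedConfig() {
        Properties props = new Properties();
        props.put("num.stream.threads", 1);
        props.put("poll.ms", 10000);
        props.put("consumer.fetch.max.wait.ms", 10000);
        props.put("consumer.session.timeout.ms", 30000);
        props.put("consumer.heartbeat.interval.ms", 10000);
        return props;
    }

    // Rule of thumb from the thread: heartbeat interval should be at most
    // one third of the session timeout. Uses long math to avoid overflow.
    public static boolean heartbeatWithinThird(int heartbeatMs, int sessionTimeoutMs) {
        return heartbeatMs * 3L <= sessionTimeoutMs;
    }

    public static void main(String[] args) {
        Properties p = reportedConfig();
        int heartbeat = (Integer) p.get("consumer.heartbeat.interval.ms");
        int session = (Integer) p.get("consumer.session.timeout.ms");
        // prints "heartbeat <= session/3: true"
        System.out.println("heartbeat <= session/3: " + heartbeatWithinThird(heartbeat, session));
    }
}
```

The reported values pass this check (10000 ms is exactly a third of 30000 ms), which fits the diagnosis in the thread that the rebalance loop on 0.10.2.x was a Streams bug (KAFKA-5464) rather than an invalid heartbeat/session ratio.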