Hello Elias,

From the error messages it does seem that the brokers are truncating log segments so quickly that neither the streams' consumer fetchers (or, more generally, any consumer fetcher) nor the replica fetchers can catch up, leaving their fetch offsets smaller than the leader's smallest offset.

Also, you mentioned that after you stopped the Streams tasks the repartition topics' data was gone after some time, which makes me wonder whether these topics are configured with a very short time-based retention period. On the other hand, you were pretty sure that *"That makes no sense, as the topic is not configured to delete any messages."* This indeed sounds weird to me. Note that for log retention, Kafka brokers have a global config that applies to all topics, while each topic can also have per-topic configs that override the broker-level defaults. You may want to check both the broker configs and the topic configs (e.g. with the kafka-topics command tool) to make sure that time-based retention is properly set at both levels.
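For example, something along these lines will surface the effective settings at both levels, as well as the leaders' current earliest offsets (the ZooKeeper connect string, broker address, topic name, and server.properties path below are just placeholders for your environment):

  # Per-topic overrides (look for retention.ms / retention.bytes / cleanup.policy under "Configs:")
  bin/kafka-topics.sh --zookeeper zk1:2181 --describe --topic some_topic
  bin/kafka-configs.sh --zookeeper zk1:2181 --describe --entity-type topics --entity-name some_topic

  # Broker-level defaults that apply when no per-topic override is set
  grep -E 'log\.(retention|segment|cleanup)' /path/to/server.properties

  # Earliest offset on each partition leader (--time -2 = earliest); if these keep advancing
  # while the job is running, segments are indeed being deleted out from under the fetchers
  bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list broker1:9092 --topic some_topic --time -2

If a short retention.ms (or small retention.bytes) override shows up on the repartition topic, or if the earliest offsets keep moving forward while the job runs, that would explain both the replica fetcher resets and the consumer-side out-of-range errors.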
Guozhang

On Fri, Jan 20, 2017 at 9:07 AM, Elias Levy <fearsome.lucid...@gmail.com> wrote:

> Suggestions?
>
> On Thu, Jan 19, 2017 at 6:23 PM, Elias Levy <fearsome.lucid...@gmail.com> wrote:
>
> > In the process of testing a Kafka Streams application I've come across a few issues that are baffling me.
> >
> > For testing I am executing a job on 20 nodes with four cores per node, each instance configured to use 4 threads, against a 5-node broker cluster running 0.10.1.1.
> >
> > Before execution, kafka-streams-application-reset.sh is run to reset the offsets of the input topics to zero. The app calls KafkaStreams.cleanUp() on startup to clean up the local state stores. All workers are started simultaneously. All topics have 100 partitions. The main input topic has 1TB of data, 3x replicated. min.insync.replicas is set to 3.
> >
> > The application consumes from the main input topic, transforms the input, and repartitions it using KStream.through() to write to another topic. It reads from the repartitioned topic and continues processing.
> >
> > In the brokers we are seeing errors such as:
> >
> > [2017-01-19 21:24:41,298] WARN [ReplicaFetcherThread-3-1009], Replica 1010 for partition some_topic-91 reset its fetch offset from 424762 to current leader 1009's start offset 424779 (kafka.server.ReplicaFetcherThread)
> > [2017-01-19 21:24:41,350] WARN [ReplicaFetcherThread-2-1009], Replica 1010 for partition some_topic-66 reset its fetch offset from 401243 to current leader 1009's start offset 401376 (kafka.server.ReplicaFetcherThread)
> > [2017-01-19 21:24:41,381] ERROR [ReplicaFetcherThread-3-1009], Current offset 424762 for partition [some_topic,91] out of range; reset offset to 424779 (kafka.server.ReplicaFetcherThread)
> > [2017-01-19 21:24:41,399] WARN [ReplicaFetcherThread-3-1009], Replica 1010 for partition some_topic-71 reset its fetch offset from 456158 to current leader 1009's start offset 456189 (kafka.server.ReplicaFetcherThread)
> > [2017-01-19 21:24:41,400] WARN [ReplicaFetcherThread-0-1007], Replica 1010 for partition some_topic-84 reset its fetch offset from 399325 to current leader 1007's start offset 399327 (kafka.server.ReplicaFetcherThread)
> > [2017-01-19 21:24:41,446] ERROR [ReplicaFetcherThread-2-1009], Current offset 401243 for partition [some_topic,66] out of range; reset offset to 401376 (kafka.server.ReplicaFetcherThread)
> >
> > If I understand these errors correctly, they are saying that the broker's replica fetcher thread for these partitions failed to fetch at its current offset because the leader's start offset is higher. It basically says the leader no longer has the messages at the offset requested.
> > That makes no sense, as the topic is not configured to delete any messages. I observed these errors 512 times in total across all brokers while executing the application.
> >
> > From there it seems to cascade to the Streams application:
> >
> > INFO 2017-01-19 21:24:41,417 [StreamThread-4][Fetcher.java:714] : Fetch offset 1051824 is out of range for partition some_topic-14, resetting offset
> > ERROR 2017-01-19 21:24:41,421 [StreamThread-4][StreamThread.java:249] : stream-thread [StreamThread-4] Streams application error during processing:
> > java.lang.NullPointerException
> >         at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:341)
> >         at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffsetsIfNeeded(Fetcher.java:197)
> >         at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1524)
> >         at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1018)
> >         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
> >         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407)
> >         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
> > INFO 2017-01-19 21:24:41,425 [StreamThread-4][StreamThread.java:268] : stream-thread [StreamThread-4] Shutting down
> >
> > I saw this error 731 times across the workers.
> >
> > If we look at just one partition across brokers and workers and group the logs by time, we see this:
> >
> > worker-3 INFO 2017-01-19 21:*24:41*,806 [StreamThread-3][Fetcher.java:714] : Fetch offset 429851 is out of range for partition some_topic-94, resetting offset
> >
> > worker-3 INFO 2017-01-19 21:*29:41*,496 [StreamThread-1][Fetcher.java:714] : Fetch offset 1317721 is out of range for partition some_topic-94, resetting offset
> >
> > worker-1 INFO 2017-01-19 21:*34:41*,977 [StreamThread-2][Fetcher.java:714] : Fetch offset 2014017 is out of range for partition some_topic-94, resetting offset
> >
> > worker-1 INFO 2017-01-19 21:*39:41*,425 [StreamThread-3][Fetcher.java:714] : Fetch offset 2588834 is out of range for partition some_topic-94, resetting offset
> >
> > broker-3 [2017-01-19 21:*44:41*,595] WARN [ReplicaFetcherThread-2-1007], Replica 1008 for partition some_topic-94 reset its fetch offset from 3093739 to current leader 1007's start offset 3093742 (kafka.server.ReplicaFetcherThread)
> > broker-3 [2017-01-19 21:*44:41*,642] ERROR [ReplicaFetcherThread-2-1007], Current offset 3093739 for partition [some_topic,94] out of range; reset offset to 3093742 (kafka.server.ReplicaFetcherThread)
> > worker-2 INFO 2017-01-19 21:*45:03*,011 [StreamThread-2][Fetcher.java:714] : Fetch offset 3093075 is out of range for partition some_topic-94, resetting offset
> >
> > broker-3 [2017-01-19 21:*49:41*,344] WARN [ReplicaFetcherThread-2-1007], Replica 1008 for partition some_topic-94 reset its fetch offset from 3417421 to current leader 1007's start offset 3417435 (kafka.server.ReplicaFetcherThread)
> > broker-2 [2017-01-19 21:*49:41*,346] WARN [ReplicaFetcherThread-2-1007], Replica 1009 for partition some_topic-94 reset its fetch offset from 3417421 to current leader 1007's start offset 3417435 (kafka.server.ReplicaFetcherThread)
> > worker-2 INFO 2017-01-19 21:*49:41*,393 [StreamThread-2][Fetcher.java:714] : Fetch offset 3416859 is out of range for partition some_topic-94, resetting offset
> > broker-2 [2017-01-19 21:*49:41*,564] ERROR [ReplicaFetcherThread-2-1007], Current offset 3417421 for partition [some_topic,94] out of range; reset offset to 3417435 (kafka.server.ReplicaFetcherThread)
> > broker-3 [2017-01-19 21:*49:41*,614] ERROR [ReplicaFetcherThread-2-1007], Current offset 3417421 for partition [some_topic,94] out of range; reset offset to 3417435 (kafka.server.ReplicaFetcherThread)
> >
> > worker-4 INFO 2017-01-19 21:*55:05*,072 [StreamThread-3][Fetcher.java:714] : Fetch offset 3762043 is out of range for partition some_topic-94, resetting offset
> >
> > Interestingly, the error repeats roughly every 5 minutes.
> >
> > Furthermore, while the job is running, it is writing data to the repartition topic:
> >
> > $ ansible tag_aws_autoscaling_groupName_Kafka_Cluster2 -u ec2-user -f 10 -a 'du -sch /data/kafka/logs/some_topic-*/*.log | tail -1' -m shell --become
> > 54.172.57.255 | SUCCESS | rc=0 >>
> > 2.1G total
> >
> > 54.175.254.148 | SUCCESS | rc=0 >>
> > 2.2G total
> >
> > 54.174.72.110 | SUCCESS | rc=0 >>
> > 2.5G total
> >
> > 54.167.181.186 | SUCCESS | rc=0 >>
> > 2.0G total
> >
> > 52.3.220.225 | SUCCESS | rc=0 >>
> > 2.3G total
> >
> > But after I stop the job and wait a while, all the data disappears:
> >
> > $ ansible tag_aws_autoscaling_groupName_Kafka_Cluster2 -u ec2-user -f 10 -a 'du -sch /data/kafka/logs/some_topic-*/*.log | tail -1' -m shell --become
> > 54.174.72.110 | SUCCESS | rc=0 >>
> > 0 total
> >
> > 54.167.181.186 | SUCCESS | rc=0 >>
> > 0 total
> >
> > 54.175.254.148 | SUCCESS | rc=0 >>
> > 0 total
> >
> > 54.172.57.255 | SUCCESS | rc=0 >>
> > 0 total
> >
> > 52.3.220.225 | SUCCESS | rc=0 >>
> > 0 total
> >
> > Performing the test with unclean.leader.election.enable set to true or false makes no difference.
> >
> > While the 5-minute cadence of errors matches the default value of the *leader.imbalance.check.interval.seconds* setting, increasing it appears to have no effect (errors still occur at 5-minute intervals).
> >
> > Anyone have any idea what might occur every five minutes that may result in this error?

--
-- Guozhang