There is some discussion going on in https://issues.apache.org/jira/browse/KAFKA-3370 about this, as well.
I've added a link to your JIRA about it.

-James

> On Jan 17, 2017, at 9:35 PM, Jeff Widman <j...@netskope.com> wrote:
>
> Agree on suggesting in the docs that the offset reset policy should generally be changed to "none" once the mirrormaker is up and running.
>
> There is an edge case where you want to keep auto.offset.reset at earliest: when you've got a mirrormaker consumer subscribed to a regex pattern and have auto topic creation enabled on your cluster.
>
> If you start producing to a non-existent topic that matches the regex, there will be a period of time where the producer is producing before the new topic's partitions have been picked up by the mirrormaker. Those messages will never be consumed by the mirrormaker, because it will start from latest and skip those just-produced messages.
>
> Also agree on increasing the default offsets retention minutes. I actually didn't realize the default was so small.
>
> On Tue, Jan 17, 2017 at 8:16 PM, Grant Henke <ghe...@cloudera.com> wrote:
>
>> I agree that setting the default auto.offset.reset to earliest makes sense (this was actually a default choice Flume made for its Kafka channel to avoid missing the first messages). However, I think that at a minimum we should also document a recommendation to consider changing the value to none after mirror maker has run and committed its initial offsets.
>>
>> Setting the value to none ensures you don't replicate the entire topic from scratch in case offsets are lost or purged due to prolonged downtime or other unforeseen circumstances. Having an auto.offset.reset of none also lets you ensure you don't miss data. Missing data can occur when auto.offset.reset is set to latest and the offset state is lost, either before mirrormaker has caught up or after data was produced while it was down.
>>
>> I would also suggest considering increasing the default offsets.retention.minutes from 1 day (1440) to 7 days (10080), or something similar. I have seen a handful of scenarios where an outage lasts longer than a day, the offsets get purged, and auto.offset.reset kicks in; in the case of earliest, that means re-replicating billions of messages.
>>
>> On Tue, Jan 17, 2017 at 8:09 PM, Jeff Widman <j...@netskope.com> wrote:
>>
>>> [Moving discussion from users list to dev list]
>>>
>>> I agree with Ewen that it's more sensible for mirrormaker to default to replicating topics from the earliest offset available, rather than just replicating from the current offset onward.
>>>
>>> I filed a JIRA ticket: https://issues.apache.org/jira/browse/KAFKA-4668
>>> as well as a PR: https://github.com/apache/kafka/pull/2394
>>>
>>> Does this need a KIP?
>>>
>>> The main side effect of this change is that if you start mirroring a new topic, you can hammer your network until it catches up, or until you realize what's happening and throttle the mirrormaker client.
>>>
>>> Cheers,
>>> Jeff
>>>
>>>
>>> On Thu, Jan 5, 2017 at 7:55 PM, Ewen Cheslack-Postava <e...@confluent.io> wrote:
>>>
>>>> The basic issue here is just that auto.offset.reset defaults to latest, right? That's not a very good setting for a mirroring tool, and this seems like something we might just want to change the default for. It's debatable whether it would even need a KIP.
>>>>
>>>> We have other settings in MM where we don't want the normal defaults, so we override them if they aren't set explicitly. Most are producer properties to avoid duplicates (the acks, retries, max.block.ms, and max.in.flight.requests.per.connection settings), but there are a couple of consumer ones too (auto.commit.enable and consumer.timeout.ms).
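For illustration only, Ewen's "override unless set explicitly" approach might look roughly like this for auto.offset.reset. This is a hypothetical sketch, not MirrorMaker's actual source: `MirrorConsumerDefaults` and `withDefaults` are invented names, and only `auto.offset.reset` (a real consumer setting) is handled; the other overrides Ewen lists are left out.

```java
import java.util.Properties;

/**
 * Hypothetical helper, not MirrorMaker's actual source: applies a mirroring-friendly
 * consumer default only when the user did not set the key explicitly.
 */
final class MirrorConsumerDefaults {
    static final String AUTO_OFFSET_RESET = "auto.offset.reset";

    static Properties withDefaults(Properties userProps) {
        // Copy so the caller's Properties object is left untouched.
        Properties props = new Properties();
        props.putAll(userProps);
        // putIfAbsent keeps an explicit "latest" or "none" supplied by the user.
        props.putIfAbsent(AUTO_OFFSET_RESET, "earliest");
        return props;
    }

    public static void main(String[] args) {
        Properties unset = new Properties();
        Properties explicit = new Properties();
        explicit.setProperty(AUTO_OFFSET_RESET, "none");

        System.out.println(withDefaults(unset).getProperty(AUTO_OFFSET_RESET));    // earliest
        System.out.println(withDefaults(explicit).getProperty(AUTO_OFFSET_RESET)); // none
    }
}
```

Grant's recommendation fits this shape: once the initial offsets are committed, an operator sets auto.offset.reset=none explicitly, and the default is no longer applied.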
>>>>
>>>> This is probably something like a 1-line MM patch if someone wants to tackle it. The question of whether it needs a KIP or not is, unfortunately, the more complicated one :(
>>>>
>>>> -Ewen
>>>>
>>>> On Thu, Jan 5, 2017 at 1:10 PM, James Cheng <wushuja...@gmail.com> wrote:
>>>>
>>>>>
>>>>>> On Jan 5, 2017, at 12:57 PM, Jeff Widman <j...@netskope.com> wrote:
>>>>>>
>>>>>> Thanks James and Hans.
>>>>>>
>>>>>> Will this also happen when we expand the number of partitions in a topic?
>>>>>>
>>>>>> That will also trigger a rebalance, the consumer won't subscribe to the partition until the rebalance finishes, etc.
>>>>>>
>>>>>> So it'd seem that any messages published to the new partition between the partition creation and the rebalance finishing won't be consumed by any consumers that have offset=latest.
>>>>>>
>>>>>
>>>>> It hadn't occurred to me until you mentioned it, but yes, I think it'd also happen in those cases.
>>>>>
>>>>> In the Kafka consumer javadocs, they provide a list of things that would cause a rebalance:
>>>>> http://kafka.apache.org/0101/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#subscribe(java.util.Collection,%20org.apache.kafka.clients.consumer.ConsumerRebalanceListener)
>>>>>
>>>>> "As part of group management, the consumer will keep track of the list of consumers that belong to a particular group and will trigger a rebalance operation if one of the following events trigger -
>>>>>
>>>>> Number of partitions change for any of the subscribed list of topics
>>>>> Topic is created or deleted
>>>>> An existing member of the consumer group dies
>>>>> A new member is added to an existing consumer group via the join API
>>>>> "
>>>>>
>>>>> I'm guessing that this would affect any of those scenarios.
>>>>>
>>>>> -James
>>>>>
>>>>>>
>>>>>> On Thu, Jan 5, 2017 at 12:40 AM, James Cheng <wushuja...@gmail.com> wrote:
>>>>>>
>>>>>>> Jeff,
>>>>>>>
>>>>>>> Your analysis is correct. I would say that it is known but unintuitive behavior.
>>>>>>>
>>>>>>> As an example of a problem caused by this behavior, it's possible for mirrormaker to miss messages on newly created topics, even though it was subscribed to them before the topics were created.
>>>>>>>
>>>>>>> See the following JIRAs:
>>>>>>> https://issues.apache.org/jira/browse/KAFKA-3848
>>>>>>> https://issues.apache.org/jira/browse/KAFKA-3370
>>>>>>>
>>>>>>> -James
>>>>>>>
>>>>>>>> On Jan 4, 2017, at 4:37 PM, h...@confluent.io wrote:
>>>>>>>>
>>>>>>>> This sounds exactly as I would expect things to behave. If you consume from the beginning, I would think you would get all the messages, but not if you consume from the latest offset. You can separately tune the metadata refresh interval if you want to miss fewer messages, but that still won't get you all messages from the beginning if you don't explicitly consume from the beginning.
>>>>>>>>
>>>>>>>> Sent from my iPhone
>>>>>>>>
>>>>>>>>> On Jan 4, 2017, at 6:53 PM, Jeff Widman <j...@netskope.com> wrote:
>>>>>>>>>
>>>>>>>>> I'm seeing consumers miss messages when they subscribe before the topic is actually created.
>>>>>>>>>
>>>>>>>>> Scenario:
>>>>>>>>> 1) Kafka 0.10.1.1 cluster with no topics, but supports topic auto-creation as soon as a message is published to the topic
>>>>>>>>> 2) consumer subscribes using a topic string or a regex pattern. Currently no topics match. Consumer offset is "latest"
>>>>>>>>> 3) producer publishes to a topic that matches the string or regex pattern
>>>>>>>>> 4) broker immediately creates a topic, writes the message, and also notifies the consumer group that a rebalance needs to happen to assign the topic_partition to one of the consumers
>>>>>>>>> 5) rebalance is fairly quick, maybe a second or so
>>>>>>>>> 6) a consumer is assigned to the newly-created topic_partition
>>>>>>>>>
>>>>>>>>> At this point, we've got a consumer steadily polling the recently created topic_partition. However, consumer.poll() never returns any messages published between topic creation and when the consumer was assigned to the topic_partition. I'm guessing this may be because when the consumer is assigned to the topic_partition it doesn't find any committed offset, so it uses the latest offset, which happens to be after the messages that were published to create the topic.
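Jeff's scenario and the purge case Grant describes both come down to what the consumer does when a partition has no committed offset. Below is a toy model of that decision, not Kafka's client code. Assumptions: `ResetModel`, `startingOffset`, and `committedAfterOutage` are hypothetical names; the real consumer throws `NoOffsetForPartitionException` for `none`, modeled here as `IllegalStateException`; out-of-range committed offsets (which also trigger a reset) are ignored; and offset expiry is simplified to "the outage outlasted the retention window".

```java
import java.util.OptionalLong;

/** Toy model of auto.offset.reset; a sketch, not the real KafkaConsumer logic. */
final class ResetModel {

    /**
     * Offset the consumer would start fetching from after being assigned a partition.
     * committed is empty when the group never committed (new topic or partition)
     * or when the commit was purged after offsets.retention.minutes.
     */
    static long startingOffset(OptionalLong committed, String policy,
                               long logStartOffset, long logEndOffset) {
        if (committed.isPresent()) {
            return committed.getAsLong();
        }
        switch (policy) {
            case "earliest": return logStartOffset; // replay everything still retained
            case "latest":   return logEndOffset;   // skip everything already written
            case "none":     throw new IllegalStateException("no committed offset");
            default:         throw new IllegalArgumentException("unknown policy: " + policy);
        }
    }

    /** Simplified expiry: the commit survives only if the outage is shorter than retention. */
    static OptionalLong committedAfterOutage(long committedOffset, long outageMinutes,
                                             long retentionMinutes) {
        return outageMinutes < retentionMinutes
                ? OptionalLong.of(committedOffset)
                : OptionalLong.empty();
    }

    public static void main(String[] args) {
        // Jeff's scenario: 3 messages land in an auto-created partition before assignment.
        long logEnd = 3;
        System.out.println(startingOffset(OptionalLong.empty(), "latest", 0, logEnd));   // 3: offsets 0..2 skipped
        System.out.println(startingOffset(OptionalLong.empty(), "earliest", 0, logEnd)); // 0: all 3 read

        // Grant's scenario: a 2-day outage with the 1-day (1440 minute) retention default.
        System.out.println(committedAfterOutage(500, 2 * 1440, 1440).isPresent());       // false: reset kicks in
        System.out.println(committedAfterOutage(500, 2 * 1440, 10080).getAsLong());      // 500 with 7 days
    }
}
```

The `none` branch is why Grant suggests it after the initial commit: a lost offset fails loudly instead of silently replaying (earliest) or silently skipping (latest).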
>>>>>>>>>
>>>>>>>>> This is surprising because the consumer technically was subscribed to the topic before the messages were produced, so you'd think the consumer would receive these messages.
>>>>>>>>>
>>>>>>>>> Is this known behavior? A bug in the Kafka broker? Or a bug in my client library?
>>
>> --
>> Grant Henke
>> Software Engineer | Cloudera
>> gr...@cloudera.com | twitter.com/gchenke | linkedin.com/in/granthenke
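Hans's suggestion to tune the metadata refresh interval can be sized with rough arithmetic. This is a back-of-the-envelope sketch under assumptions, not a Kafka API: `MissedWindow` and `worstCaseMissed` are hypothetical names; it assumes a constant produce rate, that a pattern-subscribed consumer may not notice a new topic until its next forced metadata refresh (`metadata.max.age.ms`, default 300000 ms, i.e. 5 minutes), and that the following rebalance takes about a second, as in Jeff's step 5.

```java
/** Back-of-the-envelope estimate under stated assumptions; not a Kafka API. */
final class MissedWindow {

    /**
     * Worst-case number of messages a "latest" consumer skips on a newly created topic,
     * assuming a constant produce rate, up to one full metadata refresh interval before
     * the topic is noticed, and then a rebalance of rebalanceMs.
     */
    static long worstCaseMissed(double messagesPerSecond, long metadataMaxAgeMs, long rebalanceMs) {
        double windowSeconds = (metadataMaxAgeMs + rebalanceMs) / 1000.0;
        return (long) Math.ceil(messagesPerSecond * windowSeconds);
    }

    public static void main(String[] args) {
        // 100 msg/s, default metadata.max.age.ms of 300000 (5 minutes), ~1 s rebalance.
        System.out.println(worstCaseMissed(100, 300_000, 1_000)); // 30100
        // A 10 s refresh interval shrinks the gap but does not close it.
        System.out.println(worstCaseMissed(100, 10_000, 1_000));  // 1100
    }
}
```

As Hans says, a shorter refresh interval only narrows the window; only starting from earliest on partitions with no committed offset avoids the loss.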