There is some discussion going on in 
https://issues.apache.org/jira/browse/KAFKA-3370 about this, as well.

I've added a link to your JIRA about it.

-James

> On Jan 17, 2017, at 9:35 PM, Jeff Widman <j...@netskope.com> wrote:
> 
> Agree on suggesting in the docs that auto.offset.reset should generally
> be changed to "none" once mirrormaker is up and running.
> 
> There is an edge case where you want to keep auto.offset.reset at
> earliest: when you've got a mirrormaker consumer subscribed to a regex
> pattern and have auto-topic creation enabled on your cluster.
> 
> If you start producing to a non-existent topic that matches the regex, then
> there will be a period of time where the producer is producing before the
> new topic's partitions have been picked up by the mirrormaker. Those
> messages will never be consumed by the mirrormaker because it will start
> from latest, ignoring those just-produced messages.
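To make that timing gap concrete, here's a toy model in plain Python (this is NOT the Kafka client API; the function name and error are made up for illustration) of how the reset policy picks the first fetch position when no committed offset exists yet:

```python
# Toy model of auto.offset.reset behavior -- not the real client API,
# just an illustration of why "latest" can skip messages produced
# between topic auto-creation and the consumer's first assignment.

def first_fetch_offset(policy, committed, log_start, log_end):
    """Offset a consumer starts from when first assigned a partition."""
    if committed is not None:
        return committed            # a committed offset always wins
    if policy == "earliest":
        return log_start            # replay from the beginning of the log
    if policy == "latest":
        return log_end              # skip everything already in the log
    # policy == "none": fail fast instead of silently picking a position
    raise RuntimeError("no committed offset and auto.offset.reset=none")

# Topic was auto-created and 5 messages landed before the rebalance
# finished, so the consumer has no committed offset yet.
log = ["m0", "m1", "m2", "m3", "m4"]

seen_latest = log[first_fetch_offset("latest", None, 0, len(log)):]
seen_earliest = log[first_fetch_offset("earliest", None, 0, len(log)):]
print(seen_latest)    # []
print(seen_earliest)  # ['m0', 'm1', 'm2', 'm3', 'm4']
```

Under "latest" the mirrormaker's first fetch position lands after the messages that triggered topic creation, which is exactly the edge case above.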
> 
> Also agree on increasing the default offsets retention minutes. I actually
> didn't realize the default was so small.
> 
> On Tue, Jan 17, 2017 at 8:16 PM, Grant Henke <ghe...@cloudera.com> wrote:
> 
>> I agree that setting the default auto.offset.reset to earliest makes sense
>> (This was actually a default choice Flume made for its Kafka channel to
>> avoid missing the first messages). However I think, at a minimum, we should
>> also document a recommendation to consider changing the value to none after
>> mirror maker has run to commit its initial offsets.
>> 
>> Setting the value to none ensures you don't replicate the entire topic
>> from scratch if offsets are lost or purged due to prolonged downtime or
>> other unforeseen circumstances. Having an auto.offset.reset of none also
>> ensures you don't miss data. Missing data can occur when
>> auto.offset.reset is set to latest and the offset state was lost before
>> mirrormaker had caught up, or data was produced while it was down.
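A sketch of what this two-phase setup might look like in the MirrorMaker consumer config (property names assume the new Java consumer; treat this as illustrative, not a drop-in file):

```properties
# consumer.config for the initial MirrorMaker run: pick up each topic
# from the beginning.
auto.offset.reset=earliest

# After MirrorMaker has committed offsets for all mirrored topics,
# switch to:
#   auto.offset.reset=none
# so a lost or purged offset causes a hard error you can investigate,
# rather than silently re-reading everything (earliest) or silently
# skipping data (latest).
```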
>> 
>> I would also suggest considering increasing the default
>> offsets.retention.minutes from 1 day (1440) to 7 days (10080), or
>> something similar. I have seen a handful of scenarios where an outage
>> lasted longer than a day, the offsets were purged, auto.offset.reset
>> kicked in, and, in the case of earliest, billions of messages were
>> re-replicated.
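For reference, the corresponding broker-side change would be a one-liner in server.properties (the property name is the real broker config; the value is the 7-day suggestion above):

```properties
# Keep committed offsets for 7 days instead of the 1-day default, so a
# multi-day outage doesn't purge them and trigger auto.offset.reset
# when consumers come back. 10080 minutes = 7 days.
offsets.retention.minutes=10080
```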
>> 
>> On Tue, Jan 17, 2017 at 8:09 PM, Jeff Widman <j...@netskope.com> wrote:
>> 
>>> [Moving discussion from users list to dev list]
>>> 
>>> I agree with Ewen that it's more sensible for mirrormaker to default to
>>> replicating topics from the earliest offset available, rather than just
>>> replicating from the current offset onward.
>>> 
>>> I filed a JIRA ticket, https://issues.apache.org/jira/browse/KAFKA-4668,
>>> as well as a PR: https://github.com/apache/kafka/pull/2394
>>> 
>>> Does this need a KIP?
>>> 
>>> The main side effect of this change is that if you start mirroring a
>>> new topic you can hammer your network until it catches up, or until you
>>> realize what's happening and throttle the mirrormaker client.
>>> 
>>> Cheers,
>>> Jeff
>>> 
>>> 
>>> On Thu, Jan 5, 2017 at 7:55 PM, Ewen Cheslack-Postava <e...@confluent.io>
>>> wrote:
>>> 
>>>> The basic issue here is just that the auto.offset.reset defaults to
>>>> latest, right? That's not a very good setting for a mirroring tool and
>>>> this seems like something we might just want to change the default for.
>>>> It's debatable whether it would even need a KIP.
>>>> 
>>>> We have other settings in MM where we override them if they aren't set
>>>> explicitly but we don't want the normal defaults. Most are producer
>>>> properties to avoid duplicates (the acks, retries, max.block.ms, and
>>>> max.in.flight.requests.per.connection settings), but there are a couple
>>>> of consumer ones too (auto.commit.enable and consumer.timeout.ms).
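For context, those MirrorMaker overrides amount to client configs roughly like the following sketch (values reflect my understanding of the "avoid data loss and duplicates" intent, not text copied from the source, so verify against your MirrorMaker version):

```properties
# Producer side: favor delivery guarantees over throughput.
acks=all
retries=2147483647
max.block.ms=9223372036854775807
max.in.flight.requests.per.connection=1

# Consumer side: MirrorMaker commits offsets itself.
auto.commit.enable=false
```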
>>>> 
>>>> This is probably something like a 1-line MM patch if someone wants to
>>>> tackle it -- the question of whether it needs a KIP or not is,
>>>> unfortunately, the more complicated question :(
>>>> 
>>>> -Ewen
>>>> 
>>>> On Thu, Jan 5, 2017 at 1:10 PM, James Cheng <wushuja...@gmail.com>
>>>> wrote:
>>>> 
>>>>> 
>>>>>> On Jan 5, 2017, at 12:57 PM, Jeff Widman <j...@netskope.com> wrote:
>>>>>> 
>>>>>> Thanks James and Hans.
>>>>>> 
>>>>>> Will this also happen when we expand the number of partitions in a
>>>>>> topic?
>>>>>> 
>>>>>> That also will trigger a rebalance, the consumer won't subscribe to
>>>>>> the partition until the rebalance finishes, etc.
>>>>>> 
>>>>>> So it'd seem that any messages published to the new partition in
>>>>>> between the partition creation and the rebalance finishing won't be
>>>>>> consumed by any consumers that have offset=latest
>>>>>> 
>>>>> 
>>>>> It hadn't occurred to me until you mentioned it, but yes, I think
>>>>> it'd also happen in those cases.
>>>>> 
>>>>> In the kafka consumer javadocs, they provide a list of things that
>>>>> would cause a rebalance:
>>>>> http://kafka.apache.org/0101/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#subscribe(java.util.Collection,%20org.apache.kafka.clients.consumer.ConsumerRebalanceListener)
>>>>> 
>>>>> "As part of group management, the consumer will keep track of the
>>>>> list of consumers that belong to a particular group and will trigger
>>>>> a rebalance operation if one of the following events trigger -
>>>>> 
>>>>> Number of partitions change for any of the subscribed list of topics
>>>>> Topic is created or deleted
>>>>> An existing member of the consumer group dies
>>>>> A new member is added to an existing consumer group via the join API
>>>>> "
>>>>> 
>>>>> I'm guessing that this would affect any of those scenarios.
>>>>> 
>>>>> -James
>>>>> 
>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> On Thu, Jan 5, 2017 at 12:40 AM, James Cheng <wushuja...@gmail.com>
>>>>>> wrote:
>>>>>> 
>>>>>>> Jeff,
>>>>>>> 
>>>>>>> Your analysis is correct. I would say that it is known but
>>>>>>> unintuitive behavior.
>>>>>>> 
>>>>>>> As an example of a problem caused by this behavior, it's possible
>>>>>>> for mirrormaker to miss messages on newly created topics, even
>>>>>>> though it was subscribed to them before the topics were created.
>>>>>>> 
>>>>>>> See the following JIRAs:
>>>>>>> https://issues.apache.org/jira/browse/KAFKA-3848
>>>>>>> https://issues.apache.org/jira/browse/KAFKA-3370
>>>>>>> 
>>>>>>> -James
>>>>>>> 
>>>>>>>> On Jan 4, 2017, at 4:37 PM, h...@confluent.io wrote:
>>>>>>>> 
>>>>>>>> This sounds exactly as I would expect things to behave. If you
>>>>>>>> consume from the beginning I would think you would get all the
>>>>>>>> messages, but not if you consume from the latest offset. You can
>>>>>>>> separately tune the metadata refresh interval if you want to miss
>>>>>>>> fewer messages, but that still won't get you all messages from the
>>>>>>>> beginning if you don't explicitly consume from the beginning.
>>>>>>>> 
>>>>>>>> Sent from my iPhone
>>>>>>>> 
>>>>>>>>> On Jan 4, 2017, at 6:53 PM, Jeff Widman <j...@netskope.com>
>>> wrote:
>>>>>>>>> 
>>>>>>>>> I'm seeing consumers miss messages when they subscribe before the
>>>>>>>>> topic is actually created.
>>>>>>>>> 
>>>>>>>>> Scenario:
>>>>>>>>> 1) kafka 0.10.1.1 cluster with no topics, but with topic
>>>>>>>>> auto-creation enabled, so a topic is created as soon as a message
>>>>>>>>> is published to it
>>>>>>>>> 2) consumer subscribes using a topic string or a regex pattern.
>>>>>>>>> Currently no topics match. Consumer offset is "latest"
>>>>>>>>> 3) producer publishes to a topic that matches the string or regex
>>>>>>>>> pattern.
>>>>>>>>> 4) broker immediately creates a topic, writes the message, and
>>>>>>>>> also notifies the consumer group that a rebalance needs to happen
>>>>>>>>> to assign the topic_partition to one of the consumers.
>>>>>>>>> 5) rebalance is fairly quick, maybe a second or so
>>>>>>>>> 6) a consumer is assigned to the newly-created topic_partition
>>>>>>>>> 
>>>>>>>>> At this point, we've got a consumer steadily polling the recently
>>>>>>>>> created topic_partition. However, the consumer.poll() never
>>>>>>>>> returns any messages published between topic creation and when the
>>>>>>>>> consumer was assigned to the topic_partition. I'm guessing this
>>>>>>>>> may be because when the consumer is assigned to the
>>>>>>>>> topic_partition it doesn't find any committed offset, so it uses
>>>>>>>>> the latest offset, which happens to be after the messages that
>>>>>>>>> were published to create the topic.
>>>>>>>>> 
>>>>>>>>> This is surprising because the consumer technically was subscribed
>>>>>>>>> to the topic before the messages were produced, so you'd think the
>>>>>>>>> consumer would receive these messages.
>>>>>>>>> 
>>>>>>>>> Is this known behavior? A bug in the Kafka broker? Or a bug in my
>>>>>>>>> client library?
>>>>>>> 
>>>>>>> 
>>>>> 
>>>>> 
>>>> 
>>> 
>> 
>> 
>> 
>> --
>> Grant Henke
>> Software Engineer | Cloudera
>> gr...@cloudera.com | twitter.com/gchenke | linkedin.com/in/granthenke
>> 
