Hi,

I indeed don't have a specific use case in mind that justifies the current
behaviour.
My only concern is that since this has been the default behaviour for quite a
while already, I can't be 100% sure that there are no users who actually
expect the current behaviour. Changing it may be dangerous since we would be
essentially dropping state.

But I agree that this isn't a strong argument for not considering it a bug.
Therefore, I agree with the following:

- Change the default behaviour to filter restored partition offset state
against the currently configured KafkaTopicsDescriptor. The topics descriptor
should only ever change when the job is manually restored.
- Add a `disableFilteringRestoredPartitionOffsets()` method (name TBD) to at
least provide a fallback for users who were somehow expecting the legacy
behaviour. I don't think adding this configuration would add too much
complication to the implementation, so it would be worth still adding that
as a safeguard, just in case.
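
To make the two points above concrete, here is a rough sketch in plain Java.
It is not the connector's code: `RestoredOffsetFilter`, `fixedTopics`,
`topicPattern` and the `filteringEnabled` flag are made-up names, and the
nested map is only a stand-in for the restored partition offset state. A
`Predicate<String>` plays the role of the topics descriptor (fixed list or
regex pattern).

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.regex.Pattern;

/** Sketch only: none of these names are the Flink connector's real classes. */
final class RestoredOffsetFilter {

    /** Hypothetical descriptor for a fixed list of topics. */
    static Predicate<String> fixedTopics(List<String> topics) {
        return topics::contains;
    }

    /** Hypothetical descriptor for a topic regex pattern (full match). */
    static Predicate<String> topicPattern(String regex) {
        Pattern pattern = Pattern.compile(regex);
        return topic -> pattern.matcher(topic).matches();
    }

    /**
     * Restored state is modelled as topic -> (partition -> offset).
     * With filtering enabled, topics that no longer match the descriptor
     * are dropped; with filtering disabled, everything is kept (legacy).
     */
    static Map<String, Map<Integer, Long>> filter(
            Map<String, Map<Integer, Long>> restored,
            Predicate<String> descriptor,
            boolean filteringEnabled) {
        if (!filteringEnabled) {
            return new LinkedHashMap<>(restored);
        }
        Map<String, Map<Integer, Long>> kept = new LinkedHashMap<>();
        for (Map.Entry<String, Map<Integer, Long>> entry : restored.entrySet()) {
            if (descriptor.test(entry.getKey())) {
                kept.put(entry.getKey(), entry.getValue());
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // State from a savepoint taken while consuming (t1,t2,t3).
        Map<String, Map<Integer, Long>> restored = new LinkedHashMap<>();
        restored.put("t1", Map.of(0, 42L));
        restored.put("t2", Map.of(0, 7L));
        restored.put("t3", Map.of(0, 13L));

        // New job version only configures t1.
        Predicate<String> descriptor = fixedTopics(List.of("t1"));
        System.out.println(filter(restored, descriptor, true).keySet());  // [t1]
        System.out.println(filter(restored, descriptor, false).keySet()); // [t1, t2, t3]
    }
}
```

With filtering enabled, restoring (t1,t2,t3) state into a job configured for
(t1) keeps only t1. With the fallback flag set, all three topics are kept, as
they are today.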

What do you think?

Cheers,
Gordon

On Fri, Feb 15, 2019, 11:06 PM Feng LI <nemoking...@gmail.com> wrote:

> Hello Gordon,
>
> Thanks for adding the contributor permission. :)
>
> I agree with Gyula and would vote +1 for changing this behavior unless we
> have use cases for maintaining it. I consider it more of a bug than
> expected behavior.
>
> We hit one counterexample in production: after migrating from one topic to
> another and restoring, the job started consuming both topics, which caused a
> serious production issue (the same messages were read twice).
>
> Cheers,
> Feng
> On Fri, Feb 15, 2019 at 15:44, Gyula Fóra <gyula.f...@gmail.com> wrote:
>
> > Gordon,
> > Do you have an example where the current default behaviour makes sense
> > and doesn't cause unexpected problems?
> > Or an example where someone might reasonably expect the current behaviour
> > instead of the newly suggested one?
> >
> > If we have such cases I would agree, let's keep it as is. If we can't come
> > up with anything reasonable, I vote for changing the default.
> >
> >
> > Gyula
> >
> > On Fri, Feb 15, 2019 at 3:31 PM Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
> >
> > > Hi Feng,
> > >
> > > Thanks for working on a fix for this.
> > > I gave you contributor permission on JIRA.
> > >
> > > Before you jump right into the code:
> > > do we already have an agreement on whether we change the default
> > > behaviour directly,
> > > or add a configuration option (e.g. add a
> > > `filterRestoredPartitionOffsetState()` method on the consumer) to enable
> > > the filtering?
> > >
> > > I'm still slightly in favor of keeping the default behaviour for the
> > > current Kafka connector,
> > > and only change that default for the upcoming rework of the connector.
> > >
> > > Cheers,
> > > Gordon
> > >
> > > On Fri, Feb 15, 2019 at 10:13 PM Feng LI <nemoking...@gmail.com> wrote:
> > >
> > > > Hello Aljoscha,
> > > >
> > > > Thanks for sharing the ticket. I think it makes sense to reopen it.
> > > > (I can work on the fix for this. It should be a small patch: just
> > > > filter the restored Kafka partitions against the discovered
> > > > partitions.)
> > > >
> > > > (Btw, can I have contributor access for JIRA? My username is f.li)
> > > >
> > > > Cheers,
> > > > Feng
> > > >
> > > > On Thu, Feb 14, 2019 at 17:07, Aljoscha Krettek <aljos...@apache.org> wrote:
> > > >
> > > > > I think these two Jira issues are relevant here:
> > > > >  - https://issues.apache.org/jira/browse/FLINK-10342
> > > > >  - https://issues.apache.org/jira/browse/FLINK-9303
> > > > >
> > > > > The second one only because it’s slightly related. The first one is
> > > > > actually exactly this thread.
> > > > >
> > > > > I was against changing this behaviour in the Jira but I can now see
> > > > > that this is quite likely an issue.
> > > > >
> > > > > Aljoscha
> > > > >
> > > > > > On 13. Feb 2019, at 18:55, Gyula Fóra <gyula.f...@gmail.com> wrote:
> > > > > >
> > > > > > Hi!
> > > > > >
> > > > > > I agree that it’s very confusing if you explicitly specify the
> > > > > > topics that are to be consumed and what happens is different.
> > > > > >
> > > > > > I would almost consider this to be a bug; I can’t see any
> > > > > > reasonable use case, just hard-to-debug problems.
> > > > > >
> > > > > > Having an option would be a good start but I would rather treat
> > > > > > this as a bug.
> > > > > >
> > > > > > Gyula
> > > > > >
> > > > > > On Wed, 13 Feb 2019 at 18:27, Feng LI <nemoking...@gmail.com> wrote:
> > > > > >
> > > > > >> Hello there,
> > > > > >>
> > > > > > >> I’m just wondering if there are real-world use cases for
> > > > > > >> maintaining this default behavior. It’s a bit counterintuitive
> > > > > > >> and sometimes results in serious production issues. (We had a
> > > > > > >> similar issue when changing the topic name, which resulted in
> > > > > > >> reading every message twice, both from the old topic and from
> > > > > > >> the new one.)
> > > > > >>
> > > > > >> Cheers,
> > > > > >> Feng
> > > > > > >> On Wed, Feb 13, 2019 at 17:56, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
> > > > > >>
> > > > > >>> Hi,
> > > > > >>>
> > > > > > >>> Partition offsets stored in state will always be respected
> > > > > > >>> when the consumer is restored from checkpoints / savepoints.
> > > > > > >>> AFAIK, this seems to have been the behaviour for quite some
> > > > > > >>> time now (since FlinkKafkaConsumer08).
> > > > > >>>
> > > > > > >>> I think in the past there was some discussion about at least
> > > > > > >>> allowing some way to ignore restored partition offsets.
> > > > > > >>> One way to enable this is to filter the restored partition
> > > > > > >>> offsets based on the configured list of specified topics /
> > > > > > >>> topic regex pattern in the current execution. This should
> > > > > > >>> work, since this can only be modified when restoring from
> > > > > > >>> savepoints (i.e. manual restores).
> > > > > > >>> To avoid breaking the current behaviour, we can maybe add a
> > > > > > >>> `filterRestoredPartitionOffsetState()` configuration on the
> > > > > > >>> consumer, which by default is disabled to match the current
> > > > > > >>> behaviour.
> > > > > >>>
> > > > > >>> What do you think?
> > > > > >>>
> > > > > >>> Cheers,
> > > > > >>> Gordon
> > > > > >>>
> > > > > > >>> On Wed, Feb 13, 2019 at 11:59 PM Gyula Fóra <gyula.f...@gmail.com> wrote:
> > > > > >>>
> > > > > >>>> Hi!
> > > > > >>>>
> > > > > > >>>> I have run into a weird issue which I could have sworn
> > > > > > >>>> wouldn't happen :D
> > > > > > >>>> I feel there was a discussion about this in the past but
> > > > > > >>>> maybe I'm wrong; I hope someone can point me to a ticket.
> > > > > >>>>
> > > > > > >>>> Let's say you create a Kafka consumer that consumes
> > > > > > >>>> (t1,t2,t3), you take a savepoint and deploy a new version
> > > > > > >>>> that only consumes (t1).
> > > > > >>>>
> > > > > > >>>> The restore logic now still starts to consume (t1,t2,t3),
> > > > > > >>>> which feels very unintuitive as t2 and t3 were explicitly
> > > > > > >>>> removed from the list. It is also hard to debug as the
> > > > > > >>>> topics causing the problem are not defined anywhere in your
> > > > > > >>>> job, configs etc.
> > > > > >>>>
> > > > > > >>>> Has anyone run into this issue? Should we change this
> > > > > > >>>> default behaviour or at least have an option to not do this?
> > > > > >>>>
> > > > > >>>> Cheers,
> > > > > >>>> Gyula
> > > > > >>>>
> > > > > >>>
> > > > > >>
> > > > >
> > > > >
> > > >
> > >
> >
>
