Hi,

I indeed don't have a specific use case in mind that justifies the current behaviour. My only concern is that, since this has been the default behaviour for quite a while already, I can't be 100% sure that no user actually expects the current behaviour. Changing it may be dangerous for such users, since we would essentially be dropping state.
But I agree that this isn't a strong argument for not considering it a bug.
Therefore, I agree with the following:
- Change the default behaviour to filter restored partition offset state against the currently configured KafkaTopicsDescriptor. The topics descriptor should only ever change when the job is manually restored.
- Add a `disableFilteringRestoredPartitionOffsets()` (name TBD) to at least provide a fallback for users who were somehow expecting the legacy behaviour.

I don't think adding this configuration would complicate the implementation much, so it would still be worth adding as a safeguard, just in case.

What do you think?

Cheers,
Gordon

On Fri, Feb 15, 2019, 11:06 PM Feng LI <nemoking...@gmail.com> wrote:

> Hello Gordon,
>
> Thanks for adding the contributor permission. :)
>
> Agree with Gyula, I would vote +1 for changing this behavior unless we have
> use cases for maintaining it. I consider it more like a bug other than
> expected behavior.
>
> We had one counter example in production when migrating from one topic to
> another, restoring back and start consuming both topics, results in serious
> production issue (read the same message twice).
>
> Cheers,
> Feng
>
> On Fri, 15 Feb 2019 at 15:44, Gyula Fóra <gyula.f...@gmail.com> wrote:
>
> > Gordon,
> > Do you have an example where the current default behaviour makes sense and
> > it doesnt cause unexpected problems?
> > Or an example where someone might reasonably expect the current behaviour
> > instead of the newly suggested one.
> >
> > If we have such cases I would agree lets keep it as is. If we cant come up
> > with anything reasonable I vote for changing the default.
> >
> > Gyula
> >
> > On Fri, Feb 15, 2019 at 3:31 PM Tzu-Li (Gordon) Tai <tzuli...@apache.org>
> > wrote:
> >
> > > Hi Feng,
> > >
> > > Thanks for working on a fix for this.
> > > I gave you contributor permission on JIRA.
> > >
> > > Before you jump right onto the code:
> > > do we have an agreement already on whether we change the default behaviour
> > > directly,
> > > or add a configuration option (e.g. add a
> > > `filterRestoredPartitionOffsetState()` method on the consumer) to enable
> > > the filtering?
> > >
> > > I'm still slightly in favor of keeping the default behaviour for the
> > > current Kafka connector,
> > > and only change that default for the upcoming rework of the connector.
> > >
> > > Cheers,
> > > Gordon
> > >
> > > On Fri, Feb 15, 2019 at 10:13 PM Feng LI <nemoking...@gmail.com> wrote:
> > >
> > > > Hello Aljoscha,
> > > >
> > > > Thanks for sharing the ticket, I think it makes sense to reopen the ticket.
> > > > (I can work on the fix for this, should be a small patch, just add a filter
> > > > when restoring Kafka partitions with those discovered partitions).
> > > >
> > > > (btw. Can I have a contributor access for jira, my username is f.li)
> > > >
> > > > Cheers,
> > > > Feng
> > > >
> > > > On Thu, 14 Feb 2019 at 17:07, Aljoscha Krettek <aljos...@apache.org> wrote:
> > > >
> > > > > I think these two Jira issues are relevant here:
> > > > > - https://issues.apache.org/jira/browse/FLINK-10342
> > > > > - https://issues.apache.org/jira/browse/FLINK-9303
> > > > >
> > > > > The second one only because it’s slightly related. The first one is
> > > > > actually exactly this thread.
> > > > >
> > > > > I was against changing this behaviour in the Jira but I can now see that
> > > > > this is quite likely an issue.
> > > > >
> > > > > Aljoscha
> > > > >
> > > > > > On 13. Feb 2019, at 18:55, Gyula Fóra <gyula.f...@gmail.com> wrote:
> > > > > >
> > > > > > Hi!
> > > > > >
> > > > > > I agree that it’s very confusing if you explicitly specify the topics that
> > > > > > are to be consumed and what happens is different.
> > > > > >
> > > > > > I would almost consider this to be a bug, can’t see any reasonable use
> > > > > > case just hard to debug problems.
> > > > > >
> > > > > > Having an option would be a good start but I would rather treat this as a
> > > > > > bug.
> > > > > >
> > > > > > Gyula
> > > > > >
> > > > > > On Wed, 13 Feb 2019 at 18:27, Feng LI <nemoking...@gmail.com> wrote:
> > > > > >
> > > > > >> Hello there,
> > > > > >>
> > > > > >> I’m just wondering if there are real world use cases for maintaining this
> > > > > >> default behavior. It’s a bit counter intuitive and sometimes results in
> > > > > >> serious production issues. (We had a similar issue when changing the topic
> > > > > >> name, and resulting reading every message twice - both from the old one and
> > > > > >> from the new).
> > > > > >>
> > > > > >> Cheers,
> > > > > >> Feng
> > > > > >>
> > > > > >> On Wed, 13 Feb 2019 at 17:56, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
> > > > > >>
> > > > > >>> Hi,
> > > > > >>>
> > > > > >>> Partition offsets stored in state will always be respected when the
> > > > > >>> consumer is restored from checkpoints / savepoints.
> > > > > >>> AFAIK, this seems to have been the behaviour for quite some time now (since
> > > > > >>> FlinkKafkaConsumer08).
> > > > > >>>
> > > > > >>> I think in the past there were some discussion to at least allow some way
> > > > > >>> to ignore restored partition offsets.
> > > > > >>> One way to enable this is to filter the restored partition offsets based on
> > > > > >>> the configured list of specified topics / topic regex pattern in the
> > > > > >>> current execution. This should work, since this can only be modified when
> > > > > >>> restoring from savepoints (i.e. manual restores).
> > > > > >>> To avoid breaking the current behaviour, we can maybe add a
> > > > > >>> `filterRestoredPartitionOffsetState()` configuration on the consumer, which
> > > > > >>> by default is disabled to match the current behaviour.
> > > > > >>>
> > > > > >>> What do you think?
> > > > > >>>
> > > > > >>> Cheers,
> > > > > >>> Gordon
> > > > > >>>
> > > > > >>> On Wed, Feb 13, 2019 at 11:59 PM Gyula Fóra <gyula.f...@gmail.com> wrote:
> > > > > >>>
> > > > > >>>> Hi!
> > > > > >>>>
> > > > > >>>> I have run into a weird issue which I could have sworn that it wouldnt
> > > > > >>>> happen :D
> > > > > >>>> I feel there was a discussion about this in the past but maybe im wrong,
> > > > > >>>> but I hope someone can point me to a ticket.
> > > > > >>>>
> > > > > >>>> Lets say you create a kafka consumer that consumes (t1,t2,t3), you take a
> > > > > >>>> savepoint and deploy a new version that only consumes (t1).
> > > > > >>>>
> > > > > >>>> The restore logic now still starts to consume (t1,t2,t3) which feels very
> > > > > >>>> unintuitive as those were explicitly removed from the list. It is also hard
> > > > > >>>> to debug as the topics causing the problem are not defined anywhere in your
> > > > > >>>> job, configs etc.
> > > > > >>>>
> > > > > >>>> Has anyone run into this issue? Should we change this default behaviour or
> > > > > >>>> at least have an option to not do this?
> > > > > >>>>
> > > > > >>>> Cheers,
> > > > > >>>> Gyula
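
For reference, here is a minimal sketch of the filtering discussed in this thread, applied to the (t1,t2,t3) scenario from the original message. It is not the Flink connector code and does not use any Flink API: `RestoredOffsetFilter`, its nested `TopicPartition` class, the `fixedTopics` / `topicPattern` helpers and the `filteringEnabled` flag are all hypothetical names made up for illustration. It only assumes that restored state can be viewed as a map from topic partition to offset, and that the current subscription is either a fixed topic list or a regex pattern.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Predicate;
import java.util.regex.Pattern;

public class RestoredOffsetFilter {

    /** Hypothetical stand-in for a Kafka topic partition (not the Flink class). */
    public static final class TopicPartition {
        final String topic;
        final int partition;

        public TopicPartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof TopicPartition)) {
                return false;
            }
            TopicPartition other = (TopicPartition) o;
            return topic.equals(other.topic) && partition == other.partition;
        }

        @Override
        public int hashCode() {
            return Objects.hash(topic, partition);
        }

        @Override
        public String toString() {
            return topic + "-" + partition;
        }
    }

    /** Subscription given as a fixed list of topic names. */
    public static Predicate<String> fixedTopics(String... topics) {
        Set<String> allowed = new HashSet<>(Arrays.asList(topics));
        return allowed::contains;
    }

    /** Subscription given as a regex that must match the whole topic name. */
    public static Predicate<String> topicPattern(String regex) {
        Pattern pattern = Pattern.compile(regex);
        return topic -> pattern.matcher(topic).matches();
    }

    /**
     * Returns the restored offsets to actually use. With filtering enabled,
     * entries whose topic is no longer subscribed are dropped; with filtering
     * disabled (the legacy behaviour), every restored entry is kept.
     */
    public static Map<TopicPartition, Long> filter(
            Map<TopicPartition, Long> restored,
            Predicate<String> subscribed,
            boolean filteringEnabled) {
        if (!filteringEnabled) {
            return new LinkedHashMap<>(restored);
        }
        Map<TopicPartition, Long> kept = new LinkedHashMap<>();
        for (Map.Entry<TopicPartition, Long> entry : restored.entrySet()) {
            if (subscribed.test(entry.getKey().topic)) {
                kept.put(entry.getKey(), entry.getValue());
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // Savepoint taken while consuming (t1,t2,t3); the offsets are made-up values.
        Map<TopicPartition, Long> restored = new LinkedHashMap<>();
        restored.put(new TopicPartition("t1", 0), 42L);
        restored.put(new TopicPartition("t2", 0), 7L);
        restored.put(new TopicPartition("t3", 0), 13L);

        // New job version subscribes to t1 only.
        System.out.println(filter(restored, fixedTopics("t1"), true).keySet());
        System.out.println(filter(restored, fixedTopics("t1"), false).keySet());
    }
}
```

With filtering enabled, only the t1 entry survives the restore; with it disabled, all three restored entries are kept, which corresponds to the legacy behaviour that a fallback switch would preserve.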