Hi Feng,

Thanks for working on a fix for this. I gave you contributor permission on JIRA.

Before you jump right into the code: have we already agreed on whether to change the default behaviour directly, or to add a configuration option (e.g. a `filterRestoredPartitionOffsetState()` method on the consumer) that enables the filtering? I'm still slightly in favor of keeping the default behaviour for the current Kafka connector, and only changing that default in the upcoming rework of the connector.

Cheers,
Gordon

On Fri, Feb 15, 2019 at 10:13 PM Feng LI <nemoking...@gmail.com> wrote:
> Hello Aljoscha,
>
> Thanks for sharing the ticket. I think it makes sense to reopen it.
> (I can work on the fix for this. It should be a small patch: just filter the restored Kafka partitions against the discovered partitions.)
>
> (By the way, could I have contributor access for JIRA? My username is f.li.)
>
> Cheers,
> Feng
>
> On Thu, Feb 14, 2019 at 17:07, Aljoscha Krettek <aljos...@apache.org> wrote:
> > I think these two Jira issues are relevant here:
> > - https://issues.apache.org/jira/browse/FLINK-10342
> > - https://issues.apache.org/jira/browse/FLINK-9303
> >
> > The second one is listed only because it's slightly related. The first one is exactly what this thread is about.
> >
> > I was against changing this behaviour in the Jira, but I can now see that this is quite likely an issue.
> >
> > Aljoscha
> >
> > > On 13. Feb 2019, at 18:55, Gyula Fóra <gyula.f...@gmail.com> wrote:
> > >
> > > Hi!
> > >
> > > I agree that it's very confusing if you explicitly specify the topics that are to be consumed and what happens is different.
> > >
> > > I would almost consider this to be a bug. I can't see any reasonable use case, just hard-to-debug problems.
> > >
> > > Having an option would be a good start, but I would rather treat this as a bug.
> > >
> > > Gyula
> > >
> > > On Wed, 13 Feb 2019 at 18:27, Feng LI <nemoking...@gmail.com> wrote:
> > > > Hello there,
> > > >
> > > > I'm just wondering if there are real-world use cases for maintaining this default behavior. It's a bit counterintuitive and sometimes results in serious production issues. (We had a similar issue when renaming a topic, which resulted in every message being read twice: once from the old topic and once from the new one.)
> > > >
> > > > Cheers,
> > > > Feng
> > > >
> > > > On Wed, Feb 13, 2019 at 17:56, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
> > > > > Hi,
> > > > >
> > > > > Partition offsets stored in state will always be respected when the consumer is restored from checkpoints / savepoints.
> > > > > AFAIK, this has been the behaviour for quite some time now (since FlinkKafkaConsumer08).
> > > > >
> > > > > I think there were some discussions in the past about at least allowing some way to ignore restored partition offsets.
> > > > > One way to enable this is to filter the restored partition offsets against the list of topics / topic regex pattern configured in the current execution. This should work, since the topic configuration can only change when restoring from a savepoint (i.e. a manual restore).
> > > > > To avoid breaking the current behaviour, we could add a `filterRestoredPartitionOffsetState()` configuration on the consumer, which is disabled by default to match the current behaviour.
> > > > >
> > > > > What do you think?
> > > > >
> > > > > Cheers,
> > > > > Gordon
> > > > >
> > > > > On Wed, Feb 13, 2019 at 11:59 PM Gyula Fóra <gyula.f...@gmail.com> wrote:
> > > > >
> > > > > > Hi!
> > > > > >
> > > > > > I have run into a weird issue that I could have sworn wouldn't happen :D
> > > > > > I feel there was a discussion about this in the past, but maybe I'm wrong. I hope someone can point me to a ticket.
> > > > > >
> > > > > > Let's say you create a Kafka consumer that consumes (t1, t2, t3), take a savepoint, and deploy a new version that only consumes (t1).
> > > > > >
> > > > > > The restore logic still starts consuming (t1, t2, t3), which feels very unintuitive, as t2 and t3 were explicitly removed from the list. It is also hard to debug, as the topics causing the problem are not defined anywhere in your job, configs, etc.
> > > > > >
> > > > > > Has anyone run into this issue? Should we change this default behaviour, or at least have an option to not do this?
> > > > > >
> > > > > > Cheers,
> > > > > > Gyula
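
The filtering Gordon proposes in this thread (drop restored partition offsets whose topic is no longer matched by the topic list or topic regex configured in the current execution, with the filter off by default to keep the existing behaviour) can be sketched in plain Java. This is a minimal, hypothetical illustration, not the Flink Kafka connector's code: `RestoredOffsetFilter`, `Partition`, `filterRestored`, `topics` and `topicPattern` are made-up names, `Partition` is not a Kafka or Flink class, and a boolean flag stands in for the proposed `filterRestoredPartitionOffsetState()` switch. The `main` method replays Gyula's example of a savepoint taken on (t1, t2, t3) and restored into a job that subscribes only to t1.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Predicate;
import java.util.regex.Pattern;

public class RestoredOffsetFilter {

    /** Hypothetical (topic, partition) key; a stand-in, not a Kafka or Flink class. */
    public static final class Partition {
        final String topic;
        final int partition;

        public Partition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Partition)) return false;
            Partition other = (Partition) o;
            return partition == other.partition && topic.equals(other.topic);
        }

        @Override
        public int hashCode() {
            return Objects.hash(topic, partition);
        }

        @Override
        public String toString() {
            return topic + "-" + partition;
        }
    }

    /**
     * Keeps only restored offsets whose topic is still subscribed.
     * With filtering disabled, the restored state is returned unchanged,
     * i.e. every partition found in the savepoint is consumed again.
     */
    public static Map<Partition, Long> filterRestored(
            Map<Partition, Long> restoredOffsets,
            Predicate<String> isSubscribed,
            boolean filterEnabled) {
        if (!filterEnabled) {
            return restoredOffsets;
        }
        Map<Partition, Long> kept = new LinkedHashMap<>();
        for (Map.Entry<Partition, Long> e : restoredOffsets.entrySet()) {
            if (isSubscribed.test(e.getKey().topic)) {
                kept.put(e.getKey(), e.getValue());
            }
        }
        return kept;
    }

    /** Subscription given as an explicit topic list. */
    public static Predicate<String> topics(List<String> topics) {
        return topics::contains;
    }

    /** Subscription given as a topic regex; the whole topic name must match. */
    public static Predicate<String> topicPattern(Pattern pattern) {
        return topic -> pattern.matcher(topic).matches();
    }

    public static void main(String[] args) {
        // Offsets restored from a savepoint taken while consuming (t1, t2, t3).
        Map<Partition, Long> restored = new LinkedHashMap<>();
        restored.put(new Partition("t1", 0), 42L);
        restored.put(new Partition("t2", 0), 17L);
        restored.put(new Partition("t3", 0), 99L);

        // The new job version only subscribes to t1.
        Predicate<String> subscribed = topics(Arrays.asList("t1"));

        System.out.println(filterRestored(restored, subscribed, false).keySet()); // [t1-0, t2-0, t3-0]
        System.out.println(filterRestored(restored, subscribed, true).keySet());  // [t1-0]
    }
}
```

With the flag off, all three restored partitions are kept, which is the surprising behaviour Gyula describes; with it on, only t1's partition survives. Passing `topicPattern(Pattern.compile(...))` instead of `topics(...)` would cover regex subscriptions in the same way.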