Hi Timo,

Thanks for the explanation, it's much clearer now.
One thing I want to confirm about `supportedPlanFormat` and `supportedSavepointFormat`: `supportedPlanFormat` supports multiple versions, while `supportedSavepointFormat` supports only one version? A JSON plan can be deserialized by multiple versions because default values will be set for new fields. In theory, a savepoint can be restored by more than one version of the operators even if a state layout is changed, such as deleting a whole state and starting the job with `allowNonRestoredState`=true. I think this is a corner case, and it's hard to understand compared to `supportedPlanFormat` supporting multiple versions. So, in most cases, when the state layout is changed, the savepoint is incompatible, and `supportedSavepointFormat` and the version need to be changed.

I think we need a detailed explanation of the annotation change story in the Javadoc of the `ExecNodeMetadata` class for all developers (esp. those unfamiliar with this part).

Best,
Godfrey

Timo Walther <twal...@apache.org> wrote on Wed, Dec 8, 2021 at 4:57 PM:
>
> Hi Wenlong,
>
> thanks for the feedback. Great that we reached consensus here. I will update the entire document with my previous example shortly.
>
> > if we don't update the version when plan format changes, we can't find that the plan cannot be deserialized in 1.15
>
> This should not be a problem as the entire plan file has a version as well. We should not allow reading a 1.16 plan in 1.15. We can throw a helpful exception early.
>
> Reading a 1.15 plan in 1.16 is possible until we drop the old `supportedPlanFormat` from one of the used ExecNodes. Afterwards, all `supportedPlanFormat` of ExecNodes must be equal to or higher than the plan version.
>
> Regards,
> Timo
>
> On 08.12.21 03:07, wenlong.lwl wrote:
> > Hi, Timo, +1 for multi metadata.
> >
> > The compatible change I mean in the last email is the slight state change example you gave, so we have got consensus on this actually, IMO.
> >
> > Another question based on the example you gave: In the example "JSON node gets an additional property in 1.16", if we don't update the version when the plan format changes, we can't find that the plan cannot be deserialized in 1.15, although the savepoint state is compatible. The error message may not be so friendly if we just throw a deserialization failure.
> >
> > On Tue, 7 Dec 2021 at 16:49, Timo Walther <twal...@apache.org> wrote:
> >
> >> Hi Wenlong,
> >>
> >> > First, we add a newStateLayout because of some improvement in state, in order to keep compatibility we may still keep the old state for the first version. We need to update the version, so that we can generate a new version plan for the new job and keep the exec node compatible with the old version plan.
> >>
> >> The problem that I see here for contributors is that the actual update of a version is more complicated than just updating an integer value. It means copying a lot of ExecNode code for a change that happens locally in an operator. Let's assume multiple ExecNodes use a similar operator. Why do we need to update all ExecNode versions if the operator itself can deal with the incompatibility? The ExecNode version is meant for topology changes or fundamental state changes.
> >>
> >> If we don't find consensus on this topic, I would at least vote for supporting multiple annotations for an ExecNode class. This way we don't need to copy code but only add two ExecNode annotations with different ExecNode versions.
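For illustration, a rough Java sketch of what such repeatable annotations could look like; the attribute names follow the discussion in this thread, but the exact signatures are assumptions rather than the final Flink API:

    import java.lang.annotation.ElementType;
    import java.lang.annotation.Repeatable;
    import java.lang.annotation.Retention;
    import java.lang.annotation.RetentionPolicy;
    import java.lang.annotation.Target;

    // Assumed shape of the metadata annotation discussed above.
    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @Repeatable(MultipleExecNodeMetadata.class)
    @interface ExecNodeMetadata {
        String name();
        int version();
        String[] supportedPlanFormat();
        String supportedSavepointFormat();
    }

    // Container annotation required for @Repeatable.
    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @interface MultipleExecNodeMetadata {
        ExecNodeMetadata[] value();
    }

    // One ExecNode class carrying both versions instead of a copied class:
    @ExecNodeMetadata(name = "A", version = 1,
            supportedPlanFormat = {"1.15"}, supportedSavepointFormat = "1.15")
    @ExecNodeMetadata(name = "A", version = 2,
            supportedPlanFormat = {"1.15"}, supportedSavepointFormat = "1.16")
    class StreamExecNodeA {
        // shared implementation; the operator itself handles both state layouts
    }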
> >>
> >> > Maybe we can add support for this case: when an exec node is changed in 1.16, but is compatible with 1.15, we can use the node of 1.16 to deserialize the plan of 1.15.
> >>
> >> If the ExecNode is compatible, there is no reason to increase the ExecNode version.
> >>
> >> I tried to come up with a reworked solution to make all parties happy:
> >>
> >> 1. Let's assume the following annotations:
> >>
> >> supportedPlanFormat = [1.15]
> >>
> >> supportedSavepointFormat = 1.15
> >>
> >> We drop `added` as it is equal to `supportedSavepointFormat`.
> >>
> >> 2. Multiple annotations over ExecNodes are possible:
> >>
> >> // operator state changes
> >>
> >> // initial introduction in 1.15
> >> @ExecNodeMetadata(name=A, version=1, supportedPlanFormat=1.15, supportedSavepointFormat=1.15)
> >>
> >> // state layout changed slightly in 1.16
> >> // - operator migration is possible
> >> // - operator supports state of both versions and will perform operator state migration
> >> // - new plans will get the new ExecNode version
> >> @ExecNodeMetadata(name=A, version=1, supportedPlanFormat=1.15, supportedSavepointFormat=1.15)
> >> @ExecNodeMetadata(name=A, version=2, supportedPlanFormat=1.15, supportedSavepointFormat=1.16)
> >>
> >> // we force a plan migration in 1.17
> >> // - we assume that all operator states have been migrated in the previous version
> >> // - we can safely replace the old version `1` with `2` and only keep the new savepoint format
> >> @ExecNodeMetadata(name=A, version=2, supportedPlanFormat=1.15, supportedSavepointFormat=1.16)
> >>
> >> // plan changes
> >>
> >> // initial introduction in 1.15
> >> @ExecNodeMetadata(name=A, version=1, supportedPlanFormat=1.15, supportedSavepointFormat=1.15)
> >>
> >> // JSON node gets an additional property in 1.16
> >> // e.g. { some-prop: 42 } -> { some-prop: 42, some-flag: false }
> >> // - ExecNode version does not change
> >> // - ExecNode version only changes when topology or state is affected
> >> // - we support both JSON plan formats, the old and the newest one
> >> @ExecNodeMetadata(name=A, version=1, supportedPlanFormat=[1.15, 1.16], supportedSavepointFormat=1.15)
> >>
> >> // we force a plan migration in 1.17
> >> // - now we only support the 1.16 plan format
> >> @ExecNodeMetadata(name=A, version=1, supportedPlanFormat=1.16, supportedSavepointFormat=1.15)
> >>
> >> // topology change
> >>
> >> // initial introduction in 1.15
> >> @ExecNodeMetadata(name=A, version=1, supportedPlanFormat=1.15, supportedSavepointFormat=1.15)
> >>
> >> // complete new class structure in 1.16, annotated with
> >> @ExecNodeMetadata(name=A, version=2, supportedPlanFormat=1.15, supportedSavepointFormat=1.16)
> >>
> >> What do you think?
> >>
> >> Regards,
> >> Timo
> >>
> >> On 07.12.21 08:20, wenlong.lwl wrote:
> >>> Maybe we can add support for this case: when an exec node is changed in 1.16, but is compatible with 1.15, we can use the node of 1.16 to deserialize the plan of 1.15. This way, we don't need to fork the code if the change is compatible, and can avoid forking code frequently.
> >>>
> >>> Best,
> >>> Wenlong
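As a concrete illustration of why one node version can read several plan formats, a minimal Jackson sketch; the property names follow Timo's example, while Flink's actual plan (de)serialization is of course more involved:

    import com.fasterxml.jackson.annotation.JsonProperty;
    import com.fasterxml.jackson.databind.ObjectMapper;

    // A node's persisted properties: "some-flag" only exists in the newer format.
    public class NodeProps {
        @JsonProperty("some-prop")
        public int someProp;

        @JsonProperty("some-flag")
        public boolean someFlag = false; // default used when the old format omits it

        public static void main(String[] args) throws Exception {
            ObjectMapper mapper = new ObjectMapper();
            // 1.15-style JSON without the flag: the field keeps its default.
            NodeProps oldPlan = mapper.readValue("{\"some-prop\": 42}", NodeProps.class);
            // 1.16-style JSON with the flag set explicitly.
            NodeProps newPlan =
                    mapper.readValue("{\"some-prop\": 42, \"some-flag\": true}", NodeProps.class);
            System.out.println(oldPlan.someFlag); // false
            System.out.println(newPlan.someFlag); // true
        }
    }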
> >>>
> >>> On Tue, 7 Dec 2021 at 15:08, wenlong.lwl <wenlong88....@gmail.com> wrote:
> >>>
> >>>> Hi Timo, I would prefer to update the version every time we change the state layout too.
> >>>>
> >>>> It could be possible that we change the exec node in two steps: First, we add a new state layout because of some improvement in state; in order to keep compatibility, we may still keep the old state for the first version. We need to update the version, so that we can generate a new version plan for the new job and keep the exec node compatible with the old version plan. After some versions, we may remove the old version state layout and clean up the deprecated code. We still need to update the version, so that we can verify that we are compatible with the plan after the first change, but not with plans from before it.
> >>>>
> >>>> Best,
> >>>> Wenlong
> >>>>
> >>>> On Mon, 6 Dec 2021 at 21:27, Timo Walther <twal...@apache.org> wrote:
> >>>>
> >>>>> Hi Godfrey,
> >>>>>
> >>>>> > design makes things more complex.
> >>>>>
> >>>>> Yes, the design might be a bit more complex. But operator migration is way easier than ExecNode migration at a later point in time for code maintenance. We know that ExecNodes can become pretty complex. Even though we have put a lot of code into `CommonXXExecNode`, it will be a lot of work to maintain multiple versions of ExecNodes. If we can avoid this with operator state migration, it should always be preferred over a new ExecNode version.
> >>>>>
> >>>>> I'm aware that operator state migration might only be important for roughly 10% of all changes. A new ExecNode version will be used for 90% of all changes.
> >>>>>
> >>>>> > If there are multiple state layouts, which layout should the ExecNode use?
> >>>>>
> >>>>> It is not the responsibility of the ExecNode to decide this but the operator. Something like:
> >>>>>
> >>>>> class X extends ProcessFunction {
> >>>>>     ValueState<A> oldStateLayout;
> >>>>>     ValueState<B> newStateLayout;
> >>>>>
> >>>>>     open() {
> >>>>>         // migrate any state found in the old layout, then continue with the new one
> >>>>>         if (oldStateLayout.get() != null) {
> >>>>>             performOperatorMigration();
> >>>>>         }
> >>>>>         useNewStateLayout();
> >>>>>     }
> >>>>> }
> >>>>>
> >>>>> Operator migration is meant for smaller, "more local" changes without touching the ExecNode layer. The CEP library and DataStream API sources have been performing operator migration for years already.
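To flesh out the sketch above: keyed state can only be accessed for the current key, so a real operator would typically migrate lazily per key in processElement() rather than in open(). A self-contained, but still illustrative, variant with assumed state names and types:

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    // Hypothetical operator that replaced a Long-based state layout with a String-based one.
    public class MigratingFunction extends KeyedProcessFunction<String, String, String> {

        private transient ValueState<Long> oldStateLayout;
        private transient ValueState<String> newStateLayout;

        @Override
        public void open(Configuration parameters) {
            // Both descriptors stay registered as long as old savepoints must be supported.
            oldStateLayout = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("count-v1", Long.class));
            newStateLayout = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("count-v2", String.class));
        }

        @Override
        public void processElement(String value, Context ctx, Collector<String> out)
                throws Exception {
            Long old = oldStateLayout.value();
            if (old != null) {
                newStateLayout.update(String.valueOf(old)); // perform the operator migration
                oldStateLayout.clear();                     // drop the old layout for this key
            }
            // ... regular processing based on newStateLayout ...
            out.collect(value);
        }
    }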
With > >> the > >>>>> help of the annotation supportedPlanChanges = [1.15, 1.16] we can > >> verify > >>>>> that we have tests for both JSON formats. > >>>>> > >>>>> And once we decide to drop the 1.15 format, we enforce plan migration > >>>>> and fill-in the default value `false` into the old plans and bump their > >>>>> JSON plan version to 1.16 or higher. > >>>>> > >>>>> > >>>>> > >>>>> > once the state layout is changed, the ExecNode version needs also > >> be > >>>>> updated > >>>>> > >>>>> This will still be the majority of cases. But if we can avoid this, we > >>>>> should do it for not having too much duplicate code to maintain. > >>>>> > >>>>> > >>>>> > >>>>> Thanks, > >>>>> Timo > >>>>> > >>>>> > >>>>> On 06.12.21 09:58, godfrey he wrote: > >>>>>> Hi, Timo, > >>>>>> > >>>>>> Thanks for the detailed explanation. > >>>>>> > >>>>>>> We change an operator state of B in Flink 1.16. We perform the change > >>>>> in the operator of B in a way to support both state layouts. Thus, no > >> need > >>>>> for a new ExecNode version. > >>>>>> > >>>>>> I think this design makes thing more complex. > >>>>>> 1. If there are multiple state layouts, which layout the ExecNode > >>>>> should use ? > >>>>>> It increases the cost of understanding for developers (especially for > >>>>>> Flink newer), > >>>>>> making them prone to mistakes. > >>>>>> 2. `supportedPlanChanges ` and `supportedSavepointChanges ` are a bit > >>>>> obscure. > >>>>>> > >>>>>> The purpose of ExecNode annotations are not only to support powerful > >>>>> validation, > >>>>>> but more importantly to make it easy for developers to understand > >>>>>> to ensure that every modification is easy and state compatible. > >>>>>> > >>>>>> I prefer, once the state layout is changed, the ExecNode version needs > >>>>>> also be updated. > >>>>>> which could make thing simple. How about > >>>>>> rename `supportedPlanChanges ` to `planCompatibleVersion` > >>>>>> (which means the plan is compatible with the plan generated by the > >>>>>> given version node) > >>>>>> and rename `supportedSavepointChanges` to > >> `savepointCompatibleVersion > >>>>> ` > >>>>>> (which means the state is compatible with the state generated by the > >>>>>> given version node) ? > >>>>>> The names also indicate that only one version value can be set. > >>>>>> > >>>>>> WDYT? > >>>>>> > >>>>>> Best, > >>>>>> Godfrey > >>>>>> > >>>>>> > >>>>>> > >>>>>> > >>>>>> > >>>>>> > >>>>>> > >>>>>> > >>>>>> > >>>>>> Timo Walther <twal...@apache.org> 于2021年12月2日周四 下午11:42写道: > >>>>>>> > >>>>>>> Response to Marios's feedback: > >>>>>>> > >>>>>>> > there should be some good logging in place when the upgrade is > >>>>> taking > >>>>>>> place > >>>>>>> > >>>>>>> Yes, I agree. I added this part to the FLIP. > >>>>>>> > >>>>>>> > config option instead that doesn't provide the flexibility to > >>>>>>> overwrite certain plans > >>>>>>> > >>>>>>> One can set the config option also around sections of the > >>>>>>> multi-statement SQL script. > >>>>>>> > >>>>>>> SET 'table.plan.force-recompile'='true'; > >>>>>>> > >>>>>>> COMPILE ... > >>>>>>> > >>>>>>> SET 'table.plan.force-recompile'='false'; > >>>>>>> > >>>>>>> But the question is why a user wants to run COMPILE multiple times. > >> If > >>>>>>> it is during development, then running EXECUTE (or just the statement > >>>>>>> itself) without calling COMPILE should be sufficient. The file can > >> also > >>>>>>> manually be deleted if necessary. > >>>>>>> > >>>>>>> What do you think? 
> >>>>>>>
> >>>>>>> On 02.12.21 16:09, Timo Walther wrote:
> >>>>>>>> Hi Till,
> >>>>>>>>
> >>>>>>>> Yes, you might have to. But not a new plan from the SQL query but a migration from the old plan to the new plan. This will not happen often. But we need a way to evolve the format of the JSON plan itself.
> >>>>>>>>
> >>>>>>>> Maybe this confuses a bit, so let me clarify it again: mostly, ExecNode versions and operator state layouts will evolve. Not the plan files; those will be pretty stable. But also not infinitely.
> >>>>>>>>
> >>>>>>>> Regards,
> >>>>>>>> Timo
> >>>>>>>>
> >>>>>>>> On 02.12.21 16:01, Till Rohrmann wrote:
> >>>>>>>>> Then for migrating from Flink 1.10 to 1.12, I might have to create a new plan using Flink 1.11 in order to migrate from Flink 1.11 to 1.12, right?
> >>>>>>>>>
> >>>>>>>>> Cheers,
> >>>>>>>>> Till
> >>>>>>>>>
> >>>>>>>>> On Thu, Dec 2, 2021 at 3:39 PM Timo Walther <twal...@apache.org> wrote:
> >>>>>>>>>
> >>>>>>>>>> Response to Till's feedback:
> >>>>>>>>>>
> >>>>>>>>>> > compiled plan won't be changed after being written initially
> >>>>>>>>>>
> >>>>>>>>>> This is not entirely correct. We give guarantees for keeping the query up and running. We reserve the right to force plan migrations. In this case, the plan might not be created from the SQL statement but from the old plan. I have added an example in section 10.1.1. In general, both persisted entities "plan" and "savepoint" can evolve independently from each other.
> >>>>>>>>>>
> >>>>>>>>>> Thanks,
> >>>>>>>>>> Timo
> >>>>>>>>>>
> >>>>>>>>>> On 02.12.21 15:10, Timo Walther wrote:
> >>>>>>>>>>> Response to Godfrey's feedback:
> >>>>>>>>>>>
> >>>>>>>>>>> > "EXPLAIN PLAN EXECUTE STATEMENT SET BEGIN ... END" is missing.
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks for the hint. I added a dedicated section 7.1.3.
> >>>>>>>>>>>
> >>>>>>>>>>> > it's hard to maintain the supported versions for "supportedPlanChanges" and "supportedSavepointChanges"
> >>>>>>>>>>>
> >>>>>>>>>>> Actually, I think we are mostly on the same page.
> >>>>>>>>>>>
> >>>>>>>>>>> The annotation does not need to be updated for every Flink version. As the name suggests, it is about "changes" (in other words: incompatibilities) that require some kind of migration. Either plan migration (= PlanChanges) or savepoint migration (= SavepointChanges, using operator migration or savepoint migration).
> >>>>>>>>>>>
> >>>>>>>>>>> Let's assume we introduced two ExecNodes A and B in Flink 1.15.
> >>>>>>>>>>>
> >>>>>>>>>>> The annotations are:
> >>>>>>>>>>>
> >>>>>>>>>>> @ExecNodeMetadata(name=A, supportedPlanChanges=1.15, supportedSavepointChanges=1.15)
> >>>>>>>>>>>
> >>>>>>>>>>> @ExecNodeMetadata(name=B, supportedPlanChanges=1.15, supportedSavepointChanges=1.15)
> >>>>>>>>>>>
> >>>>>>>>>>> We change an operator state of B in Flink 1.16.
> >>>>>>>>>>>
> >>>>>>>>>>> We perform the change in the operator of B in a way that supports both state layouts. Thus, no need for a new ExecNode version.
> >>>>>>>>>>>
> >>>>>>>>>>> The annotations in 1.16 are:
> >>>>>>>>>>>
> >>>>>>>>>>> @ExecNodeMetadata(name=A, supportedPlanChanges=1.15, supportedSavepointChanges=1.15)
> >>>>>>>>>>>
> >>>>>>>>>>> @ExecNodeMetadata(name=B, supportedPlanChanges=1.15, supportedSavepointChanges=1.15, 1.16)
> >>>>>>>>>>>
> >>>>>>>>>>> So the versions in the annotations are "start" versions.
> >>>>>>>>>>>
> >>>>>>>>>>> I don't think we need end versions? An end version would mean that we drop the ExecNode from the code base?
> >>>>>>>>>>>
> >>>>>>>>>>> Please check section 10.1.1 again. I added a more complex example.
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks,
> >>>>>>>>>>> Timo
> >>>>>>>>>>>
> >>>>>>>>>>> On 01.12.21 16:29, Timo Walther wrote:
> >>>>>>>>>>>> Response to Francesco's feedback:
> >>>>>>>>>>>>
> >>>>>>>>>>>> > *Proposed changes #6*: Other than defining this rule of thumb, we must also make sure that compiling plans with these objects that cannot be serialized in the plan must fail hard
> >>>>>>>>>>>>
> >>>>>>>>>>>> Yes, I totally agree. We will fail hard with a helpful exception. Any mistake, e.g. using an inline object in Table API or an invalid DataStream API source without a uid, should immediately fail a plan compilation step. I added a remark to the FLIP again.
> >>>>>>>>>>>>
> >>>>>>>>>>>> > What worries me is breaking changes, in particular behavioural changes that might happen in connectors/formats
> >>>>>>>>>>>>
> >>>>>>>>>>>> Breaking changes in connectors and formats need to be encoded in the options. I could also imagine versioning the factory identifier, e.g. `connector=kafka` and `connector=kafka-2`, if this is necessary.
> >>>>>>>>>>>>
> >>>>>>>>>>>> After thinking about your question again, I think we will also need the same testing infrastructure for our connectors and formats, especially restore tests and completeness tests. I updated the document accordingly. I also added a way to generate UIDs for DataStream API providers.
> >>>>>>>>>>>>
> >>>>>>>>>>>> > *Functions:* Are we talking about the function name or the function's complete signature?
> >>>>>>>>>>>>
> >>>>>>>>>>>> For catalog functions, the identifier contains the catalog name and database name. For system functions, the identifier contains only a name, which makes function name and identifier identical. I reworked the section again and also fixed some of the naming conflicts you mentioned.
> >>>>>>>>>>>>
> >>>>>>>>>>>> > we should perhaps use a logically defined unique id like /bigIntToTimestamp/
> >>>>>>>>>>>>
> >>>>>>>>>>>> I added a concrete example for the resolution and restoration. The unique id is composed of name + version. Internally, this is represented as `$TO_TIMESTAMP_LTZ$1`.
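A small illustration of the `$NAME$version` encoding; the parsing helper is hypothetical and only shows how name and version could be recovered during restore:

    // Illustrative only: split an internal id like "$TO_TIMESTAMP_LTZ$1"
    // into the function name and its version.
    final class FunctionVersionId {
        final String name;
        final int version;

        private FunctionVersionId(String name, int version) {
            this.name = name;
            this.version = version;
        }

        static FunctionVersionId parse(String internalId) {
            // expected shape: $NAME$version
            int lastDollar = internalId.lastIndexOf('$');
            String name = internalId.substring(1, lastDollar);
            int version = Integer.parseInt(internalId.substring(lastDollar + 1));
            return new FunctionVersionId(name, version);
        }
    }

    // FunctionVersionId.parse("$TO_TIMESTAMP_LTZ$1") -> name=TO_TIMESTAMP_LTZ, version=1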
> >>>>>>>>>>>>
> >>>>>>>>>>>> > I think we should rather keep JSON out of the concept
> >>>>>>>>>>>>
> >>>>>>>>>>>> Sounds ok to me. In SQL we also just call it "plan". I will change the file sections. But I would suggest keeping the fromJsonString method.
> >>>>>>>>>>>>
> >>>>>>>>>>>> > write it back in the original plan file
> >>>>>>>>>>>>
> >>>>>>>>>>>> I updated the terminology section for what we consider an "upgrade". We might need to update the original plan file. This is already considered in COMPILE PLAN ... FROM ..., even though this is future work. Also savepoint migration.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thanks for all the feedback!
> >>>>>>>>>>>>
> >>>>>>>>>>>> Timo
> >>>>>>>>>>>>
> >>>>>>>>>>>> On 30.11.21 14:28, Timo Walther wrote:
> >>>>>>>>>>>>> Response to Wenlong's feedback:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> > I would prefer not to provide such a shortcut, let users use COMPILE PLAN IF NOT EXISTS and EXECUTE explicitly, which can be understood by new users even without referring to the docs.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I would like to hear more opinions on this topic. Personally, I find a combined statement very useful. Not only for quicker development and debugging but also for readability. It helps in keeping the JSON path and the query close to each other in order to know the origin of the plan.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> > but the plan and SQL are not matched. The result would be quite confusing if we still execute the plan directly, we may need to add a validation.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> You are right that there could be a mismatch. But we have a similar problem when executing CREATE TABLE IF NOT EXISTS. The schema or options of a table could have changed completely in the catalog but the CREATE TABLE IF NOT EXISTS is not executed again. So a mismatch could also occur there.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>> Timo
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On 30.11.21 14:17, Timo Walther wrote:
> >>>>>>>>>>>>>> Hi everyone,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> thanks for the feedback so far. Let me answer each email individually.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I will start with a response to Ingo's feedback:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> > Will the JSON plan's schema be considered an API?
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> No, not in the first version. This is explicitly mentioned in the `General JSON Plan Assumptions`. I tried to improve the section once more to make it clearer. However, the JSON plan is definitely stable per minor version. And since the plan is versioned by Flink version, external tooling could be built around it. We might make it a public API once the design has settled.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> > Given that upgrades across multiple versions at once are unsupported, do we verify this somehow?
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Good question. I extended the `General JSON Plan Assumptions`. Now yes: the Flink version is part of the JSON plan and will be verified during restore. But keep in mind that we might support more than just the last version, at least until the JSON plan has been migrated.
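A sketch of what that restore-time verification could look like; the `flinkVersion` field name and the simplistic version comparison are assumptions for illustration:

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import java.io.File;
    import java.io.IOException;

    // Illustrative check: refuse to load a plan written by a newer Flink version.
    public class PlanVersionCheck {
        private static final String CURRENT_FLINK_VERSION = "1.15";

        static void validate(File planFile) throws IOException {
            JsonNode plan = new ObjectMapper().readTree(planFile);
            // Assumed top-level field carrying the writing Flink version.
            String planVersion = plan.get("flinkVersion").asText();
            // Naive lexicographic comparison, good enough for "1.15" vs. "1.16".
            if (planVersion.compareTo(CURRENT_FLINK_VERSION) > 0) {
                throw new IllegalStateException(
                        "Plan was written by Flink " + planVersion
                                + " but this cluster runs Flink " + CURRENT_FLINK_VERSION
                                + ". Reading plans from newer versions is not supported.");
            }
        }
    }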
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>> Timo
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On 30.11.21 09:39, Marios Trivyzas wrote:
> >>>>>>>>>>>>>>> I have a question regarding the `COMPILE PLAN OVERWRITE`. If we choose to go with the config option instead, that doesn't provide the flexibility to overwrite certain plans but not others, since the config applies globally; isn't that something to consider?
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On Mon, Nov 29, 2021 at 10:15 AM Marios Trivyzas <mat...@gmail.com> wrote:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Hi Timo!
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Thanks a lot for taking all that time and effort to put together this proposal!
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Regarding:
> >>>>>>>>>>>>>>>>> For simplification of the design, we assume that upgrades use a step size of a single minor version. We don't guarantee skipping minor versions (e.g. 1.11 to 1.14).
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I think that for this first step we should make it absolutely clear to the users that they would need to go through all intermediate versions to end up with the target version they wish. If we are to support skipping versions in the future, e.g. upgrading from 1.14 to 1.17, this means that we need to have a testing infrastructure in place that would test all possible combinations of version upgrades, i.e. from 1.14 to 1.15, from 1.14 to 1.16 and so forth, while still testing and of course supporting all the upgrades from the previous minor version.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I like the idea of introducing HINTS to define some behaviour in the programs a lot!
> >>>>>>>>>>>>>>>> - The hints live together with the SQL statements and consequently the (JSON) plans.
> >>>>>>>>>>>>>>>> - If multiple queries are involved in a program, each one of them can define its own config (regarding plan optimisation, not-null enforcement, etc.).
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I agree with Francesco on his argument regarding the *JSON* plan. I believe we should already provide flexibility here, since (who knows) in the future a JSON plan might not fulfil the desired functionality.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I also agree that we need some very obvious way (i.e. not a log entry) to show the users that their program doesn't support version upgrades, and prevent them from being negatively surprised in the future, when trying to upgrade their production pipelines.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> This is an implementation detail, but I'd like to add that there should be some good logging in place when the upgrade is taking place, to be able to track every restoration action and help debug any potential issues arising from that.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On Fri, Nov 26, 2021 at 2:54 PM Till Rohrmann <trohrm...@apache.org> wrote:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Thanks for writing this FLIP Timo. I think this will be a very important improvement for Flink and our SQL users :-)
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Similar to Francesco I would like to understand the statement
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> For simplification of the design, we assume that upgrades use a step size of a single minor version. We don't guarantee skipping minor versions (e.g. 1.11 to 1.14).
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> a bit better. Is it because Flink does not guarantee that a savepoint created by version 1.x can be directly recovered by version 1.y with x + 1 < y, but users might have to go through a cascade of upgrades? From how I understand your proposal, the compiled plan won't be changed after being written initially. Hence, I would assume that for the plan alone Flink will have to give backwards compatibility guarantees for all versions. Am I understanding this part correctly?
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>>>>> Till
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> On Thu, Nov 25, 2021 at 4:55 PM Francesco Guardiani <france...@ververica.com> wrote:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Hi Timo,
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Thanks for putting this amazing work together. I have some considerations/questions about the FLIP:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> *Proposed changes #6*: Other than defining this rule of thumb, we must also make sure that compiling plans with objects that cannot be serialized in the plan fails hard, so users don't bite themselves with such issues, or at least we need to output warning logs. In general, whenever the user is trying to use the CompiledPlan APIs and at the same time they're trying to do something "illegal" for the plan, we should immediately either log or fail depending on the issue, in order to avoid any surprises once the user upgrades.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> I would also say the same for things like registering a function, registering a DataStream, and for every other thing which won't end up in the plan: we should log such info to the user by default.
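A sketch of the kind of early, hard failure being asked for here; the guard and its names are illustrative, not the FLIP's API:

    import org.apache.flink.table.api.TableException;
    import org.apache.flink.table.functions.UserDefinedFunction;

    // Illustrative guard during plan compilation: an inline (anonymous) function
    // instance has no catalog identifier and therefore cannot be written to a plan.
    final class PlanCompilationChecks {
        static void checkSerializableInPlan(
                UserDefinedFunction function, boolean hasCatalogIdentifier) {
            if (!hasCatalogIdentifier) {
                throw new TableException(
                        "Function " + function.getClass().getName()
                                + " was registered inline and cannot be serialized into a plan. "
                                + "Register it in the catalog before compiling the plan.");
            }
        }
    }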
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> *General JSON Plan Assumptions #9*: When thinking of connectors and formats, I think it's reasonable to assume, and keep out of the feature design, that no feature/ability can be deleted from a connector/format. I also don't think new features/abilities can influence this FLIP, given the plan is static: if, for example, MyCoolTableSink in the next Flink version implements SupportsProjectionPushDown, then it shouldn't be a problem for the upgrade story, since the plan is still configured as computed from the previous Flink version. What worries me is breaking changes, in particular behavioural changes that might happen in connectors/formats. Although this argument doesn't seem relevant for the connectors shipped by the Flink project itself, because we try to keep them as stable as possible and avoid eventual breaking changes, it's a compelling concern for external connectors and formats, which might be decoupled from the Flink release cycle and might have different backward compatibility guarantees. It's totally reasonable if we don't want to tackle it in this first iteration of the feature, but it's something we need to keep in mind for the future.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> *Functions*: It's not clear to me what you mean by "identifier", because somewhere else in the same context you talk about "name". Are we talking about the function name or the function's complete signature? Let's assume for example we have these function definitions:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> * TO_TIMESTAMP_LTZ(BIGINT)
> >>>>>>>>>>>>>>>>>> * TO_TIMESTAMP_LTZ(STRING)
> >>>>>>>>>>>>>>>>>> * TO_TIMESTAMP_LTZ(STRING, STRING)
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> These for me are very different functions with different implementations, where each of them might evolve separately at a different pace.
> >>>>>>>>>>>>>>>>>> Hence, when we store them in the JSON plan, we should perhaps use a logically defined unique id like /bigIntToTimestamp/, /stringToTimestamp/ and /stringToTimestampWithFormat/. This also solves the issue of correctly referencing the functions when restoring the plan, without running the inference logic again (which might have changed in the meantime), and it might also solve the versioning: the function identifier can contain the function version, like /stringToTimestampWithFormat_1_1/ or /stringToTimestampWithFormat_1_2/. An alternative could be to use the string signature representation, which might not be trivial to compute, given the complexity of our type inference logic.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> *The term "JSON plan"*: I think we should rather keep JSON out of the concept and just name it "Compiled Plan" (like the proposed API) or something similar, as I can see how in the future we might decide to support/modify our persistence format to something more storage-efficient like BSON. For example, I would rename /CompiledPlan.fromJsonFile/ to simply /CompiledPlan.fromFile/.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> *Who is the owner of the plan file?* I asked myself this question when reading this:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> For simplification of the design, we assume that upgrades use a step size of a single minor version. We don't guarantee skipping minor versions (e.g. 1.11 to 1.14).
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> My understanding of this statement is that a user can upgrade between minor versions, and by following all the minor versions the same query can remain up and running. E.g. I upgrade from 1.15 to 1.16, and then from 1.16 to 1.17, and I still expect my original query to work without recomputing the plan. This necessarily means that at some point in future releases we'll need some basic "migration" tool to keep the queries up and running, ending up modifying the compiled plan.
> >>>>>>>>>>>>>>>>>> So I guess Flink should write it back to the original plan file, perhaps making a backup of the previous one? Can you please clarify this aspect?
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Apart from these considerations, the proposal looks good to me and I'm eagerly waiting to see it in play.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>>> FG
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>>>>> Francesco Guardiani | Software Engineer
> >>>>>>>>>>>>>>>>>> france...@ververica.com[1]
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Follow us @VervericaData
> >>>>>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>>>>> Join Flink Forward[2] - The Apache Flink Conference
> >>>>>>>>>>>>>>>>>> Stream Processing | Event Driven | Real Time
> >>>>>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>>>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
> >>>>>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>>>>> Ververica GmbH
> >>>>>>>>>>>>>>>>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> >>>>>>>>>>>>>>>>>> Managing Directors: Karl Anton Wehner, Holger Temme, Yip Park Tung Jason, Jinwei (Kevin) Zhang
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> --------
> >>>>>>>>>>>>>>>>>> [1] mailto:france...@ververica.com
> >>>>>>>>>>>>>>>>>> [2] https://flink-forward.org/