Hi Timo, thanks for the explanation. I totally agree with what you said.
My actual question is: will the version of an exec node be serialised in
the JSON plan? As I understand it, it was not in the former design. If it
is, my question is already answered.
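
To make the question concrete, here is a minimal sketch in Java of what I
mean. All names in it (`PlanVersionCheck`, `NodeEntry`, `Plan`,
`unsupportedNodes`) are hypothetical and only for illustration; they are not
taken from the FLIP or from Flink's code base. It assumes the FLIP example
where version 1 of node A is dropped in 1.17 and versions 2 and 3 remain
supported:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class PlanVersionCheck {

    /** Hypothetical serialized node entry: ExecNode name plus its own version. */
    static final class NodeEntry {
        final String name;
        final int version;

        NodeEntry(String name, int version) {
            this.name = name;
            this.version = version;
        }
    }

    /** Hypothetical plan: one plan-level Flink version plus per-node versions. */
    static final class Plan {
        final String flinkVersion;
        final List<NodeEntry> nodes;

        Plan(String flinkVersion, List<NodeEntry> nodes) {
            this.flinkVersion = flinkVersion;
            this.nodes = nodes;
        }
    }

    /** Returns "name@version" for every node whose version is not supported. */
    static List<String> unsupportedNodes(Plan plan, Map<String, Set<Integer>> supported) {
        List<String> result = new ArrayList<>();
        for (NodeEntry node : plan.nodes) {
            Set<Integer> versions = supported.getOrDefault(node.name, Set.of());
            if (!versions.contains(node.version)) {
                result.add(node.name + "@" + node.version);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Assumption from the FLIP example: in 1.17, version 1 of A is dropped.
        Map<String, Set<Integer>> supportedIn117 = Map.of("A", Set.of(2, 3));

        Plan oldPlan = new Plan("1.15", List.of(new NodeEntry("A", 1)));
        // After plan migration: plan version is 1.17, node A stays at version 2.
        Plan migratedPlan = new Plan("1.17", List.of(new NodeEntry("A", 2)));

        System.out.println(unsupportedNodes(oldPlan, supportedIn117));      // [A@1]
        System.out.println(unsupportedNodes(migratedPlan, supportedIn117)); // []
    }
}
```

After the migration the plan carries flinkVersion 1.17 while node A is still
at version 2, so the plan version alone cannot tell us the node version.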


Best,
Wenlong


On Fri, 10 Dec 2021 at 18:15, Timo Walther <twal...@apache.org> wrote:

> Hi Wenlong,
>
> I also thought about adding a `flinkVersion` field per ExecNode. But this
> is not necessary, because the `version` of the ExecNode serves the same
> purpose.
>
> The plan version just encodes that:
> "plan has been updated in Flink 1.17" / "plan is entirely valid for
> Flink 1.17"
>
> The ExecNode version maps to `minStateVersion` to verify state
> compatibility.
>
> So even if the plan version is 1.17, some ExecNodes can still use the
> state layout of 1.15.
>
> It is totally fine to only update the ExecNode to version 2 and not 3 in
> your example.
>
> Regards,
> Timo
>
>
>
> On 10.12.21 06:02, wenlong.lwl wrote:
> > Hi, Timo, thanks for updating the doc.
> >
> > I have a comment on plan migration:
> > I think we may need to add a version field for every exec node when
> > serialising. In earlier discussions, I think we concluded that the
> > version of the plan can be treated as the version of the node, but in
> > this case that would break.
> > Take the following example from the FLIP; there is a bad case:
> > when we introduce an incompatible version 3 in 1.17 and drop version
> > 1, we can only update the version to 2, so the version should be per
> > exec node.
> >
> > ExecNode version *1* is not supported anymore. Even though the state is
> > actually compatible. The plan restore will fail with a helpful exception
> > that forces users to perform plan migration.
> >
> > COMPILE PLAN '/mydir/plan_new.json' FROM '/mydir/plan_old.json';
> >
> > The plan migration will safely replace the old version *1* with *2*. The
> > JSON plan flinkVersion changes to 1.17.
> >
> >
> > Best,
> >
> > Wenlong
> >
> > On Thu, 9 Dec 2021 at 18:36, Timo Walther <twal...@apache.org> wrote:
> >
> >> Hi Jing and Godfrey,
> >>
> >> I had another iteration over the document. There are two major changes:
> >>
> >> 1. Supported Flink Upgrade Versions
> >>
> >> I got the feedback via various channels that a step size of one minor
> >> version is not very convenient. As you said, "because upgrading to a new
> >> version is a time-consuming process". I rephrased this section:
> >>
> >> Upgrading usually involves work which is why many users perform this
> >> task rarely (e.g. only once per year). Also, skipping versions is
> >> common until a new feature has been introduced for which it is worth
> >> upgrading. We will support the upgrade to the most recent Flink version
> >> from a set of previous versions. We aim to support upgrades from the
> >> last 2-3 releases on a best-effort basis; maybe even more depending on
> >> the maintenance overhead. However, in order to not grow the testing
> >> matrix infinitely and to perform important refactoring if necessary, we
> >> only guarantee upgrades with a step size of a single minor version (i.e.
> >> a cascade of upgrades).
> >>
> >> 2. Annotation Design
> >>
> >> I also adopted the multiple annotations design for the previous
> >> supportPlanFormat. So no array of versions anymore. I reworked the
> >> section, please have a look with updated examples:
> >>
> >>
> >>
> >> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=191336489#FLIP190:SupportVersionUpgradesforTableAPI&SQLPrograms-ExecNodeTests
> >>
> >> I also got the feedback offline that `savepoint` might not be the right
> >> terminology for the annotation. I changed that to minPlanVersion and
> >> minStateVersion.
> >>
> >> Let me know what you think.
> >>
> >> Regards,
> >> Timo
> >>
> >>
> >>
> >> On 09.12.21 08:44, Jing Zhang wrote:
> >>> Hi Timo,
> >>> Thanks a lot for driving this discussion.
> >>> I believe it could solve many problems that we are suffering from when
> >>> upgrading.
> >>>
> >>> I have only one small complaint about the following point.
> >>>
> >>>> For simplification of the design, we assume that upgrades use a step
> >>>> size of a single minor version. We don't guarantee skipping minor
> >>>> versions (e.g. 1.11 to 1.14).
> >>>
> >>> In our internal production environment, we catch up with the
> >>> community's latest stable release roughly once a year because upgrading
> >>> to a new version is a time-consuming process.
> >>> So we might have skipped 1 to 3 versions by the time we upgrade to the
> >>> latest version. This might also be the case at other companies.
> >>> Could we guarantee that FLIP-190 works if we skip fewer minor versions
> >>> than a specified threshold?
> >>> Then we would know which version is suitable for us when preparing an
> >>> upgrade.
> >>>
> >>> Best,
> >>> Jing Zhang
> >>>
> >>> godfrey he <godfre...@gmail.com> wrote on Wed, 8 Dec 2021 at 22:16:
> >>>
> >>>> Hi Timo,
> >>>>
> >>>> Thanks for the explanation, it's much clearer now.
> >>>>
> >>>> One thing I want to confirm about `supportedPlanFormat`
> >>>> and `supportedSavepointFormat`:
> >>>> `supportedPlanFormat` supports multiple versions,
> >>>> while `supportedSavepointFormat` supports only one version?
> >>>> A JSON plan can be deserialized by multiple versions
> >>>> because default values will be set for new fields.
> >>>> In theory, a savepoint can be restored by more than one version
> >>>> of the operators even if a state layout is changed,
> >>>> for example by deleting a whole state and starting the job with
> >>>> `allowNonRestoredState`=true.
> >>>> I think this is a corner case, and it is harder to understand than
> >>>> `supportedPlanFormat` supporting multiple versions.
> >>>> So, in most cases, when the state layout is changed, the savepoint is
> >>>> incompatible,
> >>>> and `supportedSavepointFormat` and the version need to be changed.
> >>>>
> >>>> I think we need a detailed explanation of how the annotations change
> >>>> in the Javadoc of the `ExecNodeMetadata` class for all developers
> >>>> (esp. those unfamiliar with this part).
> >>>>
> >>>> Best,
> >>>> Godfrey
> >>>>
> >>>>> Timo Walther <twal...@apache.org> wrote on Wed, 8 Dec 2021 at 4:57 PM:
> >>>>>
> >>>>> Hi Wenlong,
> >>>>>
> >>>>> thanks for the feedback. Great that we reached consensus here. I will
> >>>>> update the entire document with my previous example shortly.
> >>>>>
> >>>>>    > if we don't update the version when the plan format changes, we
> >>>>>    > can't detect that the plan cannot be deserialized in 1.15
> >>>>>
> >>>>> This should not be a problem as the entire plan file has a version as
> >>>>> well. We should not allow reading a 1.16 plan in 1.15. We can throw a
> >>>>> helpful exception early.
> >>>>>
> >>>>> Reading a 1.15 plan in 1.16 is possible until we drop the old
> >>>>> `supportedPlanFormat` from one of the used ExecNodes. Afterwards, all
> >>>>> `supportedPlanFormat` values of the ExecNodes must be equal to or
> >>>>> higher than the plan version.
> >>>>>
> >>>>> Regards,
> >>>>> Timo
> >>>>>
> >>>>> On 08.12.21 03:07, wenlong.lwl wrote:
> >>>>>> Hi Timo, +1 for multi metadata.
> >>>>>>
> >>>>>> The compatible change I meant in my last email is the slight state
> >>>>>> change example you gave, so we have actually reached consensus on
> >>>>>> this, IMO.
> >>>>>>
> >>>>>> Another question based on the example you gave:
> >>>>>> In the example "JSON node gets an additional property in 1.16", if we
> >>>>>> don't update the version when the plan format changes, we can't detect
> >>>>>> that the plan cannot be deserialized in 1.15, although the savepoint
> >>>>>> state is compatible.
> >>>>>> The error message may not be very friendly if we just throw a
> >>>>>> deserialization failure.
> >>>>>>
> >>>>>> On Tue, 7 Dec 2021 at 16:49, Timo Walther <twal...@apache.org> wrote:
> >>>>>>
> >>>>>>> Hi Wenlong,
> >>>>>>>
> >>>>>>>     > First, we add a newStateLayout because of some improvement in
> >>>>>>>     > state, in order to keep compatibility we may still keep the old
> >>>>>>>     > state for the first version. We need to update the version, so
> >>>>>>>     > that we can generate a new version plan for the new job and keep
> >>>>>>>     > the exec node compatible with the old version plan.
> >>>>>>>
> >>>>>>> The problem that I see here for contributors is that the actual
> >>>>>>> update of a version is more complicated than just updating an integer
> >>>>>>> value. It means copying a lot of ExecNode code for a change that
> >>>>>>> happens locally in an operator. Let's assume multiple ExecNodes use a
> >>>>>>> similar operator. Why do we need to update all ExecNode versions if
> >>>>>>> the operator itself can deal with the incompatibility? The ExecNode
> >>>>>>> version is meant for topology changes or fundamental state changes.
> >>>>>>>
> >>>>>>> If we don't find consensus on this topic, I would at least vote for
> >>>>>>> supporting multiple annotations for an ExecNode class. This way we
> >>>>>>> don't need to copy code but only add two ExecNode annotations with
> >>>>>>> different ExecNode versions.
> >>>>>>>
> >>>>>>>     > Maybe we can add support for this case:
> >>>>>>>     > when an exec node is changed in 1.16, but is compatible with 1.15,
> >>>>>>>     > we can use the node of 1.16 to deserialize the plan of 1.15.
> >>>>>>>
> >>>>>>> If the ExecNode is compatible, there is no reason to increase the
> >>>>>>> ExecNode version.
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> I tried to come up with a reworked solution to make all parties
> >> happy:
> >>>>>>>
> >>>>>>> 1. Let's assume the following annotations:
> >>>>>>>
> >>>>>>> supportedPlanFormat = [1.15]
> >>>>>>>
> >>>>>>> supportedSavepointFormat = 1.15
> >>>>>>>
> >>>>>>> we drop `added` as it is equal to `supportedSavepointFormat`
> >>>>>>>
> >>>>>>> 2. Multiple annotations over ExecNodes are possible:
> >>>>>>>
> >>>>>>> // operator state changes
> >>>>>>>
> >>>>>>> // initial introduction in 1.15
> >>>>>>> @ExecNodeMetadata(name=A, version=1, supportedPlanFormat=1.15,
> >>>>>>> supportedSavepointFormat=1.15)
> >>>>>>>
> >>>>>>> // state layout changed slightly in 1.16
> >>>>>>> // - operator migration is possible
> >>>>>>> // - operator supports state of both versions and will perform
> >>>>>>> //   operator state migration
> >>>>>>> // - new plans will get new ExecNode version
> >>>>>>> @ExecNodeMetadata(name=A, version=1, supportedPlanFormat=1.15,
> >>>>>>> supportedSavepointFormat=1.15)
> >>>>>>> @ExecNodeMetadata(name=A, version=2, supportedPlanFormat=1.15,
> >>>>>>> supportedSavepointFormat=1.16)
> >>>>>>>
> >>>>>>> // we force a plan migration in 1.17
> >>>>>>> // - we assume that all operator states have been migrated in the
> >>>>>>> //   previous version
> >>>>>>> // - we can safely replace the old version `1` with `2` and only keep
> >>>>>>> //   the new savepoint format
> >>>>>>> @ExecNodeMetadata(name=A, version=2, supportedPlanFormat=1.15,
> >>>>>>> supportedSavepointFormat=1.16)
> >>>>>>>
> >>>>>>>
> >>>>>>> // plan changes
> >>>>>>>
> >>>>>>> // initial introduction in 1.15
> >>>>>>> @ExecNodeMetadata(name=A, version=1, supportedPlanFormat=1.15,
> >>>>>>> supportedSavepointFormat=1.15)
> >>>>>>>
> >>>>>>> // JSON node gets an additional property in 1.16
> >>>>>>> // e.g. { some-prop: 42 } -> { some-prop: 42, some-flag: false}
> >>>>>>> // - ExecNode version does not change
> >>>>>>> // - ExecNode version only changes when topology or state is affected
> >>>>>>> // - we support both JSON plan formats, the old and the newest one
> >>>>>>> @ExecNodeMetadata(name=A, version=1, supportedPlanFormat=[1.15, 1.16],
> >>>>>>> supportedSavepointFormat=1.15)
> >>>>>>>
> >>>>>>> // we force a plan migration in 1.17
> >>>>>>> // - now we only support 1.16 plan format
> >>>>>>> @ExecNodeMetadata(name=A, version=1, supportedPlanFormat=1.16,
> >>>>>>> supportedSavepointFormat=1.15)
> >>>>>>>
> >>>>>>>
> >>>>>>> // topology change
> >>>>>>>
> >>>>>>> // initial introduction in 1.15
> >>>>>>> @ExecNodeMetadata(name=A, version=1, supportedPlanFormat=1.15,
> >>>>>>> supportedSavepointFormat=1.15)
> >>>>>>>
> >>>>>>> // completely new class structure in 1.16 annotated with
> >>>>>>> @ExecNodeMetadata(name=A, version=2, supportedPlanFormat=1.15,
> >>>>>>> supportedSavepointFormat=1.16)
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> What do you think?
> >>>>>>>
> >>>>>>>
> >>>>>>> Regards,
> >>>>>>> Timo
> >>>>>>>
> >>>>>>>
> >>>>>>> On 07.12.21 08:20, wenlong.lwl wrote:
> >>>>>>>> Maybe we can add support for this case:
> >>>>>>>> when an exec node is changed in 1.16, but is compatible with 1.15,
> >>>>>>>> we can use the node of 1.16 to deserialize the plan of 1.15.
> >>>>>>>> This way, we don't need to fork the code if the change is compatible,
> >>>>>>>> and can avoid forking code frequently.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> Best,
> >>>>>>>> Wenlong
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Tue, 7 Dec 2021 at 15:08, wenlong.lwl <wenlong88....@gmail.com> wrote:
> >>>>>>>>
> >>>>>>>>> Hi Timo, I would prefer to update the version every time we change
> >>>>>>>>> the state layout too.
> >>>>>>>>>
> >>>>>>>>> It is possible that we change the exec node in 2 steps:
> >>>>>>>>> First, we add a newStateLayout because of some improvement in state;
> >>>>>>>>> in order to keep compatibility, we may still keep the old state for
> >>>>>>>>> the first version. We need to update the version, so that we can
> >>>>>>>>> generate a new version plan for the new job and keep the exec node
> >>>>>>>>> compatible with the old version plan.
> >>>>>>>>> After some versions, we may remove the old version state layout and
> >>>>>>>>> clean up the deprecated code. We still need to update the version, so
> >>>>>>>>> that we can verify that we are compatible with the plan after the
> >>>>>>>>> first change, but not compatible with the earlier plan.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> Best,
> >>>>>>>>> Wenlong
> >>>>>>>>>
> >>>>>>>>> On Mon, 6 Dec 2021 at 21:27, Timo Walther <twal...@apache.org> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hi Godfrey,
> >>>>>>>>>>
> >>>>>>>>>>      > design makes things more complex.
> >>>>>>>>>>
> >>>>>>>>>> Yes, the design might be a bit more complex. But operator migration
> >>>>>>>>>> is way easier than ExecNode migration at a later point in time for
> >>>>>>>>>> code maintenance. We know that ExecNodes can become pretty complex.
> >>>>>>>>>> Even though we have put a lot of code into `CommonXXExecNode`, it
> >>>>>>>>>> will be a lot of work to maintain multiple versions of ExecNodes. If
> >>>>>>>>>> we can avoid this with operator state migration, this should always
> >>>>>>>>>> be preferred over a new ExecNode version.
> >>>>>>>>>>
> >>>>>>>>>> I'm aware that operator state migration might only be important for
> >>>>>>>>>> roughly 10% of all changes. A new ExecNode version will be used for
> >>>>>>>>>> 90% of all changes.
> >>>>>>>>>>
> >>>>>>>>>>      > If there are multiple state layouts, which layout the ExecNode
> >>>>>>>>>>      > should use?
> >>>>>>>>>>
> >>>>>>>>>> It is not the responsibility of the ExecNode to decide this but
> >>>>>>>>>> that of the operator. Something like:
> >>>>>>>>>>
> >>>>>>>>>> class X extends ProcessFunction {
> >>>>>>>>>>        ValueState<A> oldStateLayout;
> >>>>>>>>>>        ValueState<B> newStateLayout;
> >>>>>>>>>>
> >>>>>>>>>>        open() {
> >>>>>>>>>>          if (oldStateLayout.get() != null) {
> >>>>>>>>>>            performOperatorMigration();
> >>>>>>>>>>          }
> >>>>>>>>>>          useNewStateLayout();
> >>>>>>>>>>        }
> >>>>>>>>>> }
> >>>>>>>>>>
> >>>>>>>>>> Operator migration is meant for smaller "more local" changes without
> >>>>>>>>>> touching the ExecNode layer. The CEP library and DataStream API
> >>>>>>>>>> sources have been performing operator migration for years already.
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>      > `supportedPlanChanges` and `supportedSavepointChanges` are a
> >>>>>>>>>>      > bit obscure.
> >>>>>>>>>>
> >>>>>>>>>> Let me try to come up with more examples of why I think both
> >>>>>>>>>> annotations make sense and are esp. important *for test coverage*.
> >>>>>>>>>>
> >>>>>>>>>> supportedPlanChanges:
> >>>>>>>>>>
> >>>>>>>>>> Let's assume we have some JSON in Flink 1.15:
> >>>>>>>>>>
> >>>>>>>>>> {
> >>>>>>>>>>        some-prop: 42
> >>>>>>>>>> }
> >>>>>>>>>>
> >>>>>>>>>> And we want to extend the JSON in Flink 1.16:
> >>>>>>>>>>
> >>>>>>>>>> {
> >>>>>>>>>>        some-prop: 42,
> >>>>>>>>>>        some-flag: false
> >>>>>>>>>> }
> >>>>>>>>>>
> >>>>>>>>>> Maybe we don't need to increase the ExecNode version but only ensure
> >>>>>>>>>> that the flag is set to `false` by default for the older versions.
> >>>>>>>>>>
> >>>>>>>>>> We need a location to track changes and document the changelog. With
> >>>>>>>>>> the help of the annotation supportedPlanChanges = [1.15, 1.16] we can
> >>>>>>>>>> verify that we have tests for both JSON formats.
> >>>>>>>>>>
> >>>>>>>>>> And once we decide to drop the 1.15 format, we enforce plan migration
> >>>>>>>>>> and fill in the default value `false` in the old plans and bump
> >>>>>>>>>> their JSON plan version to 1.16 or higher.
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>      > once the state layout is changed, the ExecNode version needs
> >>>>>>>>>>      > also be updated
> >>>>>>>>>>
> >>>>>>>>>> This will still be the majority of cases. But if we can avoid this,
> >>>>>>>>>> we should do so in order not to have too much duplicate code to
> >>>>>>>>>> maintain.
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> Thanks,
> >>>>>>>>>> Timo
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> On 06.12.21 09:58, godfrey he wrote:
> >>>>>>>>>>> Hi, Timo,
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks for the detailed explanation.
> >>>>>>>>>>>
> >>>>>>>>>>>> We change an operator state of B in Flink 1.16. We perform the
> >>>>>>>>>>>> change in the operator of B in a way to support both state layouts.
> >>>>>>>>>>>> Thus, no need for a new ExecNode version.
> >>>>>>>>>>>
> >>>>>>>>>>> I think this design makes things more complex.
> >>>>>>>>>>> 1. If there are multiple state layouts, which layout should the
> >>>>>>>>>>> ExecNode use?
> >>>>>>>>>>> It increases the cost of understanding for developers (especially
> >>>>>>>>>>> for Flink newcomers),
> >>>>>>>>>>> making them prone to mistakes.
> >>>>>>>>>>> 2. `supportedPlanChanges` and `supportedSavepointChanges` are a bit
> >>>>>>>>>>> obscure.
> >>>>>>>>>>>
> >>>>>>>>>>> The purpose of the ExecNode annotations is not only to support
> >>>>>>>>>>> powerful validation,
> >>>>>>>>>>> but more importantly to make them easy for developers to understand
> >>>>>>>>>>> and to ensure that every modification is easy and state compatible.
> >>>>>>>>>>>
> >>>>>>>>>>> I would prefer that, once the state layout is changed, the ExecNode
> >>>>>>>>>>> version also needs to be updated,
> >>>>>>>>>>> which could make things simple. How about
> >>>>>>>>>>> renaming `supportedPlanChanges` to `planCompatibleVersion`
> >>>>>>>>>>> (which means the plan is compatible with the plan generated by the
> >>>>>>>>>>> given version node)
> >>>>>>>>>>> and renaming `supportedSavepointChanges` to
> >>>>>>>>>>> `savepointCompatibleVersion`
> >>>>>>>>>>> (which means the state is compatible with the state generated by
> >>>>>>>>>>> the given version node)?
> >>>>>>>>>>> The names also indicate that only one version value can be set.
> >>>>>>>>>>>
> >>>>>>>>>>> WDYT?
> >>>>>>>>>>>
> >>>>>>>>>>> Best,
> >>>>>>>>>>> Godfrey
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> Timo Walther <twal...@apache.org> wrote on Thu, 2 Dec 2021 at 11:42 PM:
> >>>>>>>>>>>>
> >>>>>>>>>>>> Response to Marios's feedback:
> >>>>>>>>>>>>
> >>>>>>>>>>>>       > there should be some good logging in place when the upgrade is
> >>>>>>>>>>>>       > taking place
> >>>>>>>>>>>>
> >>>>>>>>>>>> Yes, I agree. I added this part to the FLIP.
> >>>>>>>>>>>>
> >>>>>>>>>>>>       > config option instead that doesn't provide the flexibility to
> >>>>>>>>>>>>       > overwrite certain plans
> >>>>>>>>>>>>
> >>>>>>>>>>>> One can set the config option also around sections of the
> >>>>>>>>>>>> multi-statement SQL script.
> >>>>>>>>>>>>
> >>>>>>>>>>>> SET 'table.plan.force-recompile'='true';
> >>>>>>>>>>>>
> >>>>>>>>>>>> COMPILE ...
> >>>>>>>>>>>>
> >>>>>>>>>>>> SET 'table.plan.force-recompile'='false';
> >>>>>>>>>>>>
> >>>>>>>>>>>> But the question is why a user would want to run COMPILE multiple
> >>>>>>>>>>>> times. If it is during development, then running EXECUTE (or just the
> >>>>>>>>>>>> statement itself) without calling COMPILE should be sufficient. The
> >>>>>>>>>>>> file can also be deleted manually if necessary.
> >>>>>>>>>>>>
> >>>>>>>>>>>> What do you think?
> >>>>>>>>>>>>
> >>>>>>>>>>>> Regards,
> >>>>>>>>>>>> Timo
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> On 02.12.21 16:09, Timo Walther wrote:
> >>>>>>>>>>>>> Hi Till,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Yes, you might have to. But not a new plan from the SQL query but a
> >>>>>>>>>>>>> migration from the old plan to the new plan. This will not happen
> >>>>>>>>>>>>> often. But we need a way to evolve the format of the JSON plan itself.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Maybe this is a bit confusing, so let me clarify it again: Mostly
> >>>>>>>>>>>>> ExecNode versions and operator state layouts will evolve. Not the plan
> >>>>>>>>>>>>> files, those will be pretty stable. But also not infinitely.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>> Timo
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On 02.12.21 16:01, Till Rohrmann wrote:
> >>>>>>>>>>>>>> Then for migrating from Flink 1.10 to 1.12, I might have to create a
> >>>>>>>>>>>>>> new plan using Flink 1.11 in order to migrate from Flink 1.11 to 1.12,
> >>>>>>>>>>>>>> right?
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>> Till
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Thu, Dec 2, 2021 at 3:39 PM Timo Walther <twal...@apache.org> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Response to Till's feedback:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>        > compiled plan won't be changed after being written
> >>>> initially
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> This is not entirely correct. We give guarantees for keeping the
> >>>>>>>>>>>>>>> query up and running. We reserve the right to force plan migrations.
> >>>>>>>>>>>>>>> In this case, the plan might not be created from the SQL statement
> >>>>>>>>>>>>>>> but from the old plan. I have added an example in section 10.1.1. In
> >>>>>>>>>>>>>>> general, both persisted entities "plan" and "savepoint" can evolve
> >>>>>>>>>>>>>>> independently from each other.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>> Timo
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On 02.12.21 15:10, Timo Walther wrote:
> >>>>>>>>>>>>>>>> Response to Godfrey's feedback:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>        > "EXPLAIN PLAN EXECUTE STATEMENT SET BEGIN ... END" is missing.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Thanks for the hint. I added a dedicated section 7.1.3.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>        > it's hard to maintain the supported versions for
> >>>>>>>>>>>>>>>> "supportedPlanChanges" and "supportedSavepointChanges"
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Actually, I think we are mostly on the same page.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> The annotation does not need to be updated for every Flink
> >>>>>>>>>>>>>>>> version. As the name suggests it is about "Changes" (in other words:
> >>>>>>>>>>>>>>>> incompatibilities) that require some kind of migration. Either plan
> >>>>>>>>>>>>>>>> migration (= PlanChanges) or savepoint migration (= SavepointChanges,
> >>>>>>>>>>>>>>>> using operator migration or savepoint migration).
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Let's assume we introduced two ExecNodes A and B in Flink
> >>>> 1.15.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> The annotations are:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> @ExecNodeMetadata(name=A, supportedPlanChanges=1.15,
> >>>>>>>>>>>>>>>> supportedSavepointChanges=1.15)
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> @ExecNodeMetadata(name=B, supportedPlanChanges=1.15,
> >>>>>>>>>>>>>>>> supportedSavepointChanges=1.15)
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> We change an operator state of B in Flink 1.16.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> We perform the change in the operator of B in a way to
> >>>> support
> >>>>>>> both
> >>>>>>>>>>>>>>>> state layouts. Thus, no need for a new ExecNode version.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> The annotations in 1.16 are:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> @ExecNodeMetadata(name=A, supportedPlanChanges=1.15,
> >>>>>>>>>>>>>>>> supportedSavepointChanges=1.15)
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> @ExecNodeMetadata(name=B, supportedPlanChanges=1.15,
> >>>>>>>>>>>>>>>> supportedSavepointChanges=1.15, 1.16)
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> So the versions in the annotations are "start version"s.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I don't think we need end versions? End version would mean that we
> >>>>>>>>>>>>>>>> drop the ExecNode from the code base?
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Please check the section 10.1.1 again. I added a more complex
> >>>>>>>>>>>>>>>> example.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>> Timo
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On 01.12.21 16:29, Timo Walther wrote:
> >>>>>>>>>>>>>>>>> Response to Francesco's feedback:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>        > *Proposed changes #6*: Other than defining this rule of
> >>>>>>>>>>>>>>>>>        > thumb, we must also make sure that compiling plans with
> >>>>>>>>>>>>>>>>>        > these objects that cannot be serialized in the plan must
> >>>>>>>>>>>>>>>>>        > fail hard
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Yes, I totally agree. We will fail hard with a helpful exception.
> >>>>>>>>>>>>>>>>> Any mistake, e.g. using an inline object in Table API or an invalid
> >>>>>>>>>>>>>>>>> DataStream API source without uid, should immediately fail a plan
> >>>>>>>>>>>>>>>>> compilation step. I added a remark to the FLIP again.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>        > What worries me is breaking changes, in particular
> >>>>>>>>>>>>>>>>>        > behavioural changes that might happen in connectors/formats
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Breaking changes in connectors and formats need to be encoded in
> >>>>>>>>>>>>>>>>> the options. I could also imagine versioning in the factory
> >>>>>>>>>>>>>>>>> identifier, e.g. `connector=kafka` and `connector=kafka-2`, if this
> >>>>>>>>>>>>>>>>> is necessary.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> After thinking about your question again, I think we will also need
> >>>>>>>>>>>>>>>>> the same testing infrastructure for our connectors and formats. Esp.
> >>>>>>>>>>>>>>>>> restore tests and completeness tests. I updated the document
> >>>>>>>>>>>>>>>>> accordingly. Also I added a way to generate UIDs for DataStream API
> >>>>>>>>>>>>>>>>> providers.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>        > *Functions:* Are we talking about the function name or the
> >>>>>>>>>>>>>>>>>        > function complete signature?
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> For catalog functions, the identifier contains catalog name and
> >>>>>>>>>>>>>>>>> database name. For system functions, the identifier contains only a
> >>>>>>>>>>>>>>>>> name, which makes function name and identifier identical. I reworked
> >>>>>>>>>>>>>>>>> the section again and also fixed some of the naming conflicts you
> >>>>>>>>>>>>>>>>> mentioned.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>        > we should perhaps use a logically defined unique id like
> >>>>>>>>>>>>>>>>>        > /bigIntToTimestamp/
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> I added a concrete example for the resolution and restoration. The
> >>>>>>>>>>>>>>>>> unique id is composed of name + version. Internally, this is
> >>>>>>>>>>>>>>>>> represented as `$TO_TIMESTAMP_LTZ$1`.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>        > I think we should rather keep JSON out of the concept
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Sounds ok to me. In SQL we also just call it "plan". I will change
> >>>>>>>>>>>>>>>>> the file sections. But I would suggest keeping the fromJsonString
> >>>>>>>>>>>>>>>>> method.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>        > write it back in the original plan file
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> I updated the terminology section for what we consider an
> >>>>>>>>>>>>>>>>> "upgrade". We might need to update the original plan file. This is
> >>>>>>>>>>>>>>>>> already considered in the COMPILE PLAN ... FROM ... even though this
> >>>>>>>>>>>>>>>>> is future work. Also savepoint migration.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Thanks for all the feedback!
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Timo
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> On 30.11.21 14:28, Timo Walther wrote:
> >>>>>>>>>>>>>>>>>> Response to Wenlong's feedback:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>        > I would prefer not to provide such a shortcut, let users
> >>>>>>>>>>>>>>>>>>        > use COMPILE PLAN IF NOT EXISTS and EXECUTE explicitly,
> >>>>>>>>>>>>>>>>>>        > which can be understood by new users even without
> >>>>>>>>>>>>>>>>>>        > inferring the docs.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> I would like to hear more opinions on this topic. Personally, I
> >>>>>>>>>>>>>>>>>> find a combined statement very useful. Not only for quicker
> >>>>>>>>>>>>>>>>>> development and debugging but also for readability. It helps in
> >>>>>>>>>>>>>>>>>> keeping the JSON path and the query close to each other in order to
> >>>>>>>>>>>>>>>>>> know the origin of the plan.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>        > but the plan and SQL are not matched. The result would be
> >>>>>>>>>>>>>>>>>>        > quite confusing if we still execute the plan directly, we
> >>>>>>>>>>>>>>>>>>        > may need to add a validation.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> You are right that there could be a mismatch. But we have a
> >>>>>>>>>>>>>>>>>> similar problem when executing CREATE TABLE IF NOT EXISTS. The
> >>>>>>>>>>>>>>>>>> schema or options of a table could have changed completely in the
> >>>>>>>>>>>>>>>>>> catalog but the CREATE TABLE IF NOT EXISTS is not executed again.
> >>>>>>>>>>>>>>>>>> So a mismatch could also occur there.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>>>>> Timo
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> On 30.11.21 14:17, Timo Walther wrote:
> >>>>>>>>>>>>>>>>>>> Hi everyone,
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> thanks for the feedback so far. Let me answer each email
> >>>>>>>>>>>>>>>>>>> individually.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> I will start with a response to Ingo's feedback:
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>        > Will the JSON plan's schema be considered an API?
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> No, not in the first version. This is explicitly mentioned in the
> >>>>>>>>>>>>>>>>>>> `General JSON Plan Assumptions`. I tried to improve the section once
> >>>>>>>>>>>>>>>>>>> more to make it clearer. However, the JSON plan is definitely stable
> >>>>>>>>>>>>>>>>>>> per minor version. And since the plan is versioned by Flink version,
> >>>>>>>>>>>>>>>>>>> external tooling could be built around it. We might make it public
> >>>>>>>>>>>>>>>>>>> API once the design has settled.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>        > Given that upgrades across multiple versions at once are
> >>>>>>>>>>>>>>>>>>> unsupported, do we verify this somehow?
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Good question. I extended the `General JSON Plan Assumptions`. Now
> >>>>>>>>>>>>>>>>>>> yes: the Flink version is part of the JSON plan and will be verified
> >>>>>>>>>>>>>>>>>>> during restore. But keep in mind that we might support more than
> >>>>>>>>>>>>>>>>>>> just the last version at least until the JSON plan has been
> >>>>>>>>>>>>>>>>>>> migrated.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>>>>>> Timo
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> On 30.11.21 09:39, Marios Trivyzas wrote:
> >>>>>>>>>>>>>>>>>>>> I have a question regarding the `COMPILE PLAN OVERWRITE`. If we
> >>>>>>>>>>>>>>>>>>>> choose to go with the config option instead, that doesn't provide
> >>>>>>>>>>>>>>>>>>>> the flexibility to overwrite certain plans but not others, since
> >>>>>>>>>>>>>>>>>>>> the config applies globally. Isn't that something to consider?
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> On Mon, Nov 29, 2021 at 10:15 AM Marios Trivyzas <mat...@gmail.com> wrote:
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Hi Timo!
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Thanks a lot for taking all that time and effort to
> put
> >>>>>>>>>> together
> >>>>>>>>>>>>>>> this
> >>>>>>>>>>>>>>>>>>>>> proposal!
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Regarding:
> >>>>>>>>>>>>>>>>>>>>>> For simplification of the design, we assume that
> >>>> upgrades
> >>>>>>>>>> use a
> >>>>>>>>>>>>>>>>>>>>>> step size
> >>>>>>>>>>>>>>>>>>>>> of a single
> >>>>>>>>>>>>>>>>>>>>> minor version. We don't guarantee skipping minor
> >>>> versions
> >>>>>>>>>> (e.g.
> >>>>>>>>>>>>>>>>>>>>> 1.11 to
> >>>>>>>>>>>>>>>>>>>>> 1.14).
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> I think that for this first step we should make it
> >>>>>>> absolutely
> >>>>>>>>>>>>>>>>>>>>> clear to the
> >>>>>>>>>>>>>>>>>>>>> users that they would need to go through all
> >>>> intermediate
> >>>>>>>>>> versions
> >>>>>>>>>>>>>>>>>>>>> to end up with the target version they wish. If we
> are
> >>>> to
> >>>>>>>>>> support
> >>>>>>>>>>>>>>>>>>>>> skipping
> >>>>>>>>>>>>>>>>>>>>> versions in the future, i.e. upgrade from 1.14 to
> 1.17,
> >>>> this
> >>>>>>>>>> means
> >>>>>>>>>>>>>>>>>>>>> that we need to have a testing infrastructure in
> place
> >>>> that
> >>>>>>>>>> would
> >>>>>>>>>>>>>>>>>>>>> test all
> >>>>>>>>>>>>>>>>>>>>> possible combinations of version upgrades, i.e. from
> >>>> 1.14 to
> >>>>>>>>>> 1.15,
> >>>>>>>>>>>>>>>>>>>>> from 1.14 to 1.16 and so forth, while still testing
> and
> >>>> of
> >>>>>>>>>> course
> >>>>>>>>>>>>>>>>>>>>> supporting all the upgrades from the previous minor
> >>>> version.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> I really like the idea of introducing HINTS to define
> >>>> some
> >>>>>>>>>>>>>>>>>>>>> behaviour in the
> >>>>>>>>>>>>>>>>>>>>> programs!
> >>>>>>>>>>>>>>>>>>>>> - the hints live together with the sql statements and
> >>>>>>>>>> consequently
> >>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>> (JSON) plans.
> >>>>>>>>>>>>>>>>>>>>> - If multiple queries are involved in a program, each
> >>>> one of
> >>>>>>>>>> them
> >>>>>>>>>>>>>>> can
> >>>>>>>>>>>>>>>>>>>>> define its own config (regarding plan optimisation,
> not
> >>>> null
> >>>>>>>>>>>>>>>>>>>>> enforcement,
> >>>>>>>>>>>>>>>>>>>>> etc)
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> I agree with Francesco on his argument regarding the
> >>>> *JSON*
> >>>>>>>>>>>>>>>>>>>>> plan. I
> >>>>>>>>>>>>>>>>>>>>> believe we should already provide flexibility here,
> >>>> since
> >>>>>>> (who
> >>>>>>>>>>>>>>>>>>>>> knows) in
> >>>>>>>>>>>>>>>>>>>>> the future
> >>>>>>>>>>>>>>>>>>>>> a JSON plan might not fulfil the desired
> functionality.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> I also agree that we need some very obvious way (i.e.
> >>>> not
> >>>>>>> log
> >>>>>>>>>>>>>>>>>>>>> entry) to
> >>>>>>>>>>>>>>>>>>>>> show the users that their program doesn't support
> >>>> version
> >>>>>>>>>>>>>>>>>>>>> upgrades, and
> >>>>>>>>>>>>>>>>>>>>> prevent them from being negatively surprised in the
> >>>> future,
> >>>>>>>>>> when
> >>>>>>>>>>>>>>>>>>>>> trying to
> >>>>>>>>>>>>>>>>>>>>> upgrade their production pipelines.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> This is an implementation detail, but I'd like to add
> >>>> that
> >>>>>>>>>> there
> >>>>>>>>>>>>>>>>>>>>> should be
> >>>>>>>>>>>>>>>>>>>>> some good logging in place when the upgrade is taking
> >>>> place,
> >>>>>>>>>> to be
> >>>>>>>>>>>>>>>>>>>>> able to track every restoration action, and help
> debug
> >>>> any
> >>>>>>>>>>>>>>>>>>>>> potential
> >>>>>>>>>>>>>>>>>>>>> issues arising from that.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> On Fri, Nov 26, 2021 at 2:54 PM Till Rohrmann <trohrm...@apache.org> wrote:
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Thanks for writing this FLIP Timo. I think this will
> >>>> be a
> >>>>>>>>>> very
> >>>>>>>>>>>>>>>>>>>>>> important
> >>>>>>>>>>>>>>>>>>>>>> improvement for Flink and our SQL users :-)
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Similar to Francesco I would like to understand the
> >>>>>>> statement
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> For simplification of the design, we assume that
> >>>> upgrades
> >>>>>>>>>> use a
> >>>>>>>>>>>>>>>>>>>>>>> step
> >>>>>>>>>>>>>>>>>>>>>> size
> >>>>>>>>>>>>>>>>>>>>>> of a single
> >>>>>>>>>>>>>>>>>>>>>> minor version. We don't guarantee skipping minor
> >>>> versions
> >>>>>>>>>> (e.g.
> >>>>>>>>>>>>>>>>>>>>>> 1.11 to
> >>>>>>>>>>>>>>>>>>>>>> 1.14).
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> a bit better. Is it because Flink does not guarantee
> >>>> that a
> >>>>>>>>>>>>>>>>>>>>>> savepoint
> >>>>>>>>>>>>>>>>>>>>>> created by version 1.x can be directly recovered by
> >>>> version
> >>>>>>>>>> 1.y
> >>>>>>>>>>>>>>>>>>>>>> with x + 1
> >>>>>>>>>>>>>>>>>>>>>> < y but users might have to go through a cascade of
> >>>>>>> upgrades?
> >>>>>>>>>>>>>>>>>>>>>>       From how I
> >>>>>>>>>>>>>>>>>>>>>> understand your proposal, the compiled plan won't be
> >>>>>>> changed
> >>>>>>>>>>>>>>>>>>>>>> after being
> >>>>>>>>>>>>>>>>>>>>>> written initially. Hence, I would assume that for
> the
> >>>> plan
> >>>>>>>>>> alone
> >>>>>>>>>>>>>>>>>>>>>> Flink
> >>>>>>>>>>>>>>>>>>>>>> will
> >>>>>>>>>>>>>>>>>>>>>> have to give backwards compatibility guarantees for
> >> all
> >>>>>>>>>> versions.
> >>>>>>>>>>>>>>>>>>>>>> Am I
> >>>>>>>>>>>>>>>>>>>>>> understanding this part correctly?
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>>>>>>>>>> Till
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> On Thu, Nov 25, 2021 at 4:55 PM Francesco Guardiani <france...@ververica.com> wrote:
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Hi Timo,
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Thanks for putting this amazing work together, I
> have
> >>>> some
> >>>>>>>>>>>>>>>>>>>>>>> considerations/questions
> >>>>>>>>>>>>>>>>>>>>>>> about the FLIP:
> >>>>>>>>>>>>>>>>>>>>>>> *Proposed changes #6*: Other than defining this
> rule
> >>>> of
> >>>>>>>>>> thumb,
> >>>>>>>>>>>>>>>>>>>>>>> we must
> >>>>>>>>>>>>>>>>>>>>>>> also make sure
> >>>>>>>>>>>>>>>>>>>>>>> that compiling plans with these objects that cannot
> >> be
> >>>>>>>>>>>>>>>>>>>>>>> serialized in the
> >>>>>>>>>>>>>>>>>>>>>>> plan must fail hard,
> >>>>>>>>>>>>>>>>>>>>>>> so users don't bite themselves with such issues, or
> >> at
> >>>>>>>>>> least we
> >>>>>>>>>>>>>>>>>>>>>>> need to
> >>>>>>>>>>>>>>>>>>>>>>> output warning
> >>>>>>>>>>>>>>>>>>>>>>> logs. In general, whenever the user is trying to
> use
> >>>> the
> >>>>>>>>>>>>>>>>>>>>>>> CompiledPlan
> >>>>>>>>>>>>>>>>>>>>>> APIs
> >>>>>>>>>>>>>>>>>>>>>>> and at the same
> >>>>>>>>>>>>>>>>>>>>>>> time, they're trying to do something "illegal" for
> >> the
> >>>>>>>>>> plan, we
> >>>>>>>>>>>>>>>>>>>>>>> should
> >>>>>>>>>>>>>>>>>>>>>>> immediately either
> >>>>>>>>>>>>>>>>>>>>>>> log or fail depending on the issue, in order to
> avoid
> >>>> any
> >>>>>>>>>>>>>>>>>>>>>>> surprises once
> >>>>>>>>>>>>>>>>>>>>>>> the user upgrades.
> >>>>>>>>>>>>>>>>>>>>>>> I would also say the same for things like
> registering
> >>>> a
> >>>>>>>>>>>>>>>>>>>>>>> function,
> >>>>>>>>>>>>>>>>>>>>>>> registering a DataStream,
> >>>>>>>>>>>>>>>>>>>>>>> and for every other thing which won't end up in the
> >>>> plan,
> >>>>>>> we
> >>>>>>>>>>>>>>>>>>>>>>> should log
> >>>>>>>>>>>>>>>>>>>>>>> such info to the
> >>>>>>>>>>>>>>>>>>>>>>> user by default.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> *General JSON Plan Assumptions #9:* When thinking
> about
> >>>>>>>>>> connectors
> >>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>>>>>> formats, I think
> >>>>>>>>>>>>>>>>>>>>>>> it's reasonable to assume and keep out of the
> feature
> >>>>>>> design
> >>>>>>>>>>>>>>>>>>>>>>> that no
> >>>>>>>>>>>>>>>>>>>>>>> feature/ability can
> >>>>>>>>>>>>>>>>>>>>>>> be deleted from a connector/format. I also don't think
> >>>> new
> >>>>>>>>>>>>>>>>>>>>>> features/abilities
> >>>>>>>>>>>>>>>>>>>>>>> can influence
> >>>>>>>>>>>>>>>>>>>>>>> this FLIP as well, given the plan is static, so if
> >> for
> >>>>>>>>>> example,
> >>>>>>>>>>>>>>>>>>>>>>> MyCoolTableSink in the next
> >>>>>>>>>>>>>>>>>>>>>>> flink version implements
> SupportsProjectionsPushDown,
> >>>> then
> >>>>>>>>>> it
> >>>>>>>>>>>>>>>>>>>>>>> shouldn't
> >>>>>>>>>>>>>>>>>>>>>> be
> >>>>>>>>>>>>>>>>>>>>>>> a problem
> >>>>>>>>>>>>>>>>>>>>>>> for the upgrade story since the plan is still
> >>>> configured
> >>>>>>> as
> >>>>>>>>>>>>>>>>>>>>>>> computed
> >>>>>>>>>>>>>>>>>>>>>> from
> >>>>>>>>>>>>>>>>>>>>>>> the previous flink
> >>>>>>>>>>>>>>>>>>>>>>> version. What worries me is breaking changes, in
> >>>>>>> particular
> >>>>>>>>>>>>>>>>>>>>>>> behavioural
> >>>>>>>>>>>>>>>>>>>>>>> changes that
> >>>>>>>>>>>>>>>>>>>>>>> might happen in connectors/formats. Although this
> >>>> argument
> >>>>>>>>>>>>>>>>>>>>>>> doesn't seem
> >>>>>>>>>>>>>>>>>>>>>>> relevant for
> >>>>>>>>>>>>>>>>>>>>>>> the connectors shipped by the flink project itself,
> >>>>>>> because
> >>>>>>>>>> we
> >>>>>>>>>>>>>>>>>>>>>>> try to
> >>>>>>>>>>>>>>>>>>>>>> keep
> >>>>>>>>>>>>>>>>>>>>>>> them as stable as
> >>>>>>>>>>>>>>>>>>>>>>> possible and avoid eventual breaking changes, it's
> >>>>>>>>>> compelling to
> >>>>>>>>>>>>>>>>>>>>>> external
> >>>>>>>>>>>>>>>>>>>>>>> connectors and
> >>>>>>>>>>>>>>>>>>>>>>> formats, which might be decoupled from the flink
> >>>> release
> >>>>>>>>>> cycle
> >>>>>>>>>>>>>>>>>>>>>>> and might
> >>>>>>>>>>>>>>>>>>>>>>> have different
> >>>>>>>>>>>>>>>>>>>>>>> backward compatibility guarantees. It's totally
> >>>> reasonable
> >>>>>>>>>> if we
> >>>>>>>>>>>>>>>>>>>>>>> don't
> >>>>>>>>>>>>>>>>>>>>>>> want to tackle it in
> >>>>>>>>>>>>>>>>>>>>>>> this first iteration of the feature, but it's
> >>>> something we
> >>>>>>>>>> need
> >>>>>>>>>>>>>>>>>>>>>>> to keep
> >>>>>>>>>>>>>>>>>>>>>> in
> >>>>>>>>>>>>>>>>>>>>>>> mind for the future.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> *Functions:* It's not clear to me what you mean by
> >>>>>>>>>>>>>>>>>>>>>>> "identifier",
> >>>>>>>>>>>>>>>>>>>>>> because
> >>>>>>>>>>>>>>>>>>>>>>> then somewhere
> >>>>>>>>>>>>>>>>>>>>>>> else in the same context you talk about "name". Are
> >> we
> >>>>>>>>>> talking
> >>>>>>>>>>>>>>>>>>>>>>> about the
> >>>>>>>>>>>>>>>>>>>>>>> function name
> >>>>>>>>>>>>>>>>>>>>>>> or the function complete signature? Let's assume
> for
> >>>>>>>>>> example we
> >>>>>>>>>>>>>>>>>>>>>>> have
> >>>>>>>>>>>>>>>>>>>>>> these
> >>>>>>>>>>>>>>>>>>>>>>> function
> >>>>>>>>>>>>>>>>>>>>>>> definitions:
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> * TO_TIMESTAMP_LTZ(BIGINT)
> >>>>>>>>>>>>>>>>>>>>>>> * TO_TIMESTAMP_LTZ(STRING)
> >>>>>>>>>>>>>>>>>>>>>>> * TO_TIMESTAMP_LTZ(STRING, STRING)
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> These for me are very different functions with
> >>>> different
> >>>>>>>>>>>>>>>>>>>>>> implementations,
> >>>>>>>>>>>>>>>>>>>>>>> where each of
> >>>>>>>>>>>>>>>>>>>>>>> them might evolve separately at a different pace.
> >>>> Hence
> >>>>>>>>>> when we
> >>>>>>>>>>>>>>>>>>>>>>> store
> >>>>>>>>>>>>>>>>>>>>>> them
> >>>>>>>>>>>>>>>>>>>>>>> in the json
> >>>>>>>>>>>>>>>>>>>>>>> plan we should perhaps use a logically defined
> unique
> >>>> id
> >>>>>>>>>> like
> >>>>>>>>>>>>>>>>>>>>>>> /bigIntToTimestamp/, /
> >>>>>>>>>>>>>>>>>>>>>>> stringToTimestamp/ and
> /stringToTimestampWithFormat/.
> >>>> This
> >>>>>>>>>> also
> >>>>>>>>>>>>>>>>>>>>>>> solves
> >>>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>> issue of
> >>>>>>>>>>>>>>>>>>>>>>> correctly referencing the functions when restoring
> >> the
> >>>>>>> plan,
> >>>>>>>>>>>>>>>>>>>>>>> without
> >>>>>>>>>>>>>>>>>>>>>>> running again the
> >>>>>>>>>>>>>>>>>>>>>>> inference logic (which might have been changed in
> the
> >>>>>>>>>> meantime)
> >>>>>>>>>>>>>>>>>>>>>>> and it
> >>>>>>>>>>>>>>>>>>>>>>> might also solve
> >>>>>>>>>>>>>>>>>>>>>>> the versioning, that is the function identifier can
> >>>>>>> contain
> >>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>> function
> >>>>>>>>>>>>>>>>>>>>>>> version like /
> >>>>>>>>>>>>>>>>>>>>>>> stringToTimestampWithFormat_1_1 /or
> >>>>>>>>>>>>>>>>>>>>>>> /stringToTimestampWithFormat_1_2/.
> >>>>>>>>>>>>>>>>>>>>>> An
> >>>>>>>>>>>>>>>>>>>>>>> alternative could be to use the string signature
> >>>>>>>>>> representation,
> >>>>>>>>>>>>>>>>>>>>>>> which
> >>>>>>>>>>>>>>>>>>>>>>> might not be trivial
> >>>>>>>>>>>>>>>>>>>>>>> to compute, given the complexity of our type
> >> inference
> >>>>>>>>>> logic.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> *The term "JSON plan"*: I think we should rather
> keep
> >>>> JSON
> >>>>>>>>>> out
> >>>>>>>>>>>>>>>>>>>>>>> of the
> >>>>>>>>>>>>>>>>>>>>>>> concept and just
> >>>>>>>>>>>>>>>>>>>>>>> name it "Compiled Plan" (like the proposed API) or
> >>>>>>> something
> >>>>>>>>>>>>>>>>>>>>>>> similar,
> >>>>>>>>>>>>>>>>>>>>>> as I
> >>>>>>>>>>>>>>>>>>>>>>> see how in
> >>>>>>>>>>>>>>>>>>>>>>> future we might decide to support/modify our
> >>>> persistence
> >>>>>>>>>>>>>>>>>>>>>>> format to
> >>>>>>>>>>>>>>>>>>>>>>> something more
> >>>>>>>>>>>>>>>>>>>>>>> efficient storage-wise like BSON. For example, I
> >> would
> >>>>>>>>>> rename /
> >>>>>>>>>>>>>>>>>>>>>>> CompiledPlan.fromJsonFile/ to simply
> >>>>>>>>>> /CompiledPlan.fromFile/.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> *Who is the owner of the plan file?* I asked myself
> >>>> this
> >>>>>>>>>>>>>>>>>>>>>>> question when
> >>>>>>>>>>>>>>>>>>>>>>> reading this:
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> For simplification of the design, we assume that
> >>>> upgrades
> >>>>>>>>>> use a
> >>>>>>>>>>>>>>>>>>>>>>>> step
> >>>>>>>>>>>>>>>>>>>>>>> size of a single
> >>>>>>>>>>>>>>>>>>>>>>> minor version. We don't guarantee skipping minor
> >>>> versions
> >>>>>>>>>> (e.g.
> >>>>>>>>>>>>>>>>>>>>>>> 1.11 to
> >>>>>>>>>>>>>>>>>>>>>>> 1.14).
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> My understanding of this statement is that a user
> can
> >>>>>>>>>> upgrade
> >>>>>>>>>>>>>>>>>>>>>>> between
> >>>>>>>>>>>>>>>>>>>>>>> minors but then
> >>>>>>>>>>>>>>>>>>>>>>> following all the minors, the same query can remain
> >>>> up and
> >>>>>>>>>>>>>>> running.
> >>>>>>>>>>>>>>>>>>>>>> E.g. I
> >>>>>>>>>>>>>>>>>>>>>>> upgrade from
> >>>>>>>>>>>>>>>>>>>>>>> 1.15 to 1.16, and then from 1.16 to 1.17 and I
> still
> >>>>>>> expect
> >>>>>>>>>> my
> >>>>>>>>>>>>>>>>>>>>>>> original
> >>>>>>>>>>>>>>>>>>>>>>> query to work
> >>>>>>>>>>>>>>>>>>>>>>> without recomputing the plan. This necessarily
> means
> >>>> that
> >>>>>>> at
> >>>>>>>>>>>>>>>>>>>>>>> some point
> >>>>>>>>>>>>>>>>>>>>>> in
> >>>>>>>>>>>>>>>>>>>>>>> future
> >>>>>>>>>>>>>>>>>>>>>>> releases we'll need some basic "migration" tool to
> >>>> keep
> >>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>> queries up
> >>>>>>>>>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>>>>>> running,
> >>>>>>>>>>>>>>>>>>>>>>> ending up modifying the compiled plan. So I guess
> >>>> flink
> >>>>>>>>>> should
> >>>>>>>>>>>>>>>>>>>>>>> write it
> >>>>>>>>>>>>>>>>>>>>>>> back in the original
> >>>>>>>>>>>>>>>>>>>>>>> plan file, perhaps doing a backup of the previous
> >>>> one? Can
> >>>>>>>>>> you
> >>>>>>>>>>>>>>>>>>>>>>> please
> >>>>>>>>>>>>>>>>>>>>>>> clarify this aspect?
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Apart from these considerations, the proposal looks
> good
> >>>> to me
> >>>>>>>>>>>>>>>>>>>>>>> and I'm
> >>>>>>>>>>>>>>>>>>>>>> eagerly
> >>>>>>>>>>>>>>>>>>>>>>> waiting to see
> >>>>>>>>>>>>>>>>>>>>>>> it in play.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>>>>>>>> FG
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>>>>>>>>>> Francesco Guardiani | Software Engineer
> >>>>>>>>>>>>>>>>>>>>>>> france...@ververica.com[1]
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Follow us @VervericaData
> >>>>>>>>>>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>>>>>>>>>> Join Flink Forward[2] - The Apache Flink Conference
> >>>>>>>>>>>>>>>>>>>>>>> Stream Processing | Event Driven | Real Time
> >>>>>>>>>>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>>>>>>>>>> Ververica GmbH | Invalidenstrasse 115, 10115
> Berlin,
> >>>>>>> Germany
> >>>>>>>>>>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>>>>>>>>>> Ververica GmbH
> >>>>>>>>>>>>>>>>>>>>>>> Registered at Amtsgericht Charlottenburg: HRB
> 158244
> >> B
> >>>>>>>>>>>>>>>>>>>>>>> Managing Directors: Karl Anton Wehner, Holger
> Temme,
> >>>> Yip
> >>>>>>>>>> Park
> >>>>>>>>>>>>>>>>>>>>>>> Tung
> >>>>>>>>>>>>>>>>>>>>>> Jason,
> >>>>>>>>>>>>>>>>>>>>>>> Jinwei (Kevin)
> >>>>>>>>>>>>>>>>>>>>>>> Zhang
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> --------
> >>>>>>>>>>>>>>>>>>>>>>> [1] mailto:france...@ververica.com
> >>>>>>>>>>>>>>>>>>>>>>> [2] https://flink-forward.org/
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>>>>>>>> Marios
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>>
> >>>
> >>
> >>
> >
>
>
