Hi Till,

Yes, you might have to. However, this would not be a new plan compiled from the SQL query but a migration from the old plan to the new plan. This will not happen often, but we need a way to evolve the format of the JSON plan itself.
This may be a bit confusing, so let me clarify again: mostly ExecNode versions and operator state layouts will evolve. The plan files themselves will be pretty stable, but not indefinitely.

Regards,
Timo

On 02.12.21 16:01, Till Rohrmann wrote:

Then for migrating from Flink 1.10 to 1.12, I might have to create a new plan using Flink 1.11 in order to migrate from Flink 1.11 to 1.12, right?

Cheers,
Till

On Thu, Dec 2, 2021 at 3:39 PM Timo Walther <twal...@apache.org> wrote:

Response to Till's feedback:

> compiled plan won't be changed after being written initially

This is not entirely correct. We give guarantees for keeping the query up and running. We reserve the right to force plan migrations. In this case, the plan might not be created from the SQL statement but from the old plan. I have added an example in section 10.1.1. In general, both persisted entities "plan" and "savepoint" can evolve independently from each other.

Thanks,
Timo

On 02.12.21 15:10, Timo Walther wrote:

Response to Godfrey's feedback:

> "EXPLAIN PLAN EXECUTE STATEMENT SET BEGIN ... END" is missing.

Thanks for the hint. I added a dedicated section 7.1.3.

> it's hard to maintain the supported versions for "supportedPlanChanges" and "supportedSavepointChanges"

Actually, I think we are mostly on the same page. The annotation does not need to be updated for every Flink version. As the name suggests, it is about "Changes" (in other words: incompatibilities) that require some kind of migration: either plan migration (= PlanChanges) or savepoint migration (= SavepointChanges, using operator migration or savepoint migration).

Let's assume we introduced two ExecNodes A and B in Flink 1.15. The annotations are:

@ExecNodeMetadata(name=A, supportedPlanChanges=1.15, supportedSavepointChanges=1.15)
@ExecNodeMetadata(name=B, supportedPlanChanges=1.15, supportedSavepointChanges=1.15)

We change an operator state of B in Flink 1.16. We perform the change in the operator of B in a way that supports both state layouts. Thus, there is no need for a new ExecNode version.
The annotations in 1.16 are:

@ExecNodeMetadata(name=A, supportedPlanChanges=1.15, supportedSavepointChanges=1.15)
@ExecNodeMetadata(name=B, supportedPlanChanges=1.15, supportedSavepointChanges=1.15, 1.16)

So the versions in the annotations are "start versions". I don't think we need end versions? An end version would mean that we drop the ExecNode from the code base?

Please check section 10.1.1 again. I added a more complex example.

Thanks,
Timo

On 01.12.21 16:29, Timo Walther wrote:

Response to Francesco's feedback:

> *Proposed changes #6*: Other than defining this rule of thumb, we must also make sure that compiling plans with these objects that cannot be serialized in the plan must fail hard

Yes, I totally agree. We will fail hard with a helpful exception. Any mistake, e.g. using an inline object in Table API or an invalid DataStream API source without uid, should immediately fail the plan compilation step. I added a remark to the FLIP again.

> What worries me is breaking changes, in particular behavioural changes that might happen in connectors/formats

Breaking changes in connectors and formats need to be encoded in the options. If necessary, I could also imagine versioning in the factory identifier, e.g. `connector=kafka` and `connector=kafka-2`. After thinking about your question again, I think we will also need the same testing infrastructure for our connectors and formats, especially restore tests and completeness tests. I updated the document accordingly. I also added a way to generate UIDs for DataStream API providers.

> *Functions:* Are we talking about the function name or the function complete signature?

For catalog functions, the identifier contains catalog name and database name. For system functions, the identifier contains only a name, which makes function name and identifier identical. I reworked the section again and also fixed some of the naming conflicts you mentioned.
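
As a rough illustration of the two identifier kinds described above, here is a minimal sketch. It is not Flink code: the class `FunctionIdSketch` and both method names are hypothetical, the dotted `catalog.database.name` rendering is an assumption made only for readability, and the `$NAME$VERSION` layout is taken from the `$TO_TIMESTAMP_LTZ$1` representation mentioned in this thread.

```java
// Hypothetical illustration only; not part of Flink or the FLIP.
class FunctionIdSketch {

    // System functions: the identifier is only a name; the persisted unique id
    // is name + version, following the `$TO_TIMESTAMP_LTZ$1` layout from the thread.
    static String systemFunctionId(String name, int version) {
        return "$" + name + "$" + version;
    }

    // Catalog functions: the identifier also carries catalog and database name.
    // The dotted form is an assumption of this sketch.
    static String catalogFunctionId(String catalog, String database, String name) {
        return catalog + "." + database + "." + name;
    }

    public static void main(String[] args) {
        System.out.println(systemFunctionId("TO_TIMESTAMP_LTZ", 1));
        System.out.println(catalogFunctionId("my_catalog", "my_db", "my_udf"));
    }
}
```

Keeping the version in the persisted id means a restored plan can refer to one specific function definition without re-running type inference, which is the concern raised in Francesco's mail.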

> we should perhaps use a logically defined unique id like /bigIntToTimestamp/

I added a concrete example for the resolution and restoration. The unique id is composed of name + version. Internally, this is represented as `$TO_TIMESTAMP_LTZ$1`.

> I think we should rather keep JSON out of the concept

Sounds ok to me. In SQL we also just call it "plan". I will change the file sections. But I would suggest keeping the fromJsonString method.

> write it back in the original plan file

I updated the terminology section for what we consider an "upgrade". We might need to update the original plan file. This is already considered in the COMPILE PLAN ... FROM ... even though this is future work. The same applies to savepoint migration.

Thanks for all the feedback!

Timo

On 30.11.21 14:28, Timo Walther wrote:

Response to Wenlong's feedback:

> I would prefer not to provide such a shortcut, let users use COMPILE PLAN IF NOT EXISTS and EXECUTE explicitly, which can be understood by new users even without inferring the docs.

I would like to hear more opinions on this topic. Personally, I find a combined statement very useful, not only for quicker development and debugging but also for readability. It helps in keeping the JSON path and the query close to each other in order to know the origin of the plan.

> but the plan and SQL are not matched. The result would be quite confusing if we still execute the plan directly, we may need to add a validation.

You are right that there could be a mismatch. But we have a similar problem when executing CREATE TABLE IF NOT EXISTS. The schema or options of a table could have changed completely in the catalog, but the CREATE TABLE IF NOT EXISTS is not executed again. So a mismatch could also occur there.

Regards,
Timo

On 30.11.21 14:17, Timo Walther wrote:

Hi everyone,

thanks for the feedback so far. Let me answer each email individually.

I will start with a response to Ingo's feedback:

> Will the JSON plan's schema be considered an API?

No, not in the first version.
This is explicitly mentioned in the `General JSON Plan Assumptions`. I tried to improve the section once more to make it clearer. However, the JSON plan is definitely stable per minor version. And since the plan is versioned by Flink version, external tooling could be built around it. We might make it public API once the design has settled.

> Given that upgrades across multiple versions at once are unsupported, do we verify this somehow?

Good question. I extended the `General JSON Plan Assumptions`. Now yes: the Flink version is part of the JSON plan and will be verified during restore. But keep in mind that we might support more than just the last version, at least until the JSON plan has been migrated.

Regards,
Timo

On 30.11.21 09:39, Marios Trivyzas wrote:

I have a question regarding the `COMPILE PLAN OVERWRITE`. If we choose to go with the config option instead, that doesn't provide the flexibility to overwrite certain plans but not others, since the config applies globally. Isn't that something to consider?

On Mon, Nov 29, 2021 at 10:15 AM Marios Trivyzas <mat...@gmail.com> wrote:

Hi Timo!

Thanks a lot for taking all that time and effort to put together this proposal!

Regarding:

> For simplification of the design, we assume that upgrades use a step size of a single minor version. We don't guarantee skipping minor versions (e.g. 1.11 to 1.14).

I think that for this first step we should make it absolutely clear to the users that they would need to go through all intermediate versions to end up with the target version they wish. If we are to support skipping versions in the future, i.e. upgrade from 1.14 to 1.17, this means that we need to have a testing infrastructure in place that would test all possible combinations of version upgrades, i.e. from 1.14 to 1.15, from 1.14 to 1.16 and so forth, while still testing and of course supporting all the upgrades from the previous minor version.

I like a lot the idea of introducing HINTS to define some behaviour in the programs!
- the hints live together with the SQL statements and consequently the (JSON) plans.
- If multiple queries are involved in a program, each one of them can define its own config (regarding plan optimisation, not null enforcement, etc.)

I agree with Francesco on his argument regarding the *JSON* plan. I believe we should already provide flexibility here, since (who knows) in the future a JSON plan might not fulfil the desired functionality.

I also agree that we need some very obvious way (i.e. not a log entry) to show the users that their program doesn't support version upgrades, and prevent them from being negatively surprised in the future, when trying to upgrade their production pipelines.

This is an implementation detail, but I'd like to add that there should be some good logging in place when the upgrade is taking place, to be able to track every restoration action, and help debug any potential issues arising from that.

On Fri, Nov 26, 2021 at 2:54 PM Till Rohrmann <trohrm...@apache.org> wrote:

Thanks for writing this FLIP Timo. I think this will be a very important improvement for Flink and our SQL users :-)

Similar to Francesco I would like to understand the statement

> For simplification of the design, we assume that upgrades use a step size of a single minor version. We don't guarantee skipping minor versions (e.g. 1.11 to 1.14).

a bit better. Is it because Flink does not guarantee that a savepoint created by version 1.x can be directly recovered by version 1.y with x + 1 < y but users might have to go through a cascade of upgrades? From how I understand your proposal, the compiled plan won't be changed after being written initially. Hence, I would assume that for the plan alone Flink will have to give backwards compatibility guarantees for all versions. Am I understanding this part correctly?
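
To make the "cascade of upgrades" concrete, here is a minimal sketch that enumerates the single-minor-version hops between two 1.x releases. It is not Flink code: the class `UpgradePathSketch` and the method `steps` are hypothetical names introduced for illustration, and it assumes that every hop corresponds to one restore of plan and savepoint on the next minor version.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; not part of Flink or the FLIP.
class UpgradePathSketch {

    // Lists every single-minor-version hop between two 1.x versions.
    // Assumption: skipping a minor version is not supported, so a
    // multi-version upgrade is a chain of consecutive hops.
    static List<String> steps(int fromMinor, int toMinor) {
        if (fromMinor > toMinor) {
            throw new IllegalArgumentException("Downgrades are not covered by this sketch");
        }
        List<String> hops = new ArrayList<>();
        for (int minor = fromMinor; minor < toMinor; minor++) {
            hops.add("1." + minor + " -> 1." + (minor + 1));
        }
        return hops;
    }

    public static void main(String[] args) {
        // The example from the thread: 1.11 to 1.14 is not a single step.
        steps(11, 14).forEach(System.out::println);
    }
}
```

For the 1.11 to 1.14 example quoted above, this yields three hops (1.11 to 1.12, 1.12 to 1.13, 1.13 to 1.14), which is also the number of restore tests per release that a "previous minor version only" guarantee implies.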

Cheers,
Till

On Thu, Nov 25, 2021 at 4:55 PM Francesco Guardiani <france...@ververica.com> wrote:

Hi Timo,

Thanks for putting this amazing work together. I have some considerations/questions about the FLIP:

*Proposed changes #6*: Other than defining this rule of thumb, we must also make sure that compiling plans with these objects that cannot be serialized in the plan must fail hard, so users don't bite themselves with such issues, or at least we need to output warning logs. In general, whenever the user is trying to use the CompiledPlan APIs and at the same time is trying to do something "illegal" for the plan, we should immediately either log or fail depending on the issue, in order to avoid any surprises once the user upgrades. I would also say the same for things like registering a function, registering a DataStream, and for every other thing which won't end up in the plan: we should log such info to the user by default.

*General JSON Plan Assumptions #9:* When thinking about connectors and formats, I think it's reasonable to assume and keep out of the feature design that no feature/ability can be deleted from a connector/format. I also don't think new features/abilities can influence this FLIP, given the plan is static. So if, for example, MyCoolTableSink in the next Flink version implements SupportsProjectionsPushDown, then it shouldn't be a problem for the upgrade story, since the plan is still configured as computed from the previous Flink version. What worries me is breaking changes, in particular behavioural changes that might happen in connectors/formats. Although this argument doesn't seem relevant for the connectors shipped by the Flink project itself, because we try to keep them as stable as possible and avoid eventual breaking changes, it's compelling for external connectors and formats, which might be decoupled from the Flink release cycle and might have different backward compatibility guarantees.
It's totally reasonable if we don't want to tackle it in this first iteration of the feature, but it's something we need to keep in mind for the future.

*Functions:* It's not clear to me what you mean by "identifier", because somewhere else in the same context you talk about "name". Are we talking about the function name or the function complete signature? Let's assume for example we have these function definitions:

* TO_TIMESTAMP_LTZ(BIGINT)
* TO_TIMESTAMP_LTZ(STRING)
* TO_TIMESTAMP_LTZ(STRING, STRING)

These for me are very different functions with different implementations, where each of them might evolve separately at a different pace. Hence when we store them in the JSON plan we should perhaps use a logically defined unique id like /bigIntToTimestamp/, /stringToTimestamp/ and /stringToTimestampWithFormat/. This also solves the issue of correctly referencing the functions when restoring the plan, without running again the inference logic (which might have been changed in the meantime), and it might also solve the versioning, that is, the function identifier can contain the function version like /stringToTimestampWithFormat_1_1/ or /stringToTimestampWithFormat_1_2/. An alternative could be to use the string signature representation, which might not be trivial to compute, given the complexity of our type inference logic.

*The term "JSON plan"*: I think we should rather keep JSON out of the concept and just name it "Compiled Plan" (like the proposed API) or something similar, as I see how in future we might decide to support/modify our persistence format to something more efficient storage-wise like BSON. For example, I would rename /CompiledPlan.fromJsonFile/ to simply /CompiledPlan.fromFile/.

*Who is the owner of the plan file?* I asked myself this question when reading this:

> For simplification of the design, we assume that upgrades use a step size of a single minor version. We don't guarantee skipping minor versions (e.g. 1.11 to 1.14).

My understanding of this statement is that a user can upgrade between minors, and as long as they follow all the minors, the same query can remain up and running. E.g. I upgrade from 1.15 to 1.16, and then from 1.16 to 1.17, and I still expect my original query to work without recomputing the plan. This necessarily means that at some point in future releases we'll need some basic "migration" tool to keep the queries up and running, ending up modifying the compiled plan. So I guess Flink should write it back in the original plan file, perhaps doing a backup of the previous one? Can you please clarify this aspect?

Except for these considerations, the proposal looks good to me and I'm eagerly waiting to see it in play.

Thanks,
FG

--
Francesco Guardiani | Software Engineer
france...@ververica.com[1]
Follow us @VervericaData
--
Join Flink Forward[2] - The Apache Flink Conference
Stream Processing | Event Driven | Real Time
--
Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Karl Anton Wehner, Holger Temme, Yip Park Tung Jason, Jinwei (Kevin) Zhang
--------
[1] mailto:france...@ververica.com
[2] https://flink-forward.org/

--
Marios