Hi, I'm not sure about the answer. I have a feeling that if we only add new code below the old code (i.e., append the new code after the old code), the uids of the existing operators will not change.
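
To make that intuition concrete, here is a toy model in plain Java. It is not Flink's actual hashing code: `UidSketch`, `Op`, and `assignUids` are names made up for illustration, and it assumes a generated uid is just a hash of the operator's own description plus the uids of its inputs, which is how Till describes it in the quoted thread. The real hash may depend on more than that (for example on how operators are chained or ordered in the graph), so treat this only as a sketch of the reasoning.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class UidSketch {

    /** Hypothetical stand-in for an operator: its name plus the names of its inputs. */
    static final class Op {
        final String name;
        final List<String> inputs;

        Op(String name, List<String> inputs) {
            this.name = name;
            this.inputs = inputs;
        }
    }

    /**
     * Assigns each operator a uid derived from its own name and the uids of its inputs.
     * Operators must be listed inputs-first (topological order).
     * ASSUMPTION: this is a simplified model, not Flink's real algorithm.
     */
    static Map<String, String> assignUids(List<Op> ops) {
        Map<String, String> uids = new LinkedHashMap<>();
        for (Op op : ops) {
            StringBuilder key = new StringBuilder(op.name);
            for (String input : op.inputs) {
                key.append('|').append(uids.get(input));
            }
            uids.put(op.name, Integer.toHexString(key.toString().hashCode()));
        }
        return uids;
    }

    public static void main(String[] args) {
        // Version 1 of the job: source -> sql1 -> sink1
        List<Op> v1 = new ArrayList<>();
        v1.add(new Op("source", List.of()));
        v1.add(new Op("sql1", List.of("source")));
        v1.add(new Op("sink1", List.of("sql1")));

        // Version 2 appends a second branch: source -> sql2 -> sink2
        List<Op> v2 = new ArrayList<>(v1);
        v2.add(new Op("sql2", List.of("source")));
        v2.add(new Op("sink2", List.of("sql2")));

        Map<String, String> before = assignUids(v1);
        Map<String, String> after = assignUids(v2);

        // Compare the uid of every original operator across both versions.
        for (String name : before.keySet()) {
            System.out.println(name + " unchanged: " + before.get(name).equals(after.get(name)));
        }
    }
}
```

In this model the three original operators keep their uids after the second branch is appended, because nothing they depend on has changed. Whether the operators that Flink generates for a SQL query behave the same way is exactly the open question in this thread.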

On Tue, Jun 26, 2018 at 3:06 PM, Till Rohrmann <trohrm...@apache.org> wrote:

> I think so. Maybe Fabian or Timo can correct me if I'm wrong here.
>
> On Mon, Jun 25, 2018 at 9:17 AM James (Jian Wu) [FDS Data Platform] <
> james...@coupang.com> wrote:
>
>> Hi Till:
>>
>> Thanks for your answer. So if I just add new SQL without modifying the
>> old SQL, and then restart the job with the `-n`/`--allowNonRestoredState`
>> option, can the old SQL state be resumed from the savepoint?
>>
>> Regards
>>
>> James
>>
>> From: Till Rohrmann <trohrm...@apache.org>
>> Date: Friday, June 15, 2018 at 8:13 PM
>> To: "James (Jian Wu) [FDS Data Platform]" <james...@coupang.com>
>> Cc: user <user@flink.apache.org>, Fabian Hueske <fhue...@apache.org>,
>> Timo Walther <twal...@apache.org>
>> Subject: Re: Restore state from save point with add new flink sql
>>
>> Hi James,
>>
>> as long as you do not change anything for `sql1`, it should work to
>> recover from a savepoint if you pass the `-n`/`--allowNonRestoredState`
>> option to the CLI when resuming your program from the savepoint. The
>> reason is that an operator's generated uid depends on the operator and
>> on its inputs.
>>
>> I've also pulled in Fabian and Timo who will be able to tell you a little
>> bit more about the job modification support for stream SQL.
>>
>> Cheers,
>> Till
>>
>> On Fri, Jun 15, 2018 at 9:59 AM James (Jian Wu) [FDS Data Platform] <
>> james...@coupang.com> wrote:
>>
>> Hi:
>>
>> My application uses Flink SQL, and I want to add new SQL to the
>> application.
>>
>> For example, the first version is:
>>
>> DataStream<AggregatedOrderItems> paymentCompleteStream =
>>     getKafkaStream(env, bootStrapServers, kafkaGroup, orderPaymentCompleteTopic)
>>         .flatMap(new PaymentComplete2AggregatedOrderItemFlatMap())
>>         .assignTimestampsAndWatermarks(wmAssigner2).setParallelism(30)
>>         .returns(TypeInformation.of(AggregatedOrderItems.class));
>>
>> tableEnv.registerDataStream("AggregatedOrderItems",
>>     paymentCompleteStream,
>>     concatFieldsName(AggregatedOrderItems.class, true, "eventTs"));
>>
>> tableEnv.registerFunction("group_concat", new GroupConcatFunction());
>>
>> Table resultTable = tableEnv.sqlQuery(sql1);
>> tableEnv.toAppendStream(resultTable, Row.class, qConfig)
>>     .flatMap(new E5FlatmapFunction(resultSampleRate)).setParallelism(30)
>>     .filter(new FilterFunction<DetectionResult>() {
>>         @Override
>>         public boolean filter(DetectionResult value) throws Exception {
>>             return (value.getViolationCount() >= 5);
>>         }
>>     }).addSink(new DetectionResultMySqlSink());
>>
>> Then, in the second version, I add new SQL:
>>
>> Table resultTable2 = tableEnv.sqlQuery(sql2);
>> tableEnv.toAppendStream(resultTable2, Row.class, qConfig)
>>     .flatMap(new A2FlatmapFunction(resultSampleRate)).setParallelism(30)
>>     .filter(new FilterFunction<DetectionResult>() {
>>         @Override
>>         public boolean filter(DetectionResult value) throws Exception {
>>             return (value.getViolationCount() >= 5);
>>         }
>>     }).addSink(new DetectionResultMySqlSink());
>>
>> After restarting the job from the savepoint, can the original Flink SQL
>> be restored successfully? Will Flink assign a new UID to the original
>> SQL operators? (I will not change the original SQL.)
>>
>> Regards
>>
>> James