As long as the inputs don't change, this should be correct.

On Tue, Jun 26, 2018 at 2:35 PM Hequn Cheng <chenghe...@gmail.com> wrote:
> Hi,
>
> I'm not sure about the answer. I have a feeling that if we only add new
> code below the old code (i.e., append new code after the old code), the
> uid will not be changed.
>
> On Tue, Jun 26, 2018 at 3:06 PM, Till Rohrmann <trohrm...@apache.org>
> wrote:
>
>> I think so. Maybe Fabian or Timo can correct me if I'm wrong here.
>>
>> On Mon, Jun 25, 2018 at 9:17 AM James (Jian Wu) [FDS Data Platform] <
>> james...@coupang.com> wrote:
>>
>>> Hi Till,
>>>
>>> Thanks for your answer. So if I just add new SQL and do not modify the
>>> old SQL, and then use the `--allowNonRestoredState` option to restart
>>> the job, can it resume the old SQL state from the savepoint?
>>>
>>> Regards,
>>> James
>>>
>>> From: Till Rohrmann <trohrm...@apache.org>
>>> Date: Friday, June 15, 2018 at 8:13 PM
>>> To: "James (Jian Wu) [FDS Data Platform]" <james...@coupang.com>
>>> Cc: user <user@flink.apache.org>, Fabian Hueske <fhue...@apache.org>,
>>> Timo Walther <twal...@apache.org>
>>> Subject: Re: Restore state from save point with add new flink sql
>>>
>>> Hi James,
>>>
>>> as long as you do not change anything for `sql1`, it should work to
>>> recover from a savepoint if you pass the `-n`/`--allowNonRestoredState`
>>> option to the CLI when resuming your program from the savepoint. The
>>> reason is that an operator's generated uid depends on the operator and
>>> on its inputs.
>>>
>>> I've also pulled in Fabian and Timo, who will be able to tell you a
>>> little bit more about the job modification support for stream SQL.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Fri, Jun 15, 2018 at 9:59 AM James (Jian Wu) [FDS Data Platform] <
>>> james...@coupang.com> wrote:
>>>
>>> Hi,
>>>
>>> My application uses Flink SQL, and I want to add new SQL to the
>>> application.
>>>
>>> For example, the first version is:
>>>
>>>     DataStream<AggregatedOrderItems> paymentCompleteStream =
>>>         getKafkaStream(env, bootStrapServers, kafkaGroup, orderPaymentCompleteTopic)
>>>             .flatMap(new PaymentComplete2AggregatedOrderItemFlatMap())
>>>             .assignTimestampsAndWatermarks(wmAssigner2)
>>>             .setParallelism(30)
>>>             .returns(TypeInformation.of(AggregatedOrderItems.class));
>>>
>>>     tableEnv.registerDataStream("AggregatedOrderItems", paymentCompleteStream,
>>>         concatFieldsName(AggregatedOrderItems.class, true, "eventTs"));
>>>
>>>     tableEnv.registerFunction("group_concat", new GroupConcatFunction());
>>>
>>>     Table resultTable = tableEnv.sqlQuery(sql1);
>>>     tableEnv.toAppendStream(resultTable, Row.class, qConfig)
>>>         .flatMap(new E5FlatmapFunction(resultSampleRate)).setParallelism(30)
>>>         .filter(new FilterFunction<DetectionResult>() {
>>>             @Override
>>>             public boolean filter(DetectionResult value) throws Exception {
>>>                 return (value.getViolationCount() >= 5);
>>>             }
>>>         }).addSink(new DetectionResultMySqlSink());
>>>
>>> Then, in the second version, I add new SQL:
>>>
>>>     Table resultTable2 = tableEnv.sqlQuery(sql2);
>>>     tableEnv.toAppendStream(resultTable2, Row.class, qConfig)
>>>         .flatMap(new A2FlatmapFunction(resultSampleRate)).setParallelism(30)
>>>         .filter(new FilterFunction<DetectionResult>() {
>>>             @Override
>>>             public boolean filter(DetectionResult value) throws Exception {
>>>                 return (value.getViolationCount() >= 5);
>>>             }
>>>         }).addSink(new DetectionResultMySqlSink());
>>>
>>> After restarting the job from a savepoint, can the original Flink SQL
>>> be restored successfully? Will Flink assign a new UID to the original
>>> SQL operators? (I will not change the original SQL.)
>>>
>>> Regards,
>>> James
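
To illustrate the point made in the thread that a generated uid depends on the operator and on its inputs, here is a toy sketch in plain Java. It is not Flink's actual hashing code: the `Node` class, the `idOf` helper, the node descriptions, and the choice of SHA-256 are all assumptions made for illustration. It only shows why adding a second query next to `sql1` would leave the ID of `sql1` unchanged under such a scheme, while changing an input of `sql1` would not.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.List;

/** Toy model of deterministic operator IDs. Hypothetical; not Flink's implementation. */
final class OperatorIdSketch {

    /** A node in a toy job graph: its own description plus its upstream inputs. */
    static final class Node {
        final String description;
        final List<Node> inputs;

        Node(String description, Node... inputs) {
            this.description = description;
            this.inputs = Arrays.asList(inputs);
        }
    }

    /** Hashes the node's description together with the IDs of all of its inputs. */
    static String idOf(Node node) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            digest.update(node.description.getBytes(StandardCharsets.UTF_8));
            for (Node input : node.inputs) {
                digest.update(idOf(input).getBytes(StandardCharsets.UTF_8));
            }
            StringBuilder hex = new StringBuilder();
            for (byte b : digest.digest()) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // SHA-256 is required to be available on every Java platform.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Version 1: a single query reading from the source.
        Node sourceV1 = new Node("kafka-source");
        String sql1Before = idOf(new Node("sql1", sourceV1));

        // Version 2: a second query is added on the same source; sql1 is untouched.
        Node sourceV2 = new Node("kafka-source");
        Node sql2 = new Node("sql2", sourceV2);
        String sql1After = idOf(new Node("sql1", sourceV2));

        // Counter-example: the input of sql1 itself is modified.
        String sql1Changed = idOf(new Node("sql1", new Node("kafka-source-modified")));

        System.out.println("sql1 id unchanged after adding sql2: " + sql1Before.equals(sql1After));
        System.out.println("sql1 id unchanged after changing its input: " + sql1Before.equals(sql1Changed));
        System.out.println("sql2 id: " + idOf(sql2));
    }
}
```

In this sketch the first comparison prints `true` and the second prints `false`. The operators of the new query have no entry in the old savepoint, and the `-n`/`--allowNonRestoredState` option discussed above concerns the opposite case, where the savepoint holds state that no operator in the new job claims.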