As long as the inputs don't change, this should be correct.
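Till's point further down the thread (a generated uid depends on the operator and on its inputs) can be illustrated with a toy model. This is plain Java, not Flink's actual StreamGraphHasher, and the operator names are made up; it only shows why appending nodes leaves the ids of existing nodes unchanged:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

public class ChainedUids {
    // Toy stand-in for uid generation: each node's id is derived from
    // the node's own description plus the id of its input node.
    static List<Integer> chainedIds(List<String> operators) {
        List<Integer> ids = new ArrayList<>();
        int inputId = 0; // sources have no input
        for (String op : operators) {
            inputId = Objects.hash(op, inputId);
            ids.add(inputId);
        }
        return ids;
    }

    public static void main(String[] args) {
        List<Integer> v1 = chainedIds(List.of("kafka-source", "flatMap", "sql1-sink"));
        List<Integer> v2 = chainedIds(List.of("kafka-source", "flatMap", "sql1-sink", "sql2-sink"));
        // Appending "sql2-sink" does not disturb the ids of the
        // three existing nodes:
        System.out.println(v1.equals(v2.subList(0, 3))); // prints true
    }
}
```

Under this model, modifying or reordering an existing operator would change its id (and those of everything downstream), which is why only pure appends are safe.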

On Tue, Jun 26, 2018 at 2:35 PM Hequn Cheng <chenghe...@gmail.com> wrote:

> Hi
>
> I'm not sure about the answer. My feeling is that if we only append new
> code after the old code (i.e., without modifying the old code), the uids
> will not change.
>
> On Tue, Jun 26, 2018 at 3:06 PM, Till Rohrmann <trohrm...@apache.org>
> wrote:
>
>> I think so. Maybe Fabian or Timo can correct me if I'm wrong here.
>>
>> On Mon, Jun 25, 2018 at 9:17 AM James (Jian Wu) [FDS Data Platform] <
>> james...@coupang.com> wrote:
>>
>>> Hi Till:
>>>
>>>
>>>
>>> Thanks for your answer. So if I just add new SQL and do not modify the
>>> old SQL, then restarting the job with the `-n`/`--allowNonRestoredState`
>>> option can resume the old SQL state from the savepoint?
>>>
>>>
>>>
>>> Regards
>>>
>>>
>>>
>>> James
>>>
>>>
>>>
>>> *From: *Till Rohrmann <trohrm...@apache.org>
>>> *Date: *Friday, June 15, 2018 at 8:13 PM
>>> *To: *"James (Jian Wu) [FDS Data Platform]" <james...@coupang.com>
>>> *Cc: *user <user@flink.apache.org>, Fabian Hueske <fhue...@apache.org>,
>>> Timo Walther <twal...@apache.org>
>>> *Subject: *Re: Restore state from save point with add new flink sql
>>>
>>>
>>>
>>> Hi James,
>>>
>>>
>>>
>>> as long as you do not change anything for `sql1`, it should work to
>>> recover from a savepoint if you pass the `-n`/`--allowNonRestoredState`
>>> option to the CLI when resuming your program from the savepoint. The reason
>>> is that an operator's generated uid depends on the operator and on its
>>> inputs.
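>>>
>>> As a sketch, the resume command looks roughly like this (the savepoint
>>> path and jar name are placeholders, not taken from this thread):
>>>
>>>     flink run -s hdfs:///savepoints/savepoint-XXXX -n your-job.jar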
>>>
>>>
>>>
>>> I've also pulled in Fabian and Timo who will be able to tell you a
>>> little bit more about the job modification support for stream SQL.
>>>
>>>
>>>
>>> Cheers,
>>> Till
>>>
>>>
>>>
>>> On Fri, Jun 15, 2018 at 9:59 AM James (Jian Wu) [FDS Data Platform] <
>>> james...@coupang.com> wrote:
>>>
>>> *Hi:*
>>>
>>>
>>>
>>> *My application uses Flink SQL, and I want to add a new SQL query to the
>>> application.*
>>>
>>>
>>>
>>> *For example, the first version is:*
>>>
>>>
>>>
>>> DataStream<AggregatedOrderItems> paymentCompleteStream =
>>>     getKafkaStream(env, bootStrapServers, kafkaGroup, orderPaymentCompleteTopic)
>>>         .flatMap(new PaymentComplete2AggregatedOrderItemFlatMap())
>>>         .assignTimestampsAndWatermarks(wmAssigner2).setParallelism(30)
>>>         .returns(TypeInformation.of(AggregatedOrderItems.class));
>>>
>>> tableEnv.registerDataStream("AggregatedOrderItems", paymentCompleteStream,
>>>     concatFieldsName(AggregatedOrderItems.class, true, "eventTs"));
>>>
>>> tableEnv.registerFunction("group_concat", new GroupConcatFunction());
>>>
>>> Table resultTable = tableEnv.sqlQuery(sql1);
>>> tableEnv.toAppendStream(resultTable, Row.class, qConfig)
>>>     .flatMap(new E5FlatmapFunction(resultSampleRate)).setParallelism(30)
>>>     .filter(new FilterFunction<DetectionResult>() {
>>>         @Override
>>>         public boolean filter(DetectionResult value) throws Exception {
>>>             return (value.getViolationCount() >= 5);
>>>         }
>>>     })
>>>     .addSink(new DetectionResultMySqlSink());
>>>
>>>
>>>
>>> *Then in the second version, I add a new SQL query:*
>>>
>>>
>>>
>>> Table resultTable2 = tableEnv.sqlQuery(sql2);
>>> tableEnv.toAppendStream(resultTable2, Row.class, qConfig)
>>>     .flatMap(new A2FlatmapFunction(resultSampleRate)).setParallelism(30)
>>>     .filter(new FilterFunction<DetectionResult>() {
>>>         @Override
>>>         public boolean filter(DetectionResult value) throws Exception {
>>>             return (value.getViolationCount() >= 5);
>>>         }
>>>     })
>>>     .addSink(new DetectionResultMySqlSink());
>>>
>>>
>>>
>>> *After restarting the job from a savepoint, can the original Flink SQL
>>> state be restored successfully? Will Flink assign new UIDs to the original
>>> SQL operators? (I will not change the original SQL.)*
>>>
>>>
>>>
>>> *Regards*
>>>
>>>
>>>
>>> *James*
>>>
>>>
>>>
>>>
>
