Hi,
My application uses Flink SQL, and I want to add a new SQL query to it.
For example, the first version is:
    DataStream<AggregatedOrderItems> paymentCompleteStream =
        getKafkaStream(env, bootStrapServers, kafkaGroup, orderPaymentCompleteTopic)
            .flatMap(new PaymentComplete2AggregatedOrderItemFlatMap())
            .assignTimestampsAndWatermarks(wmAssigner2)
            .setParallelism(30)
            .returns(TypeInformation.of(AggregatedOrderItems.class));

    tableEnv.registerDataStream(
        "AggregatedOrderItems",
        paymentCompleteStream,
        concatFieldsName(AggregatedOrderItems.class, true, "eventTs"));
    tableEnv.registerFunction("group_concat", new GroupConcatFunction());

    Table resultTable = tableEnv.sqlQuery(sql1);
    tableEnv.toAppendStream(resultTable, Row.class, qConfig)
        .flatMap(new E5FlatmapFunction(resultSampleRate)).setParallelism(30)
        .filter(new FilterFunction<DetectionResult>() {
            @Override
            public boolean filter(DetectionResult value) throws Exception {
                return (value.getViolationCount() >= 5);
            }
        })
        .addSink(new DetectionResultMySqlSink());
In the second version, I add a new SQL query:
    Table resultTable2 = tableEnv.sqlQuery(sql2);
    tableEnv.toAppendStream(resultTable2, Row.class, qConfig)
        .flatMap(new A2FlatmapFunction(resultSampleRate)).setParallelism(30)
        .filter(new FilterFunction<DetectionResult>() {
            @Override
            public boolean filter(DetectionResult value) throws Exception {
                return (value.getViolationCount() >= 5);
            }
        })
        .addSink(new DetectionResultMySqlSink());
After restarting the job from a savepoint, can the original Flink SQL query
be restored successfully? Will Flink assign new UIDs to the operators of the
original SQL query? (I will not change the original SQL.)
Regards
James