Hi
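    上游在 snapshotState 中发出的数据,和下游算子收到 notifyCheckpointComplete
之间是没有必然联系的,所以从理论上讲,两者谁先到达下游是不保证先后顺序的。原因是:对于非 source 算子,Flink
会先调用 prepareSnapshotPreBarrier,再向下游广播 barrier,之后才执行 snapshotState,因此在 snapshotState 中
collect 的数据实际排在 barrier 之后;而 notifyCheckpointComplete 是 JobManager 在收到所有 task 的确认之后,
单独通知给各个 task 的,并不经过数据通道,所以它可能早于这些数据到达下游。如果需要让这批数据在 barrier
之前发出,可以像 FStart 那样实现自定义算子,在 prepareSnapshotPreBarrier 中发送数据。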
Best,
Congxian


key lou <[email protected]> 于2020年8月16日周日 下午9:27写道:

> 各位大佬:
>    在如下代码中:FCombine 在 snapshotState 中 collect 发送数据之后,如果不执行 sleep,则 FSubmit
> 在执行 notifyCheckpointComplete 方法时,list 集合 ls 为空;
> 如果 FCombine 在 snapshotState 中 collect 发送数据之后执行 sleep,
> 则 FSubmit 在执行 notifyCheckpointComplete 方法时就可以收到 snapshotState 中 collect 发送的数据。
> 我之前的理解是:每个算子在执行完 checkpoint 之后,才会把 barrier 广播到下游算子。所以我觉得下游无论如何都应该在执行
> notifyCheckpointComplete 之前收到上游在 snapshotState 中 collect 发送的数据(因为 snapshotState 中的
> collect 在前,广播 barrier 在后,而下游在收到 barrier 之后才会执行 checkpoint
> 的相关方法,所以在执行这些方法之前,上游 snapshotState 中 collect 发出的数据就应该已经到达了下游)。
> 但是根据如下代码的测试来看,不是这样的。请大佬帮忙解答下原因。
>
> public class FlinkCheckpointTest {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment steamEnv =
> StreamExecutionEnvironment.getExecutionEnvironment();
>         steamEnv.enableCheckpointing(1000L*2);
>         steamEnv
>             .addSource(new FSource()).setParallelism(4)
>             .transform("开始事务", Types.STRING,new FStart()).setParallelism(1)
>             .process(new FCombine()).name("事务预处理").setParallelism(4)
>             .addSink(new FSubmit()).name("提交事务").setParallelism(1)
>         ;
>         steamEnv.execute("test");
>     }
>
>    static class FSource extends RichParallelSourceFunction<String>{
>         @Override
>         public void run(SourceContext<String> sourceContext) throws
> Exception {
>             int I =0;
>             while (true){
>                 I = I + 1;
>                 sourceContext.collect("thread " +
> Thread.currentThread().getId() +"-" +I);
>                 Thread.sleep(1000);
>             }
>         }
>         @Override
>         public void cancel() {}
>     }
>
>     static class FStart extends AbstractStreamOperator<String>
> implements OneInputStreamOperator<String,String>{
>        volatile Long ckid = 0L;
>         @Override
>         public void processElement(StreamRecord<String> streamRecord)
> throws Exception {
>             log("收到数据: " + streamRecord.getValue() + "..ckid:" + ckid);
>             output.collect(streamRecord);
>         }
>         @Override
>         public void prepareSnapshotPreBarrier(long checkpointId)
> throws Exception {
>             log("开启事务: " + checkpointId);
>             ckid = checkpointId;
>             super.prepareSnapshotPreBarrier(checkpointId);
>         }
>     }
>
>     static class FCombine extends ProcessFunction<String,String>
> implements CheckpointedFunction {
>         List ls = new ArrayList<String>();
>         Collector<String> collector =null;
>         volatile Long ckid = 0L;
>
>         @Override
>         public void snapshotState(FunctionSnapshotContext
> functionSnapshotContext) throws Exception {
>             StringBuffer sb = new StringBuffer();
>             ls.forEach(x->{sb.append(x).append(";");});
>             log("批处理 " + functionSnapshotContext.getCheckpointId() +
> ": 时收到数据:" + sb.toString());
>             Thread.sleep(5*1000);
>             collector.collect(sb.toString());
>             ls.clear();
>             Thread.sleep(5*1000);
>             //Thread.sleep(20*1000);
>         }
>         @Override
>         public void initializeState(FunctionInitializationContext
> functionInitializationContext) throws Exception {        }
>         @Override
>         public void processElement(String s, Context context,
> Collector<String> out) throws Exception {
>             if(StringUtils.isNotBlank(s)){
>                 ls.add(s);
>             }
>             log("收到数据 :" + s + "; 这批数据大小为:" + ls.size() + "..ckid:" +
> ckid);
>             if(collector ==null){
>                 collector = out;
>             }
>         }
>     }
>
>     static class FSubmit extends RichSinkFunction<String> implements
> /*  CheckpointedFunction,*/ CheckpointListener {
>         List ls = new ArrayList<String>();
>         volatile Long ckid = 0L;
>         @Override
>         public void notifyCheckpointComplete(long l) throws Exception {
>             ckid = l;
>             StringBuffer sb = new StringBuffer();
>             ls.forEach(x->{sb.append(x).append("||");});
>             log("submit checkpoint " + l + " over data:list size" +
> ls.size()+ "; detail" + sb.toString());
>             ls.clear();
>         }
>         @Override
>         public void invoke(String value, Context context) throws Exception
> {
>             if(StringUtils.isNotBlank(value)){
>                 ls.add(value);
>             }
>             log("收到数据 :" + value + " list zie:" + ls.size() + "..ckid:" +
> ckid);
>         }
>     }
>     public static void log(String s){
>         String name = Thread.currentThread().getName();
>         System.out.println(new SimpleDateFormat("HH:mm:ss").format(new
> Date())+":"+name + ":" + s);
>     }
> }
>

回复