Hi
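上游在 snapshotState 中发出的数据和下游的 notifyCheckpointComplete 之间没有必然的先后关系,所以理论上不保证先后顺序。
原因是:算子会先把 barrier 广播到下游,再调用 snapshotState(只有 prepareSnapshotPreBarrier 是在发送 barrier 之前调用的),
所以在 snapshotState 里 collect 的数据会排在 barrier 之后;而 notifyCheckpointComplete 是 JobManager 在所有 task
都确认该 checkpoint 之后另行异步通知的,并不随数据流传递。如果需要保证数据在 barrier 之前到达下游,应在 prepareSnapshotPreBarrier 中发送。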
Best,
Congxian
key lou <[email protected]> 于2020年8月16日周日 下午9:27写道:
> 各位大佬:
> 在如下代码中:FCombine 在 snapshot 中通过 collect 发送数据之后,如果不执行 sleep,则 FSubmit
> 在执行 notifyCheckpointComplete 方法时,list 集合 ls 为空;
> 如果 FCombine 在 snapshot 中通过 collect 发送数据之后执行 sleep,
> 则 FSubmit 在执行 notifyCheckpointComplete 方法时就可以收到 snapshot 中 collect 发送的数据。
> 我之前的理解是:每个算子在执行完 checkpoint 之后才会把 barrier 广播到下游算子,所以觉得下游无论如何都应该在执行
> notifyCheckpointComplete 之前就收到上游在 snapshot 中 collect 发送的数据(因为 snapshot 中的
> collect 在前、广播 barrier 在后,而下游在收到 barrier 之后才会执行 checkpoint
> 的相关方法,所以在执行这些方法之前,上游 snapshot 中 collect 发出的数据就应该已经到达下游)。
> 但是根据如下代码的测试来看,不是这样的。请大佬帮忙解答下原因。
>
> public class FlinkCheckpointTest {
> public static void main(String[] args) throws Exception {
> StreamExecutionEnvironment steamEnv =
> StreamExecutionEnvironment.getExecutionEnvironment();
> steamEnv.enableCheckpointing(1000L*2);
> steamEnv
> .addSource(new FSource()).setParallelism(4)
> .transform("开始事务", Types.STRING,new FStart()).setParallelism(1)
> .process(new FCombine()).name("事务预处理").setParallelism(4)
> .addSink(new FSubmit()).name("提交事务").setParallelism(1)
> ;
> steamEnv.execute("test");
> }
>
> static class FSource extends RichParallelSourceFunction<String>{
> @Override
> public void run(SourceContext<String> sourceContext) throws
> Exception {
> int I =0;
> while (true){
> I = I + 1;
> sourceContext.collect("thread " +
> Thread.currentThread().getId() +"-" +I);
> Thread.sleep(1000);
> }
> }
> @Override
> public void cancel() {}
> }
>
> static class FStart extends AbstractStreamOperator<String>
> implements OneInputStreamOperator<String,String>{
> volatile Long ckid = 0L;
> @Override
> public void processElement(StreamRecord<String> streamRecord)
> throws Exception {
> log("收到数据: " + streamRecord.getValue() + "..ckid:" + ckid);
> output.collect(streamRecord);
> }
> @Override
> public void prepareSnapshotPreBarrier(long checkpointId)
> throws Exception {
> log("开启事务: " + checkpointId);
> ckid = checkpointId;
> super.prepareSnapshotPreBarrier(checkpointId);
> }
> }
>
> static class FCombine extends ProcessFunction<String,String>
> implements CheckpointedFunction {
> List ls = new ArrayList<String>();
> Collector<String> collector =null;
> volatile Long ckid = 0L;
>
> @Override
> public void snapshotState(FunctionSnapshotContext
> functionSnapshotContext) throws Exception {
> StringBuffer sb = new StringBuffer();
> ls.forEach(x->{sb.append(x).append(";");});
> log("批处理 " + functionSnapshotContext.getCheckpointId() +
> ": 时收到数据:" + sb.toString());
> Thread.sleep(5*1000);
> collector.collect(sb.toString());
> ls.clear();
> Thread.sleep(5*1000);
> //Thread.sleep(20*1000);
> }
> @Override
> public void initializeState(FunctionInitializationContext
> functionInitializationContext) throws Exception { }
> @Override
> public void processElement(String s, Context context,
> Collector<String> out) throws Exception {
> if(StringUtils.isNotBlank(s)){
> ls.add(s);
> }
> log("收到数据 :" + s + "; 这批数据大小为:" + ls.size() + "..ckid:" +
> ckid);
> if(collector ==null){
> collector = out;
> }
> }
> }
>
> static class FSubmit extends RichSinkFunction<String> implements
> /* CheckpointedFunction,*/ CheckpointListener {
> List ls = new ArrayList<String>();
> volatile Long ckid = 0L;
> @Override
> public void notifyCheckpointComplete(long l) throws Exception {
> ckid = l;
> StringBuffer sb = new StringBuffer();
> ls.forEach(x->{sb.append(x).append("||");});
> log("submit checkpoint " + l + " over data:list size" +
> ls.size()+ "; detail" + sb.toString());
> ls.clear();
> }
> @Override
> public void invoke(String value, Context context) throws Exception
> {
> if(StringUtils.isNotBlank(value)){
> ls.add(value);
> }
> log("收到数据 :" + value + " list zie:" + ls.size() + "..ckid:" +
> ckid);
> }
> }
> public static void log(String s){
> String name = Thread.currentThread().getName();
> System.out.println(new SimpleDateFormat("HH:mm:ss").format(new
> Date())+":"+name + ":" + s);
> }
> }
>