原来你是小幸运001 created FLINK-32540:
----------------------------------

             Summary: The issue of not distributing the last batch of data
                 Key: FLINK-32540
                 URL: https://issues.apache.org/jira/browse/FLINK-32540
             Project: Flink
          Issue Type: Bug
         Environment: The code below was run in IntelliJ IDEA with Flink 1.16; the 
issue also occurs in Flink 1.14. Other versions have not been tested.

 
            Reporter: 原来你是小幸运001


I copied the source code of Flink's flat map operator to implement my own flat 
map operator. Part of its logic is to emit one last record when the Flink job 
ends, so I call collector.collect() in the operator's close() method. However, 
the record is never emitted, and the downstream operator does not receive it.
{code:java}
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.Collector;

/**
 * @author LaiYongBIn
 * @date 2023/7/5 10:09
 * @Description Do SomeThing
 */
public class Test {
    public static void main(String[] args) throws Exception {
        ParameterTool parameter = ParameterTool.fromArgs(args);
        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> stream0 =
                env.addSource(new SourceFunction<String>() {
                            @Override
                            public void run(SourceContext<String> 
sourceContext) throws Exception {
                                sourceContext.collect("TEST");
                                
System.out.println("--------------------cancel--------------------");
                            }

                            @Override
                            public void cancel() {
                            }
                        })
                        .setParallelism(1);

        MyFlatMapFun flatMapFunc = new MyFlatMapFun();
        TypeInformation<String> outType = 
TypeExtractor.getFlatMapReturnTypes(env.clean(flatMapFunc), stream0.getType(), 
Utils.getCallLocationName(), true);

        DataStream<String> flatMap = stream0.transform("Flat Map", outType, new 
MyStreamOperator(env.clean(flatMapFunc))).setParallelism(1);

        flatMap.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String s, Collector<String> collector) throws 
Exception {
                
System.out.println("----------------------------------------Obtain upstream 
data is:" + s);
            }
        });

        env.execute();
    }
}


/**
 * Hand-written flat-map style operator used to reproduce the report. It tries to emit one final
 * record ("close message") from {@link #close()}.
 *
 * <p>Per the report, a record collected in close() never reaches the downstream operator, while
 * the same call made from finish() does. This matches the {@code StreamOperator} contract: after
 * finish() has been called, no more records can be produced for downstream operators, and close()
 * is meant only for releasing resources.
 */
class MyStreamOperator extends AbstractUdfStreamOperator<String, FlatMapFunction<String, String>>
        implements OneInputStreamOperator<String, String> {

    /** Collector that writes to {@code output}; created in {@link #open()}. */
    private transient TimestampedCollector<String> collector;

    public MyStreamOperator(FlatMapFunction<String, String> userFunction) {
        super(userFunction);
    }

    @Override
    public void open() throws Exception {
        // The superclass is responsible for opening the wrapped user function.
        super.open();
        collector = new TimestampedCollector<>(output);
    }

    @Override
    public void close() throws Exception {
        try {
            // Distribute data during close. This is kept to reproduce the report: a record emitted
            // here is not received downstream. Emit final records from finish() instead.
            // The null check covers close() being reached without a successful open().
            if (collector != null) {
                collector.collect("close message");
            }
        } finally {
            // Always release the user function and operator resources, even if emitting fails.
            super.close();
        }
    }

    @Override
    public void processElement(StreamRecord<String> streamRecord) throws Exception {
        // do nothing
    }
}


 class MyFlatMapFun implements FlatMapFunction<String, String> {

    /** Placeholder user function: accepts each input element and emits nothing. */
    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        // Intentionally empty: this function never emits records.
    }
} {code}
Then I found that operators also have a finish() method. When I call 
collector.collect() in finish() instead, the record is delivered downstream successfully.
{code:java}
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.Collector;

/**
 * @author LaiYongBIn
 * @date 2023/7/5 10:09
 * @Description Do SomeThing
 */
public class Test {
    public static void main(String[] args) throws Exception {
        ParameterTool parameter = ParameterTool.fromArgs(args);
        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> stream0 =
                env.addSource(new SourceFunction<String>() {
                            @Override
                            public void run(SourceContext<String> 
sourceContext) throws Exception {
                                sourceContext.collect("TEST");
                                
System.out.println("--------------------cancel--------------------");
                            }

                            @Override
                            public void cancel() {
                            }
                        })
                        .setParallelism(1);

        MyFlatMapFun flatMapFunc = new MyFlatMapFun();
        TypeInformation<String> outType = 
TypeExtractor.getFlatMapReturnTypes(env.clean(flatMapFunc), stream0.getType(), 
Utils.getCallLocationName(), true);

        DataStream<String> flatMap = stream0.transform("Flat Map", outType, new 
MyStreamOperator(env.clean(flatMapFunc))).setParallelism(1);

        flatMap.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String s, Collector<String> collector) throws 
Exception {
                
System.out.println("----------------------------------------Obtain upstream 
data is:" + s);
            }
        });

        env.execute();
    }
}


/**
 * Flat-map style operator that emits one final record ("close message") when its input ends.
 *
 * <p>The record is emitted from {@link #finish()}. Per the {@code StreamOperator} contract, no
 * more records can be produced for downstream operators after finish() has been called. close()
 * is only for releasing resources, so records collected there are not delivered.
 */
class MyStreamOperator extends AbstractUdfStreamOperator<String, FlatMapFunction<String, String>>
        implements OneInputStreamOperator<String, String> {

    /** Collector that writes to {@code output}; created in {@link #open()}. */
    private transient TimestampedCollector<String> collector;

    public MyStreamOperator(FlatMapFunction<String, String> userFunction) {
        super(userFunction);
    }

    @Override
    public void open() throws Exception {
        // The superclass is responsible for opening the wrapped user function.
        super.open();
        collector = new TimestampedCollector<>(output);
    }

    @Override
    public void close() throws Exception {
        // Only release resources here; the previously empty override skipped superclass cleanup.
        super.close();
    }

    @Override
    public void finish() throws Exception {
        // Distribute data during finish: the last lifecycle point at which records still reach
        // downstream operators.
        collector.collect("close message");
        super.finish();
    }

    @Override
    public void processElement(StreamRecord<String> streamRecord) throws Exception {
        // do nothing
    }
}


 class MyFlatMapFun implements FlatMapFunction<String, String> {

    /** Placeholder user function: accepts each input element and emits nothing. */
    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        // Intentionally empty: this function never emits records.
    }
} {code}
However, when I run the same program on YARN, the record is still not delivered. 
What is the reason for this, and how can I fix it? I need the last batch of data 
to be delivered to the downstream operators when the program runs on YARN as well.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to