原来你是小幸运001 created FLINK-32540:
-----------------------------------------
             Summary: The last batch of data is not emitted to downstream operators
                 Key: FLINK-32540
                 URL: https://issues.apache.org/jira/browse/FLINK-32540
             Project: Flink
          Issue Type: Bug
         Environment: The code below was run in IntelliJ IDEA with Flink 1.16; the issue also occurs in 1.14. Other versions have not been tried.
            Reporter: 原来你是小幸运001


I copied the flatMap source code to implement my own flat-map operator. Part of the logic is to emit one final record when the Flink job ends, so I called collector.collect in the close() method, but the record was never emitted and the downstream operator never received it.

{code:java}
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.Collector;

/**
 * @author LaiYongBIn
 * @date 2023/7/5 10:09
 * @Description Do SomeThing
 */
public class Test {
    public static void main(String[] args) throws Exception {
        ParameterTool parameter = ParameterTool.fromArgs(args);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Bounded source: emits one record, then run() returns and the job finishes
        DataStream<String> stream0 = env.addSource(new SourceFunction<String>() {
            @Override
            public void run(SourceContext<String> sourceContext) throws Exception {
                sourceContext.collect("TEST");
                System.out.println("--------------------cancel--------------------");
            }

            @Override
            public void cancel() {
            }
        }).setParallelism(1);

        MyFlatMapFun flatMapFunc = new MyFlatMapFun();
        TypeInformation<String> outType = TypeExtractor.getFlatMapReturnTypes(
                env.clean(flatMapFunc), stream0.getType(), Utils.getCallLocationName(), true);
        DataStream<String> flatMap = stream0
                .transform("Flat Map", outType, new MyStreamOperator(env.clean(flatMapFunc)))
                .setParallelism(1);

        flatMap.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String s, Collector<String> collector) throws Exception {
                System.out.println("----------------------------------------Obtain upstream data is:" + s);
            }
        });

        env.execute();
    }
}

class MyStreamOperator extends AbstractUdfStreamOperator<String, FlatMapFunction<String, String>>
        implements OneInputStreamOperator<String, String> {

    private transient TimestampedCollector<String> collector;

    public MyStreamOperator(FlatMapFunction<String, String> userFunction) {
        super(userFunction);
    }

    @Override
    public void open() throws Exception {
        super.open();
        collector = new TimestampedCollector<>(output);
    }

    @Override
    public void close() throws Exception {
        // Emit the final record during close() -- this record never reaches downstream
        collector.collect("close message");
        super.close();
    }

    @Override
    public void processElement(StreamRecord<String> streamRecord) throws Exception {
        // do nothing
    }
}

class MyFlatMapFun implements FlatMapFunction<String, String> {
    @Override
    public void flatMap(String s, Collector<String> collector) throws Exception {
        // do nothing
    }
}
{code}
Then I found that there is a finish() method, so I tried calling collector.collect in finish() instead, and the record was emitted successfully. The program is identical to the one above; only MyStreamOperator changes:

{code:java}
class MyStreamOperator extends AbstractUdfStreamOperator<String, FlatMapFunction<String, String>>
        implements OneInputStreamOperator<String, String> {

    private transient TimestampedCollector<String> collector;

    public MyStreamOperator(FlatMapFunction<String, String> userFunction) {
        super(userFunction);
    }

    @Override
    public void open() throws Exception {
        super.open();
        collector = new TimestampedCollector<>(output);
    }

    @Override
    public void close() throws Exception {
        super.close();
    }

    @Override
    public void finish() throws Exception {
        // Emit the final record during finish() -- run from the IDE,
        // this record does reach the downstream operator
        collector.collect("close message");
    }

    @Override
    public void processElement(StreamRecord<String> streamRecord) throws Exception {
        // do nothing
    }
}
{code}

But when I ran the same program on YARN, the final record was still not emitted. What is the reason for this, and how can it be solved? I would like the last batch of data to still reach the downstream operators when the program runs on YARN.
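One hedged direction, offered as a sketch rather than a confirmed fix for this ticket: since the FLIP-147 lifecycle split, records emitted from close() are not guaranteed to be delivered; finish() is the intended flush hook, and it only runs on normal, bounded termination (e.g. all sources finish, or stop-with-savepoint --drain). If the YARN job was cancelled or killed rather than allowed to finish, finish() is skipped by design, which could explain the difference from the IDE run. An alternative hook is BoundedOneInput#endInput, which fires once when the operator's input ends and while the output is still writable. The operator name MyEndInputOperator and the "last message" string below are hypothetical; everything else follows the report's code.

{code:java}
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

// Sketch: flush the final record from endInput() instead of close()/finish()
class MyEndInputOperator
        extends AbstractUdfStreamOperator<String, FlatMapFunction<String, String>>
        implements OneInputStreamOperator<String, String>, BoundedOneInput {

    private transient TimestampedCollector<String> collector;

    public MyEndInputOperator(FlatMapFunction<String, String> userFunction) {
        super(userFunction);
    }

    @Override
    public void open() throws Exception {
        super.open();
        collector = new TimestampedCollector<>(output);
    }

    @Override
    public void endInput() throws Exception {
        // Called exactly once when this operator's input has ended, before
        // finish()/close(); records emitted here can still reach downstream.
        collector.collect("last message");
    }

    @Override
    public void processElement(StreamRecord<String> streamRecord) throws Exception {
        // do nothing, as in the original repro
    }
}
{code}

Wired in exactly like the repro's operator, e.g. stream0.transform("Flat Map", outType, new MyEndInputOperator(env.clean(flatMapFunc))). Note that endInput(), like finish(), only fires when the input actually ends; no hook can emit records on cancellation or failure.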