[ 
https://issues.apache.org/jira/browse/FLINK-25048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17448996#comment-17448996
 ] 

Yao Zhang commented on FLINK-25048:
-----------------------------------

Hi [~liangxinli] ,

I debugged the StreamGraph generation process and found the root cause.

If you chain a function that produces side output (such as a 
ProcessFunction), getSideOutput, and a downstream operation (print in your 
example) directly one after another, the call chain is translated to an edge 
whose start vertex is the ProcessFunction and whose end vertex is the print 
function. StreamGraph also attaches the OutputTag you defined in the example 
to that edge. A collector (the underlying implementation is 
RecordWriterOutput) holding the same OutputTag is then instantiated for this 
edge. When the collect method of RecordWriterOutput is called, it first checks 
whether the output tag held by the RecordWriterOutput equals the one passed to 
collect. Only if they are equal is the data passed to the next stage. That is 
why you get the output if you call getSideOutput directly after the function 
that produces the side output.

However, in your example you called assignTimestampsAndWatermarks right after 
the process call, so the variable "process" refers to the watermark operator 
and not to the ProcessFunction. The edge generated in StreamGraph between the 
ProcessFunction and the watermark operator is therefore not tagged with any 
output tag. The collector in front of the "assignTimestampsAndWatermarks" 
stage is untagged as well, so it ignores all side output data. Flink is 
currently designed to work this way.
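
To illustrate the tag check described above, here is a minimal sketch in plain 
Java. It is a simplified model and not Flink's actual code: the names 
EdgeOutputModel and EdgeOutput are hypothetical, tags are plain strings instead 
of OutputTag instances, and the rule that a tagged edge skips main-stream 
records is my assumption about how the two kinds of output are kept apart.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical model of a per-edge collector. Not Flink's real class.
class EdgeOutputModel {

    /** Output bound to one edge; edgeTag is null for an untagged (main) edge. */
    static final class EdgeOutput {
        private final String edgeTag;
        final List<String> forwarded = new ArrayList<>();

        EdgeOutput(String edgeTag) {
            this.edgeTag = edgeTag;
        }

        /** Main-stream record: forwarded only when the edge carries no tag. */
        void collect(String record) {
            if (edgeTag == null) {
                forwarded.add(record);
            }
        }

        /** Side-output record: forwarded only when its tag equals the edge's tag. */
        void collect(String recordTag, String record) {
            if (recordTag != null && recordTag.equals(edgeTag)) {
                forwarded.add(record);
            }
        }
    }

    /** Emits one main-stream record and one side-output record to the given edge. */
    static void emitSample(EdgeOutput out) {
        out.collect("a 1");         // like out.collect(value)
        out.collect("test", "b 2"); // like ctx.output(outputTag, value)
    }

    public static void main(String[] args) {
        // Edge created by process(...).getSideOutput(tag).print(): tagged.
        EdgeOutput taggedEdge = new EdgeOutput("test");
        // Edge created by process(...).assignTimestampsAndWatermarks(...): untagged.
        EdgeOutput untaggedEdge = new EdgeOutput(null);

        emitSample(taggedEdge);
        emitSample(untaggedEdge);

        System.out.println("tagged edge forwards:   " + taggedEdge.forwarded);
        System.out.println("untagged edge forwards: " + untaggedEdge.forwarded);
    }
}
```

In this model the tagged edge forwards only "b 2" and the untagged edge 
forwards only "a 1", which mirrors why the side output record never reaches 
anything downstream of the watermark stage in the example.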

> Setting a watermark after an operator: if that operator uses an outputTag, the outputTag emits no data and data is emitted only on the main stream
> ---------------------------------------------------------------------------
>
>                 Key: FLINK-25048
>                 URL: https://issues.apache.org/jira/browse/FLINK-25048
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>            Reporter: xinli liang
>            Priority: Major
>         Attachments: 1637770386(1).jpg, 1637770540(1).jpg, test.java
>
>
> // 1. Create the streaming execution environment
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> // 2. Read the input text stream
> DataStreamSource<String> lineDSS = env.socketTextStream("hadoop102", 9999);
> OutputTag<String> outputTag = new OutputTag<String>("test"){};
> // 3. Convert the data format
> SingleOutputStreamOperator<String> process = lineDSS.process(new 
> ProcessFunction<String, String>() {
> @Override
> public void processElement(String value, Context ctx, Collector<String> out) 
> throws Exception {
> String[] s = value.split(" ");
> String word = s[0];
> String ts = s[1];
> if (word.startsWith("a")) {
> out.collect(value);
> } else {
> ctx.output(outputTag, value);
> }
> }
> })
> .assignTimestampsAndWatermarks(WatermarkStrategy
> .<String>forBoundedOutOfOrderness(Duration.ofSeconds(4))
> .withTimestampAssigner((data,ts)-> Long.parseLong(data.split(" ")[1])))
> ;
> process.print("main stream>>>");
> process.getSideOutput(outputTag).print("side output>>>");
> // 4. Execute
> try {
> env.execute();
> } catch (Exception e) {
> e.printStackTrace();
> }



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
