[ https://issues.apache.org/jira/browse/FLINK-25048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17448996#comment-17448996 ]
Yao Zhang commented on FLINK-25048:
-----------------------------------

Hi [~liangxinli], I debugged the StreamGraph generation process and found the root cause.

If you chain a function that produces a side output (such as a ProcessFunction), getSideOutput, and a downstream operator (the print function in your example) one after another, the call chain is translated into an edge whose source vertex is the ProcessFunction and whose target vertex is the print function. The StreamGraph also attaches the OutputTag you defined in the example to that edge. Based on this edge, a collector (implemented by RecordWriterOutput) holding the same OutputTag is then instantiated. When the collect method of RecordWriterOutput is called, it first checks whether the output tag held by the RecordWriterOutput equals the one passed to collect. Only if they are equal is the record passed on to the next stage. That is why you get output when you call getSideOutput directly after the function that produces the side output.

However, in your example you call assignTimestampsAndWatermarks right after process. The edge generated in the StreamGraph is therefore not tagged with any output tag. Consequently, the collector feeding the assignTimestampsAndWatermarks stage is not tagged either, so it drops all side-output records. This is how Flink is currently designed to work.

> If a watermark is assigned after an operator that has an outputTag, the outputTag emits no data; data is emitted only on the main stream
> ---------------------------------------------------------------------------
>
>                 Key: FLINK-25048
>                 URL: https://issues.apache.org/jira/browse/FLINK-25048
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>            Reporter: xinli liang
>            Priority: Major
>         Attachments: 1637770386(1).jpg, 1637770540(1).jpg, test.java
>
>
> // 1. Create the streaming execution environment
> StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.getExecutionEnvironment();
> // 2. Read the input from a socket
> DataStreamSource<String> lineDSS = env.socketTextStream("hadoop102", 9999);
> OutputTag<String> outputTag = new OutputTag<String>("test"){};
> // 3. Transform the data format
> SingleOutputStreamOperator<String> process = lineDSS.process(
>         new ProcessFunction<String, String>() {
>             @Override
>             public void processElement(String value, Context ctx, Collector<String> out)
>                     throws Exception {
>                 String[] s = value.split(" ");
>                 String word = s[0];
>                 String ts = s[1];
>                 if (word.startsWith("a")) {
>                     out.collect(value);
>                 } else {
>                     ctx.output(outputTag, value);
>                 }
>             }
>         })
>         .assignTimestampsAndWatermarks(WatermarkStrategy
>                 .<String>forBoundedOutOfOrderness(Duration.ofSeconds(4))
>                 .withTimestampAssigner((data, ts) -> Long.parseLong(data.split(" ")[1])));
> process.print("main stream>>>");
> process.getSideOutput(outputTag).print("side output>>>");
> // 4. Execute
> try {
>     env.execute();
> } catch (Exception e) {
>     e.printStackTrace();
> }

--
This message was sent by Atlassian Jira
(v8.20.1#820001)
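To make the tag check described in the comment concrete, here is a minimal plain-Java model of it, assuming each StreamGraph edge gets one collector that holds either no tag (a main-output edge) or one output tag id. The names `SideOutputRoutingSketch` and `EdgeOutput`, and the use of plain strings as tags, are hypothetical stand-ins for Flink's `OutputTag` and `RecordWriterOutput`; this is a sketch of the idea, not Flink code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical, simplified model of per-edge side-output filtering.
// Tags are plain strings standing in for OutputTag ids; EdgeOutput stands in
// for RecordWriterOutput. None of these are real Flink classes.
public class SideOutputRoutingSketch {

    /** One collector per StreamGraph edge; edgeTag == null marks a main-output edge. */
    static final class EdgeOutput {
        private final String edgeTag;
        final List<String> received = new ArrayList<>();

        EdgeOutput(String edgeTag) {
            this.edgeTag = edgeTag;
        }

        /** Main-output record: forwarded only by an untagged edge. */
        void collect(String record) {
            if (edgeTag == null) {
                received.add(record);
            }
        }

        /** Side-output record: forwarded only by an edge whose tag equals the record's tag. */
        void collect(String tag, String record) {
            if (Objects.equals(edgeTag, tag)) {
                received.add(record);
            }
        }
    }

    /**
     * Mirrors the reporter's processElement: words starting with "a" go to the
     * main output, everything else to the side output. Every record is offered
     * to every outgoing edge, and each edge decides whether to keep it.
     */
    static void processElement(String value, String sideTag, List<EdgeOutput> outputs) {
        String word = value.split(" ")[0];
        for (EdgeOutput out : outputs) {
            if (word.startsWith("a")) {
                out.collect(value);
            } else {
                out.collect(sideTag, value);
            }
        }
    }

    public static void main(String[] args) {
        String tag = "test";
        String[] input = {"a 1000", "b 2000", "apple 3000", "c 4000"};

        // getSideOutput directly after process: one untagged main edge and one tagged side edge.
        EdgeOutput mainEdge = new EdgeOutput(null);
        EdgeOutput sideEdge = new EdgeOutput(tag);

        // assignTimestampsAndWatermarks directly after process: the only edge is untagged.
        EdgeOutput watermarkEdge = new EdgeOutput(null);

        for (String line : input) {
            processElement(line, tag, List.of(mainEdge, sideEdge));
            processElement(line, tag, List.of(watermarkEdge));
        }

        System.out.println("main edge:      " + mainEdge.received);
        System.out.println("side edge:      " + sideEdge.received);
        System.out.println("watermark edge: " + watermarkEdge.received);
    }
}
```

Running `main` shows the tagged side edge receiving `[b 2000, c 4000]`, while the untagged edge into the watermark operator receives only `[a 1000, apple 3000]`: the side records are dropped before they reach `assignTimestampsAndWatermarks`, which matches what the reporter observed. In the reporter's program, the corresponding workaround is to call `getSideOutput(outputTag)` on the operator returned by `process(...)` and then assign timestamps and watermarks to the main and side streams separately.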