xinli liang created FLINK-25048:
-----------------------------------

             Summary: 在某个option后设置watermark,如果这个option里面有 outputTag,则此outputTag 
不会有数据输出,只会在主流输出数据
                 Key: FLINK-25048
                 URL: https://issues.apache.org/jira/browse/FLINK-25048
             Project: Flink
          Issue Type: Bug
          Components: API / DataStream
            Reporter: xinli liang
         Attachments: 1637770386(1).jpg, 1637770540(1).jpg, test.java

// 1. 创建流式执行环境
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
// 2. 读取文件
DataStreamSource<String> lineDSS = env.socketTextStream("hadoop102", 9999);

OutputTag<String> outputTag = new OutputTag<String>("test"){};
// 3. 转换数据格式

SingleOutputStreamOperator<String> process = lineDSS.process(new 
ProcessFunction<String, String>() {
@Override
public void processElement(String value, Context ctx, Collector<String> out) 
throws Exception {
String[] s = value.split(" ");
String word = s[0];
String ts = s[1];
if (word.startsWith("a")) {
out.collect(value);
} else {
ctx.output(outputTag, value);
}
}
})
.assignTimestampsAndWatermarks(WatermarkStrategy
.<String>forBoundedOutOfOrderness(Duration.ofSeconds(4))
.withTimestampAssigner((data,ts)-> Long.parseLong(data.split(" ")[1])))
;

process.print("主流>>>");
process.getSideOutput(outputTag).print("侧输出流>>>");


// 4. 执行
try {
env.execute();
} catch (Exception e) {
e.printStackTrace();
}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to