[ https://issues.apache.org/jira/browse/FLINK-25048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17448904#comment-17448904 ]
Wei-Che Wei commented on FLINK-25048: ------------------------------------- [~liangxinli] try this > 在某个option后设置watermark,如果这个option里面有 outputTag,则此outputTag 不会有数据输出,只会在主流输出数据 > --------------------------------------------------------------------------- > > Key: FLINK-25048 > URL: https://issues.apache.org/jira/browse/FLINK-25048 > Project: Flink > Issue Type: Bug > Components: API / DataStream > Reporter: xinli liang > Priority: Major > Attachments: 1637770386(1).jpg, 1637770540(1).jpg, test.java > > > // 1. 创建流式执行环境 > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > // 2. 读取文件 > DataStreamSource<String> lineDSS = env.socketTextStream("hadoop102", 9999); > OutputTag<String> outputTag = new OutputTag<String>("test"){}; > // 3. 转换数据格式 > SingleOutputStreamOperator<String> process = lineDSS.process(new > ProcessFunction<String, String>() { > @Override > public void processElement(String value, Context ctx, Collector<String> out) > throws Exception { > String[] s = value.split(" "); > String word = s[0]; > String ts = s[1]; > if (word.startsWith("a")) { > out.collect(value); > } else { > ctx.output(outputTag, value); > } > } > }) > .assignTimestampsAndWatermarks(WatermarkStrategy > .<String>forBoundedOutOfOrderness(Duration.ofSeconds(4)) > .withTimestampAssigner((data,ts)-> Long.parseLong(data.split(" ")[1]))) > ; > process.print("主流>>>"); > process.getSideOutput(outputTag).print("侧输出流>>>"); > // 4. 执行 > try { > env.execute(); > } catch (Exception e) { > e.printStackTrace(); > } -- This message was sent by Atlassian Jira (v8.20.1#820001)