[ 
https://issues.apache.org/jira/browse/FLINK-25048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17448904#comment-17448904
 ] 

Wei-Che Wei commented on FLINK-25048:
-------------------------------------

[~liangxinli] try this

> When a watermark is set after an operator, an outputTag used in that operator emits no data; data is output only on the main stream
> ---------------------------------------------------------------------------
>
>                 Key: FLINK-25048
>                 URL: https://issues.apache.org/jira/browse/FLINK-25048
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>            Reporter: xinli liang
>            Priority: Major
>         Attachments: 1637770386(1).jpg, 1637770540(1).jpg, test.java
>
>
> // 1. Create the streaming execution environment
> StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.getExecutionEnvironment();
> // 2. Read the input (here, from a socket)
> DataStreamSource<String> lineDSS = env.socketTextStream("hadoop102", 9999);
> OutputTag<String> outputTag = new OutputTag<String>("test"){};
> // 3. Convert the data format
> SingleOutputStreamOperator<String> process = lineDSS.process(new ProcessFunction<String, String>() {
>     @Override
>     public void processElement(String value, Context ctx, Collector<String> out)
>             throws Exception {
>         String[] s = value.split(" ");
>         String word = s[0];
>         String ts = s[1];
>         if (word.startsWith("a")) {
>             out.collect(value);
>         } else {
>             ctx.output(outputTag, value);
>         }
>     }
> })
> .assignTimestampsAndWatermarks(WatermarkStrategy
>         .<String>forBoundedOutOfOrderness(Duration.ofSeconds(4))
>         .withTimestampAssigner((data, ts) -> Long.parseLong(data.split(" ")[1])));
> process.print("main stream>>>");
> process.getSideOutput(outputTag).print("side output>>>");
> // 4. Execute
> try {
>     env.execute();
> } catch (Exception e) {
>     e.printStackTrace();
> }
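
A likely explanation for the behaviour in the quoted snippet: the variable `process` holds the stream returned by `assignTimestampsAndWatermarks`, not the `ProcessFunction` operator itself, so `getSideOutput(outputTag)` is asked of an operator that never writes to that tag. The following is a minimal sketch of one way around this, keeping a separate reference to the process operator and reading the side output from it. The class name `SideOutputSketch`, the helper `sideOutput`, the labels, and the fixed sample records passed to `fromElements` (used in place of the socket source) are assumptions made for illustration and are not part of the original report.

```java
import java.time.Duration;
import java.util.List;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SideOutputSketch {

    // Same tag id as in the report; the anonymous subclass keeps the generic type.
    static final OutputTag<String> OUTPUT_TAG = new OutputTag<String>("test") {};

    // Hypothetical helper: runs the pipeline on the given "word timestamp" records
    // and returns what arrived on the side output.
    static List<String> sideOutput(String... lines) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1); // keeps the collected order deterministic for this sketch

        // Assumption: fixed sample records replace the socket source.
        DataStream<String> source = env.fromElements(lines);

        // Keep a reference to the operator that actually emits to the side output.
        SingleOutputStreamOperator<String> processed = source.process(
                new ProcessFunction<String, String>() {
                    @Override
                    public void processElement(String value, Context ctx, Collector<String> out) {
                        if (value.startsWith("a")) {
                            out.collect(value);            // main stream
                        } else {
                            ctx.output(OUTPUT_TAG, value); // side output
                        }
                    }
                });

        // Watermark assignment is a separate downstream operator on the main stream.
        DataStream<String> withWatermarks = processed.assignTimestampsAndWatermarks(
                WatermarkStrategy
                        .<String>forBoundedOutOfOrderness(Duration.ofSeconds(4))
                        .withTimestampAssigner((data, ts) -> Long.parseLong(data.split(" ")[1])));
        withWatermarks.print("main>>>");

        // Read the side output from the process operator, not from withWatermarks.
        return processed.getSideOutput(OUTPUT_TAG).executeAndCollect(100);
    }

    public static void main(String[] args) throws Exception {
        System.out.println("side>>> " + sideOutput("apple 1000", "banana 2000", "avocado 3000"));
    }
}
```

With the sample records above, only the record that does not start with "a" should reach the side output. If watermarks are also needed on the side stream, `assignTimestampsAndWatermarks` can presumably be applied to the stream returned by `getSideOutput` as well.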



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
