大家都好热情啊~ @IORI,这个问题取决于你是要把一个流复制成两个流分别套用不用的处理逻辑呢,还是说是要把数据根据一定的规则分开成两个流。 如果是复制的话,用@邓成刚 的方法就可以 如果是要进行数据分割的话,那用split或者sideoutput都行
Best, Kurt On Tue, Mar 26, 2019 at 10:45 AM Yun Chen <[email protected]> wrote: > split官方好像是不建议使用了,建议使用 Side Outputs > > > https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/side_output.html > > > 下面是示例参考 > > > val xOutputTag = OutputTag[String]("xx-side-output") > val xxOutputTag = OutputTag[String]("xx-side-output") > > val xxx = xxx.process(new ProcessFunction[String, String] { > override def processElement(i: String, context: ProcessFunction[String, > String]#Context, > collector: Collector[String]): Unit = { > > ....... > > arrData(0) = channel > arrData(1) = tboxinfo > > collector.collect(parse) > > context.output(channelOutputTag,String.valueOf(arrData(0))) > context.output(eventOutputTag,arrData(1)) > > } > }) > > val xStream = ouputStream.getSideOutput(xOutputTag) > val xxStream = ouputStream.getSideOutput(xxOutputTag) > > Best, > YunKillere > > ________________________________ > 发件人: 戴嘉诚 <[email protected]> > 发送时间: 2019年3月25日 19:26 > 收件人: [email protected] > 主题: 答复: flink疑问 > > 使用 Split 算子把流根据特定条件拆分成两个或者更多,然后在用select算子从拆分流中选择对应的拆分流做处理即可。 > 可以看看文档上,有介绍用法 > > https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/ > > 发件人: [email protected] > 发送时间: 2019年3月26日 10:10 > 收件人: user-zh > 主题: 回复: flink疑问 > > 一个算子出来两个流好像不能吧。 > 要想实现你说的,可以先基于A流过滤生成要进行B算子的流,基于A流过滤生成要进行C算子的流。 > > > > [email protected] > > 发件人: IORI > 发送时间: 2019-03-26 09:46 > 收件人: user-zh > 主题: flink疑问 > > 请问:数据流通过算子A时,我想分裂成两个数据流,一个数据流进行算子B操作然后sink,另外一个数据流需要先进行算子C操作,再reduce然后sink,请问这种情况应该如何处理?一个operator中能出来两个不同的数据流吗? > >
