HI Guowei, Will check the doc out. Thanks for your help.
Best regards, Chen-Che On Mon, Mar 21, 2022 at 4:05 PM Guowei Ma <guowei....@gmail.com> wrote: > Hi, Huang > From the document[1] it seems that you need to close the iterate stream. > such as `iteration.closeWith(feedback);` > BTW You also could get a detailed iteration example from here [2]. > > [1] > https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/overview/#iterate > [2] > https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java > > Best, > Guowei > > > On Mon, Mar 21, 2022 at 2:27 PM Chen-Che Huang <acmic...@gmail.com> wrote: > >> Hi all, >> >> We have an application where the operations on some keys depend on the >> results of related keys. Assume that there are >> two keys k1 and k2 that have some relationship between them. Our >> application won't send the value for key k1 to the data sink >> when the value for key k2 was sent to the data sink earlier. To do so, we >> hope that our Flink application can send some value >> information for key k2 to SideOutput and the SideOutput becomes the input >> of the original stream (see below). >> >> dataSource1 >> .union(dataSource2) >> .iterate( >> inStream => { >> val outStream = inStream >> .keyBy(_.key) >> .connect(relationshipSource) >> .process(new CustomOperator()) >> >> (outStream.getSideOutput(CustomOperator.Result), outStream) >> } >> ) >> .disableChaining() >> .name(OperatorKey.Name).uid(OperatorKey.Name) >> >> However, although our Flink application can write value info to >> SideOutput successfully, the data in SideOutput won't be >> sent to the input stream. We wonder whether it's doable for our scenario >> with Flink? If so, how should we modify our code to >> achieve the goal? Many thanks for any comments. >> >> Best regards, >> Chen-Che Huang >> >