HI Guowei,

Will check the doc out. Thanks for your help.

Best regards,
Chen-Che

On Mon, Mar 21, 2022 at 4:05 PM Guowei Ma <guowei....@gmail.com> wrote:

> Hi, Huang
> From the document[1] it seems that you need to close the iterate stream.
> such as `iteration.closeWith(feedback);`
> BTW You also could get a detailed iteration example from here [2].
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/overview/#iterate
> [2]
> https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
>
> Best,
> Guowei
>
>
> On Mon, Mar 21, 2022 at 2:27 PM Chen-Che Huang <acmic...@gmail.com> wrote:
>
>> Hi all,
>>
>> We have an application where the operations on some keys depend on the
>> results of related keys. Assume that there are
>> two keys k1 and k2 that have some relationship between them. Our
>> application won't send the value for key k1 to the data sink
>> when the value for key k2 was sent to the data sink earlier. To do so, we
>> hope that our Flink application can send some value
>> information for key k2 to SideOutput and the SideOutput becomes the input
>> of the original stream (see below).
>>
>> dataSource1
>> .union(dataSource2)
>> .iterate(
>> inStream => {
>> val outStream = inStream
>> .keyBy(_.key)
>> .connect(relationshipSource)
>> .process(new CustomOperator())
>>
>> (outStream.getSideOutput(CustomOperator.Result), outStream)
>> }
>> )
>> .disableChaining()
>> .name(OperatorKey.Name).uid(OperatorKey.Name)
>>
>> However, although our Flink application can write value info to
>> SideOutput successfully, the data in SideOutput won't be
>> sent to the input stream. We wonder whether it's doable for our scenario
>> with Flink? If so, how should we modify our code to
>> achieve the goal? Many thanks for any comments.
>>
>> Best regards,
>> Chen-Che Huang
>>
>

Reply via email to