Hi all,

I have a streaming job in which, at some point, I want to duplicate the stream so that one copy ends up in a sink and the other continues down the processing pipeline. In other words, I want to implement something similar to the Linux "tee" utility.
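To pin down the intended semantics, here is a toy sketch in plain Scala with no Flink involved. The TeeSketch.tee helper and the two list buffers are hypothetical stand-ins for the sink branch and the processing branch. The point is that every element should reach both consumers, rather than each element going to only one of them.

```scala
import scala.collection.mutable.ListBuffer

object TeeSketch {
  // Hypothetical helper: deliver every element to BOTH consumers, like `tee`.
  def tee[A](input: Iterable[A])(first: A => Unit, second: A => Unit): Unit =
    input.foreach { a =>
      first(a)  // copy that goes to the sink, branch (1)
      second(a) // copy that continues down the pipeline, branch (2)
    }

  def main(args: Array[String]): Unit = {
    val sinkCopy  = ListBuffer.empty[Int] // stands in for someSink
    val processed = ListBuffer.empty[Int] // stands in for further processing
    tee(1 to 5)(x => sinkCopy += x, x => processed += x * 10)
    println(sinkCopy.toList)  // List(1, 2, 3, 4, 5)
    println(processed.toList) // List(10, 20, 30, 40, 50)
  }
}
```

With a true tee, both branches see all five elements; the behaviour described next is that each element seems to end up in only one branch.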
I tried a naive approach that looked like this:

    val streamToMirror = env.addSource(mySource).<some operators here>
    streamToMirror.addSink(someSink) // (1) tee
    streamToMirror.<more operators here>.addSink(anotherSink) // (2) process further

But this results in the stream being split between someSink (1) and the further processing (2) in a seemingly nondeterministic way, instead of every element reaching both.

I could write through Kafka, as shown in this pseudocode:

    val headStream = env.addSource(mySource).<some operators here>
    headStream.addSink(KafkaSink("myTopic"))

    val tailStream = env.addSource(KafkaSource("myTopic")).<more operators here>

But this would incur additional latency and deserialization overhead that I would like to avoid.

Is there any way to implement stream teeing as described?

Thanks,
Yury