Hi Yury, your approach should do exactly what you want. An operator sends all outgoing records to all connected successor operators. There should not be any non-deterministic behavior or splitting of records.
Can you share some example code that produces the non-deterministic behavior?

Best, Fabian

2016-12-20 10:50 GMT+01:00 Yury Ruchin <yuri.ruc...@gmail.com>:
> Hi all,
>
> I have a streaming job in which I want at some point to duplicate the stream, so
> that one copy of it ends up in a sink, and another one goes down the
> processing pipeline. This way I want to implement something similar to the
> "tee" Linux utility.
>
> I tried a naive approach that looked like this:
>
> val streamToMirror = env.addSource(mySource).<some operators here>
> streamToMirror.addSink(someSink) // (1) tee
> streamToMirror.<more operators here>.addSink(anotherSink) // (2) process further
>
> But the above results in the stream being split between someSink (1) and
> the further processors (2) in a seemingly non-deterministic way.
>
> I could write through Kafka as shown in this pseudocode:
>
> val headStream = env.addSource(mySource).<some operators here>
> headStream.addSink(KafkaSink("myTopic"))
> val tailStream = env.addSource(KafkaSource("myTopic")).<more operators here>
>
> But this would incur additional latency + deserialization overhead that I
> would like to avoid.
>
> Is there any way to implement stream teeing as described?
>
> Thanks,
> Yury
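
[Editor's note] For reference, a minimal self-contained sketch of the tee pattern discussed above, in the Flink Scala DataStream API. The socket source and print sinks are placeholders chosen for this example, not part of the original thread; every record emitted by the intermediate stream is forwarded to both the sink at (1) and the downstream operators at (2), which is the fan-out behavior Fabian describes:

import org.apache.flink.streaming.api.scala._

object TeeExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Placeholder source: lines of text from a local socket.
    val source: DataStream[String] = env.socketTextStream("localhost", 9999)

    // Intermediate stream to be "tee'd".
    val streamToMirror: DataStream[String] = source.map(_.trim)

    // (1) Tee: attach a sink directly to the intermediate stream.
    streamToMirror.print()

    // (2) Continue processing the same stream with further operators
    //     and attach another sink to the result.
    streamToMirror
      .filter(_.nonEmpty)
      .map(_.toUpperCase)
      .print()

    env.execute("tee example")
  }
}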