Thanks Fabian, I will try creating a toy job illustrating the issue and get back.
2016-12-20 12:58 GMT+03:00 Fabian Hueske <fhue...@gmail.com>:

> Hi Yury,
>
> your solution should solve exactly your problem.
> An operator sends all outgoing records to all connected successor operators.
> There should not be any non-deterministic behavior or splitting of records.
>
> Can you share some example code that produces the non-deterministic behavior?
>
> Best, Fabian
>
> 2016-12-20 10:50 GMT+01:00 Yury Ruchin <yuri.ruc...@gmail.com>:
>
>> Hi all,
>>
>> I have a streaming job in which I want to duplicate the stream at some
>> point, so that one copy ends up in a sink and the other continues down the
>> processing pipeline. In effect, I want something similar to the Linux "tee"
>> utility.
>>
>> I tried a naive approach that looked like this:
>>
>> val streamToMirror = env.addSource(mySource).<some operators here>
>> streamToMirror.addSink(someSink) // (1) tee
>> streamToMirror.<more operators here>.addSink(anotherSink) // (2) process further
>>
>> But the above results in the stream being split between someSink (1) and
>> the further processors (2) in a seemingly nondeterministic way.
>>
>> I could instead write through Kafka, as in this pseudocode:
>>
>> val headStream = env.addSource(mySource).<some operators here>
>> headStream.addSink(KafkaSink("myTopic"))
>> val tailStream = env.addSource(KafkaSource("myTopic")).<more operators here>
>>
>> But this would incur additional latency plus serialization/deserialization
>> overhead that I would like to avoid.
>>
>> Is there any way to implement stream teeing as described?
>>
>> Thanks,
>> Yury
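[For later readers of this thread: below is a minimal, self-contained sketch of the tee pattern Fabian describes, using the Flink streaming Scala API. All specifics here are illustrative, not from the thread — the source is a literal `fromElements`, and both sinks are simple `print()` sinks. The point it demonstrates is that attaching two consumers to the same DataStream duplicates every record to both branches; it does not round-robin records between them.]

```scala
// Sketch of "tee"-ing a Flink DataStream: two consumers attached to the same
// stream each receive ALL records. Assumes the flink-streaming-scala
// dependency is on the classpath; source and sinks are illustrative.
import org.apache.flink.streaming.api.scala._

object TeeExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Stand-in for env.addSource(mySource).<some operators here>
    val streamToMirror: DataStream[Int] = env.fromElements(1, 2, 3, 4, 5)

    // (1) tee: mirror the stream straight into a sink.
    streamToMirror.print()

    // (2) process further: continue on the SAME stream and sink the result.
    // Every element of streamToMirror also flows through this branch.
    streamToMirror.map(_ * 10).print()

    env.execute("tee-example")
  }
}
```

If records do appear to be split between the branches, the usual culprit is a non-deterministic source or operator upstream of the fan-out, not the fan-out itself.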