Well, it seems I figured it out. You're right, Fabian, it works the way you described. I wrote a simple test job:
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.sink.DiscardingSink

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.fromCollection(Seq.range(0, 100))

// Consumer 1: discard every record
stream.addSink(new DiscardingSink[Int]).disableChaining()

// Consumer 2: count the records in a single 100-element window and print the sum
stream.map { _ => 1 }.countWindowAll(100).sum(0).print().disableChaining()

env.setParallelism(2)
env.execute("tee-test-job")

I saw that "Records received" was 100 for both the DiscardingSink and the window operator. I also noticed that "Records out" for the fromCollection() source was 200 - and that was the key to understanding: 100 records consumed times 2 successor operators. In the original job I use a Kafka source, and I had treated its "Records out" as the number of records it consumed, but that is not true. The correct interpretation is "<Records Out> = <Records Consumed> * <Number of successor operators>".

An additional source of confusion was some inaccuracy in the sampled numbers: the "Records In" values of the successor operators were not exactly equal, which made me think they were receiving different portions of the stream. I believe that inaccuracy is intrinsic to sampling a live stream, so that's fine. For reference, I've put a minimal version of the working tee setup at the very bottom of this mail, below the quoted thread.

2016-12-20 14:35 GMT+03:00 Yury Ruchin <yuri.ruc...@gmail.com>:

> Thanks Fabian, I will try creating a toy job illustrating the issue and
> get back.
>
> 2016-12-20 12:58 GMT+03:00 Fabian Hueske <fhue...@gmail.com>:
>
>> Hi Yury,
>>
>> your solution should exactly solve your problem.
>> An operator sends all outgoing records to all connected successor
>> operators. There should not be any non-deterministic behavior or
>> splitting of records.
>>
>> Can you share some example code that produces the non-deterministic
>> behavior?
>>
>> Best, Fabian
>>
>> 2016-12-20 10:50 GMT+01:00 Yury Ruchin <yuri.ruc...@gmail.com>:
>>
>>> Hi all,
>>>
>>> I have a streaming job in which I want to duplicate the stream at some
>>> point, so that one copy of it ends up in a sink and the other goes
>>> further down the processing pipeline. This way I want to implement
>>> something similar to the "tee" Linux utility.
>>>
>>> I tried a naive approach that looked like this:
>>>
>>> val streamToMirror = env.addSource(mySource).<some operators here>
>>> streamToMirror.addSink(someSink) // (1) tee
>>> streamToMirror.<more operators here>.addSink(anotherSink) // (2) process further
>>>
>>> But the above results in the stream being split between someSink (1)
>>> and the further processors (2) in a seemingly nondeterministic way.
>>>
>>> I could write through Kafka as shown in this pseudocode:
>>>
>>> val headStream = env.addSource(mySource).<some operators here>
>>> headStream.addSink(KafkaSink("myTopic"))
>>> val tailStream = env.addSource(KafkaSource("myTopic")).<more operators here>
>>>
>>> But this would incur additional latency + deserialization overhead that
>>> I would like to avoid.
>>>
>>> Is there any way to implement stream teeing as described?
>>>
>>> Thanks,
>>> Yury
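P.S. Here is the minimal tee sketch mentioned above. It is only a sketch: the fromCollection source, the DiscardingSink and print() are stand-ins for the real Kafka source and the someSink/anotherSink from the original question. The point is simply that both consumers read the same DataStream, and each receives every record:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.sink.DiscardingSink

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Stand-in for the real source (a Kafka consumer in the original job)
val streamToMirror: DataStream[String] = env.fromCollection(Seq("a", "bb", "ccc"))

// (1) tee: every record goes to this sink...
streamToMirror.addSink(new DiscardingSink[String])   // stand-in for someSink

// (2) ...and the same records also continue down the pipeline
streamToMirror
  .map(_.length)   // stand-in for <more operators here>
  .print()         // stand-in for anotherSink

env.execute("tee-demo")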