My bad - "Records Out" in the previous message should read "Records sent", as it is labeled in the Flink UI.
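To sum up for the archives: the naive "tee" works as intended - every record an operator emits is delivered to all of its connected successors. Below is a minimal, self-contained sketch of the pattern (untested; fromCollection() stands in for the real Kafka source, and the map is just a placeholder):

import org.apache.flink.streaming.api.functions.sink.DiscardingSink
import org.apache.flink.streaming.api.scala._

object TeeExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // One upstream stream; fromCollection() stands in for the real Kafka source.
    val stream: DataStream[Int] = env.fromCollection(Seq.range(0, 100))

    // Branch 1: "tee" a copy of every record into a sink.
    stream.addSink(new DiscardingSink[Int])

    // Branch 2: the very same records continue down the processing pipeline.
    stream.map(_ * 2).print()

    // All 100 records reach BOTH branches, so the source reports
    // "Records sent" = 200 (100 records x 2 successor operators).
    env.execute("tee-example")
  }
}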
2016-12-20 18:42 GMT+03:00 Yury Ruchin <yuri.ruc...@gmail.com>:

> Well, it seems I figured it out. You're right, Fabian, it works the way
> you described. I wrote a simple test job:
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val stream = env.fromCollection(Seq.range(0, 100))
>
> stream.addSink(new DiscardingSink[Int]).disableChaining()
> stream.map {_ => 1}.countWindowAll(100).sum(0).print().disableChaining()
>
> env.setParallelism(2)
> env.execute("tee-test-job")
>
> I saw that "Records received" was 100 for both the DiscardingSink and the
> window operator. I also noticed that "Records out" for the
> fromCollection() source was 200 - and that was the key to understanding.
> In the original job I use a Kafka source, and I had treated its "Records
> out" as the number of records it consumed, but that is not true. The
> correct interpretation is "<Records Out> = <Records Consumed> * <Number of
> successor operators>". An additional source of confusion for me was some
> inaccuracy in the sampled numbers - the "Records received" values of the
> successor operators were not exactly equal, which made me think they were
> receiving different portions of the stream. I believe that inaccuracy is
> intrinsic to live stream sampling, so that's fine.
>
> 2016-12-20 14:35 GMT+03:00 Yury Ruchin <yuri.ruc...@gmail.com>:
>
>> Thanks Fabian, I will try creating a toy job illustrating the issue and
>> get back.
>>
>> 2016-12-20 12:58 GMT+03:00 Fabian Hueske <fhue...@gmail.com>:
>>
>>> Hi Yury,
>>>
>>> your solution should solve your problem exactly.
>>> An operator sends all outgoing records to all connected successor
>>> operators. There should not be any non-deterministic behavior or
>>> splitting of records.
>>>
>>> Can you share some example code that produces the non-deterministic
>>> behavior?
>>>
>>> Best, Fabian
>>>
>>> 2016-12-20 10:50 GMT+01:00 Yury Ruchin <yuri.ruc...@gmail.com>:
>>>
>>>> Hi all,
>>>>
>>>> I have a streaming job in which I want to duplicate the stream at
>>>> some point, so that one copy ends up in a sink and the other goes
>>>> further down the processing pipeline. In effect I want something
>>>> similar to the "tee" Linux utility.
>>>>
>>>> I tried a naive approach that looked like this:
>>>>
>>>> val streamToMirror = env.addSource(mySource).<some operators here>
>>>> streamToMirror.addSink(someSink) // (1) tee
>>>> streamToMirror.<more operators here>.addSink(anotherSink) // (2) process further
>>>>
>>>> But the above results in the stream being split between someSink (1)
>>>> and the downstream processors (2) in a seemingly nondeterministic way.
>>>>
>>>> I could write through Kafka as shown in this pseudocode:
>>>>
>>>> val headStream = env.addSource(mySource).<some operators here>
>>>> headStream.addSink(KafkaSink("myTopic"))
>>>> val tailStream = env.addSource(KafkaSource("myTopic")).<more operators here>
>>>>
>>>> But this would incur additional latency + deserialization overhead
>>>> that I would like to avoid.
>>>>
>>>> Is there any way to implement stream teeing as described?
>>>>
>>>> Thanks,
>>>> Yury
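P.S. For completeness, in case someone really does need the write-through-Kafka variant from the pseudocode above, it could look roughly like this. This is an untested sketch: the broker address, topic names, the toUpperCase placeholder, and the 0.9 connector classes (FlinkKafkaConsumer09 / FlinkKafkaProducer09) are assumptions to adapt to your setup.

import java.util.Properties

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer09, FlinkKafkaProducer09}
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

object KafkaTeeExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val consumerProps = new Properties()
    consumerProps.setProperty("bootstrap.servers", "localhost:9092")
    consumerProps.setProperty("group.id", "tee-demo")

    // Head of the pipeline - a Kafka source standing in for "mySource".
    val headStream: DataStream[String] =
      env.addSource(new FlinkKafkaConsumer09[String]("input-topic", new SimpleStringSchema(), consumerProps))

    // Tee: write every record out to an intermediate topic...
    headStream.addSink(
      new FlinkKafkaProducer09[String]("localhost:9092", "mirror-topic", new SimpleStringSchema()))

    // ...and read it back to continue processing, paying the extra
    // serialization round trip and latency mentioned in the thread above.
    val tailStream: DataStream[String] =
      env.addSource(new FlinkKafkaConsumer09[String]("mirror-topic", new SimpleStringSchema(), consumerProps))

    tailStream.map(_.toUpperCase).print()

    env.execute("kafka-tee-example")
  }
}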