Well, it seems I figured it out. You're right, Fabian, it works the way you
described. I wrote a simple test job:

import org.apache.flink.streaming.api.functions.sink.DiscardingSink
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.fromCollection(Seq.range(0, 100))

// Branch 1: send every record straight to a sink that discards it.
stream.addSink(new DiscardingSink[Int]).disableChaining()
// Branch 2: count all records in a single global window and print the sum.
stream.map(_ => 1).countWindowAll(100).sum(0).print().disableChaining()

env.setParallelism(2)
env.execute("tee-test-job")

I saw that "Records received" was 100 for both the DiscardingSink and
window operators. I also noticed that "Records out" for the
fromCollection() sink was 200 - and that was the key to understanding. In
the original job I use Kafka source, and I treated its "Records out" as the
number of records consumed by it, but it's not true. The correct
interpretation should be "<Records Out> = <Records Consumed> * <Number of
successor operators>". The additional source of confusion for me was some
inaccuracy of sampled numbers - "Records In" values in successor operators
were not exactly equal which made me think they receive different portions
of the stream. I believe the inaccuracy is somewhat intrinsic to live
stream sampling, so that's fine.
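
For reference, here is a minimal self-contained sketch of the "tee" pattern
discussed below, with the collection source and DiscardingSink standing in
for the real Kafka source and sinks. Both branches attached to the same
DataStream receive every record, and the upstream operator's "Records out"
counts each record once per branch:

import org.apache.flink.streaming.api.functions.sink.DiscardingSink
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Stand-in for the real (e.g. Kafka) source: 100 records.
val streamToMirror = env.fromCollection(Seq.range(0, 100))

// Branch 1: "tee" - mirror the full stream into a sink.
streamToMirror.addSink(new DiscardingSink[Int])

// Branch 2: the same full stream goes down the rest of the pipeline.
streamToMirror.map(_ * 2).addSink(new DiscardingSink[Int])

// Expected metrics: the source reports "Records out" = 100 * 2 = 200,
// and each branch reports "Records in" = 100.
env.execute("tee-sketch")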


2016-12-20 14:35 GMT+03:00 Yury Ruchin <yuri.ruc...@gmail.com>:

> Thanks Fabian, I will try creating a toy job illustrating the issue and
> get back.
>
> 2016-12-20 12:58 GMT+03:00 Fabian Hueske <fhue...@gmail.com>:
>
>> Hi Yury,
>>
>> your solution should exactly solve your problem.
>> An operator sends all outgoing records to all connected successor
>> operators.
>> There should not be any non-deterministic behavior or splitting of
>> records.
>>
>> Can you share some example code that produces the non-deterministic
>> behavior?
>>
>> Best, Fabian
>>
>>
>> 2016-12-20 10:50 GMT+01:00 Yury Ruchin <yuri.ruc...@gmail.com>:
>>
>>> Hi all,
>>>
>>> I have a streaming job in which, at some point, I want to duplicate the
>>> stream, so that one copy ends up in a sink and the other goes down the
>>> processing pipeline. This way I want to implement something similar to
>>> the Linux "tee" utility.
>>>
>>> I tried a naive approach that looked like this:
>>>
>>> val streamToMirror = env.addSource(mySource).<some operators here>
>>> streamToMirror.addSink(someSink) // (1) tee
>>> streamToMirror.<more operators here>.addSink(anotherSink) // (2) process
>>> further
>>>
>>> But the above results in the stream being split between the sink (1) and
>>> the further processors (2) in a seemingly nondeterministic way.
>>>
>>> I could write through Kafka as shown in this pseudocode:
>>>
>>> val headStream = env.addSource(mySource).<some operators here>
>>> headStream.addSink(KafkaSink("myTopic"))
>>> val tailStream = env.addSource(KafkaSource("myTopic")).<more operators here>
>>>
>>> But this would incur additional latency + deserialization overhead that
>>> I would like to avoid.
>>>
>>> Is there any way to implement stream teeing as described?
>>>
>>> Thanks,
>>> Yury
>>>
>>
>>
>
