My bad: "Records Out" in my previous message should read "Records sent",
as shown in the Flink UI.
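
To make the counting rule from the message below concrete, here is a minimal
plain-Scala sketch. It is only a toy model of how the counters appear to
behave, not Flink code: the Operator and Source classes and the run helper
are hypothetical names made up for this illustration.

```scala
// Toy model of the "Records sent" counting rule; no Flink involved.
// Operator, Source and run are hypothetical names for illustration only.
object TeeCountModel {
  // A downstream operator that only counts what it receives.
  final class Operator(val name: String) {
    var recordsReceived: Long = 0L
    def receive(record: Int): Unit = recordsReceived += 1
  }

  // A source that forwards every record to every connected successor.
  final class Source(successors: Seq[Operator]) {
    var recordsConsumed: Long = 0L
    var recordsSent: Long = 0L

    def emit(record: Int): Unit = {
      recordsConsumed += 1
      successors.foreach { op =>
        op.receive(record)
        recordsSent += 1 // one send per successor, so the counter fans out
      }
    }
  }

  // Feeds n records through a source connected to the given successors.
  def run(n: Int, successors: Seq[Operator]): Source = {
    val source = new Source(successors)
    Seq.range(0, n).foreach(source.emit)
    source
  }

  def main(args: Array[String]): Unit = {
    val sink = new Operator("DiscardingSink")
    val window = new Operator("window")
    val source = run(100, Seq(sink, window))

    println(s"consumed=${source.recordsConsumed} sent=${source.recordsSent}")
    println(s"${sink.name} received=${sink.recordsReceived}")
    println(s"${window.name} received=${window.recordsReceived}")
  }
}
```

With 100 input records and two successors, the model reports 100 consumed and
200 sent, and each successor receives all 100 records, which matches the
numbers I saw in the test job.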

2016-12-20 18:42 GMT+03:00 Yury Ruchin <yuri.ruc...@gmail.com>:

> Well, it seems I figured it out. You're right, Fabian, it works the way
> you described. I wrote a simple test job:
>
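> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.streaming.api.functions.sink.DiscardingSink
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val stream = env.fromCollection(Seq.range(0, 100))
>
> // Branch 1: the "tee" sink that just drops every record
> stream.addSink(new DiscardingSink[Int]).disableChaining()
> // Branch 2: further processing (count all 100 records in one window)
> stream.map {_ => 1}.countWindowAll(100).sum(0).print().disableChaining()
>
> env.setParallelism(2)
> env.execute("tee-test-job")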
>
> I saw that "Records received" was 100 for both the DiscardingSink and
> window operators. I also noticed that "Records out" for the
> fromCollection() source was 200, and that was the key to understanding. In
> the original job I use a Kafka source, and I treated its "Records out" as
> the number of records it consumed, but that is not true. The correct
> interpretation is "<Records Out> = <Records Consumed> * <Number of
> successor operators>". An additional source of confusion for me was some
> inaccuracy in the sampled numbers: the "Records In" values in successor
> operators were not exactly equal, which made me think they received
> different portions of the stream. I believe the inaccuracy is somewhat
> intrinsic to live stream sampling, so that's fine.
>
>
> 2016-12-20 14:35 GMT+03:00 Yury Ruchin <yuri.ruc...@gmail.com>:
>
>> Thanks Fabian, I will try creating a toy job illustrating the issue and
>> get back.
>>
>> 2016-12-20 12:58 GMT+03:00 Fabian Hueske <fhue...@gmail.com>:
>>
>>> Hi Yury,
>>>
>>> your approach should solve your problem exactly.
>>> An operator sends all outgoing records to all connected successor
>>> operators.
>>> There should not be any non-deterministic behavior or splitting of
>>> records.
>>>
>>> Can you share some example code that produces the non-deterministic
>>> behavior?
>>>
>>> Best, Fabian
>>>
>>>
>>> 2016-12-20 10:50 GMT+01:00 Yury Ruchin <yuri.ruc...@gmail.com>:
>>>
>>>> Hi all,
>>>>
>>>> I have a streaming job in which I want to duplicate the stream at some
>>>> point, so that one copy ends up in a sink and the other continues down
>>>> the processing pipeline. This way I want to implement something similar
>>>> to the Linux "tee" utility.
>>>>
>>>> I tried a naive approach that looked like this:
>>>>
>>>> val streamToMirror = env.addSource(mySource).<some operators here>
>>>> streamToMirror.addSink(someSink) // (1) tee
>>>> streamToMirror.<more operators here>.addSink(anotherSink) // (2) process further
>>>>
>>>> But the above results in the stream being split between someSink (1)
>>>> and the further processors (2) in a seemingly nondeterministic way.
>>>>
>>>> I could write through Kafka as shown in this pseudocode:
>>>>
>>>> val headStream = env.addSource(mySource).<some operators here>
>>>> headStream.addSink(KafkaSink("myTopic"))
>>>> val tailStream = env.addSource(KafkaSource("myTopic")).<more operators here>
>>>>
>>>> But this would incur additional latency + deserialization overhead that
>>>> I would like to avoid.
>>>>
>>>> Is there any way to implement stream teeing as described?
>>>>
>>>> Thanks,
>>>> Yury
>>>>
>>>
>>>
>>
>
