Hi all,

I have a streaming job in which I want to duplicate the stream at some
point, so that one copy ends up in a sink and the other continues down
the processing pipeline. In effect, I want something similar to the
Linux "tee" utility.

I tried a naive approach that looked like this:

val streamToMirror = env.addSource(mySource).<some operators here>
streamToMirror.addSink(someSink) // (1) tee
streamToMirror.<more operators here>.addSink(anotherSink) // (2) process further

But with the above, the records appear to be split between someSink (1)
and the further processing (2) in a seemingly nondeterministic way,
instead of each branch receiving every record.
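
For reference, here is a minimal self-contained sketch of the same shape,
assuming the Flink Scala DataStream API is on the classpath. The
fromElements source, the map operators, and the print() sinks are
placeholders I made up for illustration, not my real source, operators, or
sinks; the object and job names are hypothetical too.

```scala
import org.apache.flink.streaming.api.scala._

object TeeSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Placeholder for env.addSource(mySource).<some operators here>
    val streamToMirror: DataStream[Int] =
      env.fromElements(1, 2, 3, 4, 5).map(_ * 10)

    // (1) tee: stand-in for the first sink, tagged so the branch is visible
    streamToMirror.map(x => s"tee: $x").print()

    // (2) process further: stand-in for <more operators here> and the second sink
    streamToMirror.map(_ + 1).map(x => s"processed: $x").print()

    env.execute("tee-sketch")
  }
}
```

Tagging the output of each branch like this is how I would check whether
both branches see every record.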

I could write through Kafka as shown in this pseudocode:

val headStream = env.addSource(mySource).<some operators here>
headStream.addSink(KafkaSink("myTopic"))
val tailStream = env.addSource(KafkaSource("myTopic")).<more operators here>

But this would incur additional latency and deserialization overhead
that I would like to avoid.

Is there any way to implement stream teeing as described?

Thanks,
Yury
