We've seen the same thing here. We read files twice for the same reason: it's simply faster to do that than to connect the two pipes to the same input.

From: Sofer, Tovi [mailto:tovi.so...@citi.com]
Sent: Monday, January 01, 2018 8:51 AM
To: user@flink.apache.org
Subject: Two operators consuming from same stream

Hi group,

We have the graph below, to which we added metrics for latency calculation.
We have two streams, ordersStream and pricesStream. Both are consumed by two operators, CoMapperA and CoMapperB, each using connect.

Initially we thought that a stream consumed by two operators had to be duplicated into two separate streams, so we did that using split, as below.
Then we understood that this is not required and that two operators can consume the same stream, so we removed the duplication.
However, when checking latency, we found that latency with the duplicated streams was much better than without duplication (by about a factor of two).

My questions:
* Is the improved latency related to checkpointing separately on those streams?
* What are the cons of using duplication if it gives better latency? Are we harming state correctness in any way?

Additional info:
The two graph configurations appear exactly the same in the execution plan / web UI:

[sourceOrders.keyBy,CoMapperA,OrdersStreams]
[sourcePrices.keyBy,CoMapperB,pricesStreams]

Code without duplication looks something like:

    KeyedStream<Order> orderKeyedStream = ordersStream.keyBy(field);
    KeyedStream<Price> pricesKeyedStream = pricesStream.keyBy(field);

    orderKeyedStream.connect(pricesKeyedStream).flatMap(mapperA);
    orderKeyedStream.connect(pricesKeyedStream).flatMap(mapperB);

Code used for duplication (we duplicate the streams, then connect pricesStreamA with ordersStreamA, and pricesStreamA with ordersStreamB, with keyBy as part of connect, similar to above):

    // duplicate prices streams
    SplitStream<Price> pricesStreams = pricesStream
        .split(price -> ImmutableList.of("pricesStreamA", "pricesStreamB"));

    DataStream<Price> pricesStreamA = pricesStreams.select("pricesStreamA");
    DataStream<Price> pricesStreamB = pricesStreams.select("pricesStreamB");

Thanks,
Tovi