Hi Tovi,
I think your code without duplication performs two separate shuffle
operations whereas the other code only performs one shuffle.
Further latency impacts might be due to the overhead involved in
maintaining the partitioning for a keyed stream/key groups and switching
key contexts in the operator.
Did you check the latency of the following?
DataStream<> ds =
orderKeyedStream.connect(pricesKeyedStream).flatMap(identityMapper);
ds.flatMap(mapperA);
ds.flatMap(mapperB);
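To make the key-group overhead mentioned above a bit more concrete, here is a minimal plain-Java sketch of how a key could be mapped to a key group and then to a parallel subtask. The class and method names are made up for illustration, and the hash mixing is a simplified stand-in (Flink, as far as I know, applies a murmur hash to the key's hashCode). Only the key group to subtask formula is meant to mirror what Flink does.

```java
final class KeyGroupSketch {

    // Hypothetical helper: maps a key to a key group in [0, maxParallelism).
    // The bit mixing below is a simplified assumption, not Flink's actual hash.
    static int keyGroupFor(Object key, int maxParallelism) {
        int h = key.hashCode();
        h ^= (h >>> 16);
        return Math.floorMod(h, maxParallelism);
    }

    // Maps a key group to the index of the parallel subtask that owns it.
    static int subtaskFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // number of key groups (assumed value)
        int parallelism = 4;      // number of parallel subtasks (assumed value)
        for (String key : new String[] {"EURUSD", "GBPUSD", "USDJPY"}) {
            int keyGroup = keyGroupFor(key, maxParallelism);
            int subtask = subtaskFor(keyGroup, maxParallelism, parallelism);
            System.out.println(key + " -> key group " + keyGroup + " -> subtask " + subtask);
        }
    }
}
```

Every record that crosses a keyBy goes through a mapping of this kind, and the receiving operator has to set the current key before it touches keyed state, which is where some per-record cost can come from.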
Regards,
Timo
On 1/1/18 at 2:50 PM, Sofer, Tovi wrote:
Hi group,
We have the following graph below, on which we added metrics for
latency calculation.
We have two streams, ordersStream and pricesStream. Both are consumed
by two operators, CoMapperA and CoMapperB, each of which uses connect.
Initially we thought that a stream consumed by two operators has to be
duplicated into two separate streams, so we did that using split, as
shown below.
Then we understood that this is not required and that two operators can
consume the same stream, so we removed the duplication.
However, when checking latency, we found that latency with duplicated
streams was much better than without duplication (by roughly a factor
of two).
_My questions:_
·Is the improved latency related to checkpointing separately on those
streams?
·What are the downsides of using the duplication if it has better
latency? Are we harming state correctness in any way?
_Additional Info:_
The two graph configurations appear exactly the same in the execution
plan / web UI:
[Execution graph image; node labels: sourceOrders.keyBy, sourcePrices.keyBy, CoMapperA, CoMapperB, OrdersStreams, pricesStreams]
_Code without duplication looks something like:_
KeyedStream<Order> orderKeyedStream = ordersStream.keyBy(field);
KeyedStream<Price> pricesKeyedStream = pricesStream.keyBy(field);
orderKeyedStream.connect(pricesKeyedStream).flatMap(mapperA);
orderKeyedStream.connect(pricesKeyedStream).flatMap(mapperB);
_Code used for duplication:_
(We duplicate the streams, then connect pricesStreamA with
ordersStreamA and pricesStreamB with ordersStreamB, applying keyBy as
part of the connect, similar to the code above.)
// duplicate the prices stream: every element is tagged with both names
SplitStream<Price> pricesStreams = pricesStream
    .split(price -> ImmutableList.of("pricesStreamA", "pricesStreamB"));
DataStream<Price> pricesStreamA = pricesStreams.select("pricesStreamA");
DataStream<Price> pricesStreamB = pricesStreams.select("pricesStreamB");
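As a rough illustration of why this split/select pattern duplicates the stream, here is a plain-Java toy model with no Flink dependency. The class name `SplitSelectSketch` and its `select` helper are hypothetical; they only mimic the idea that the selector tags each element with output names and that selecting a name returns every element carrying that tag.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

final class SplitSelectSketch {

    // Toy model of split + select: keeps the elements whose tag list
    // (as produced by the selector) contains the requested output name.
    static <T> List<T> select(List<T> input,
                              Function<T, List<String>> selector,
                              String name) {
        List<T> out = new ArrayList<>();
        for (T element : input) {
            if (selector.apply(element).contains(name)) {
                out.add(element);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Double> prices = Arrays.asList(1.10, 1.11, 1.12);
        // Tag every price with both names, so both outputs see every element.
        Function<Double, List<String>> toBoth =
                p -> Arrays.asList("pricesStreamA", "pricesStreamB");

        System.out.println("A: " + select(prices, toBoth, "pricesStreamA"));
        System.out.println("B: " + select(prices, toBoth, "pricesStreamB"));
        // A name that differs only by whitespace matches no tag.
        System.out.println("A with trailing space: "
                + select(prices, toBoth, "pricesStreamA "));
    }
}
```

In this model the names passed to select must match the tags exactly, so stray whitespace in either place yields an empty output.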
Thanks,
Tovi