Hello everyone,

I want to get some insight into the keyBy (and rebalance) operations. As I
understand them, they partition the records across the configured
parallelism and thus should make the pipeline faster.
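
To be concrete, this is how I understand the two operations (points stands
in for a DataStream<Point>, as in the script further down):
--------------
// keyBy: hash-partitions the stream on the key, so every record with
// the same key is routed to the same parallel subtask.
points.keyBy("oID");

// rebalance: redistributes records round-robin over all subtasks,
// with no notion of keys.
points.rebalance();
--------------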

I am reading a topic that contains 170,000,000 pre-stored records, with 11
Kafka partitions and a replication factor of 1. Since the records are
pre-stored, I use .setStartFromEarliest() to read the stream.
My Flink setup is a 4-node cluster: 3 task managers with 10 cores each, and
1 job manager with 6 cores. Each TM has 10 task slots, so I set the
environment parallelism to 30.
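
For reference, the environment setup is essentially this (a sketch;
property loading omitted):
--------------
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 3 task managers x 10 slots each = 30 slots, one subtask per slot
env.setParallelism(30);
--------------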

There are about 10,000 object IDs, and hence 10,000 keys. For now I'm
keeping the number of records fixed to get a handle on how fast they're
being processed.

When I remove keyBy, I get the same results in 39 secs, as opposed to 52
secs with keyBy. In fact, even when I lower the parallelism to 10 or below,
I still see the same extra overhead of 9 to 13 secs. My data is mostly
uniformly distributed on its key, so I can rule out skew. Rebalance
likewise shows the same added latency as keyBy.

What I want to know is: what may be causing this overhead, and is there any
way to decrease it?

Here's the script I'm running for testing purposes:
--------------
// Kafka source: read the "data" topic from the earliest offset
DataStream<ObjectNode> JSONStream = env.addSource(
        new FlinkKafkaConsumer<>("data",
                new JSONKeyValueDeserializationSchema(false),
                properties).setStartFromEarliest());

// Parse each JSON record into a Point
DataStream<Point> myPoints = JSONStream.map(new jsonToPoint());

// Key by object ID, then filter on distance from a fixed POI
myPoints.keyBy("oID").filter(new findDistancefromPOI());

public class findDistancefromPOI extends RichFilterFunction<Point> {
    @Override
    public boolean filter(Point input) throws Exception {
        double distance = computeEuclideanDist(16.4199, 89.974, input.X(), input.Y());
        return distance > 0;
    }
}
--------------
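
In case it matters, the keying can also be written with a typed
KeySelector instead of the "oID" field-expression string; getOID() here is
an assumed getter on Point, not my actual code:
--------------
import org.apache.flink.api.java.functions.KeySelector;

// Same keying expressed with a typed KeySelector
myPoints
    .keyBy(new KeySelector<Point, String>() {
        @Override
        public String getKey(Point p) throws Exception {
            return p.getOID();  // hypothetical getter; key type illustrative
        }
    })
    .filter(new findDistancefromPOI());
--------------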

Best Regards,
Komal
