Anyone?
On Fri, 6 Dec 2019 at 19:07, Komal Mariam wrote:
> Hello everyone,
>
> I would like some insight into the keyBy (and rebalance) operations. As I
> understand it, they distribute our records across the configured
> parallelism and should therefore make our pipeline faster.
>
> I am reading a topic that contains 170,000,000 pre-stored records, with 11
> Kafka partitions and a replication factor of 1. Because the records are
> already stored, I use .setStartFromEarliest() to read the stream.
> My Flink cluster has 4 nodes: 3 TaskManagers with 10 cores each and
> 1 JobManager with 6 cores. Each TaskManager has 10 task slots, so I set
> the environment parallelism to 30.
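
A small sketch of the arithmetic behind that setting, in plain Java with no Flink dependency. The class and method names are hypothetical. The rounding rule and the clamp bounds (128 and 32768) are my understanding of how Flink picks a default max parallelism (the number of key groups) when none is set explicitly, so treat them as assumptions to check against the Flink docs.

```java
// Hypothetical helper: job parallelism and the default max parallelism
// (number of key groups) implied by the cluster described above.
final class ParallelismMath {
    // Assumed clamp bounds for the default max parallelism.
    static final int LOWER_BOUND_MAX_PARALLELISM = 1 << 7;  // 128
    static final int UPPER_BOUND_MAX_PARALLELISM = 1 << 15; // 32768

    // One parallel subtask per task slot across all TaskManagers.
    static int jobParallelism(int taskManagers, int slotsPerTaskManager) {
        return taskManagers * slotsPerTaskManager;
    }

    // Smallest power of two that is >= x, for x >= 1.
    static int roundUpToPowerOfTwo(int x) {
        int highest = Integer.highestOneBit(x);
        return highest == x ? x : highest << 1;
    }

    // Assumed rule: round p + p/2 up to a power of two, clamp to [128, 32768].
    static int defaultMaxParallelism(int parallelism) {
        int candidate = roundUpToPowerOfTwo(parallelism + parallelism / 2);
        return Math.min(Math.max(candidate, LOWER_BOUND_MAX_PARALLELISM),
                UPPER_BOUND_MAX_PARALLELISM);
    }

    public static void main(String[] args) {
        int p = jobParallelism(3, 10);
        System.out.println("parallelism = " + p
                + ", default max parallelism = " + defaultMaxParallelism(p));
    }
}
```

Under these assumptions, 3 TaskManagers x 10 slots gives parallelism 30, and 30 + 15 = 45 rounds up to 64, which is then raised to the floor of 128.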
>
> There are about 10,000 object IDs, hence about 10,000 keys. For now I'm
> keeping the number of records fixed so I can measure how fast they are
> processed.
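
If I understand Flink's keyed partitioning correctly, keys are first hashed into key groups (as many as the max parallelism, assumed here to be the default of 128), and each subtask then owns a contiguous range of key groups given by keyGroup * parallelism / maxParallelism. The plain-Java sketch below (hypothetical names, no Flink dependency) counts how many key groups each of 30 subtasks would own under those assumptions.

```java
// Hypothetical sketch: how many key groups each parallel subtask owns
// under the range assignment keyGroup * parallelism / maxParallelism.
final class KeyGroupSpread {
    static int[] keyGroupsPerSubtask(int maxParallelism, int parallelism) {
        int[] counts = new int[parallelism];
        for (int keyGroup = 0; keyGroup < maxParallelism; keyGroup++) {
            // Integer division maps contiguous key-group ranges to subtasks.
            int subtask = keyGroup * parallelism / maxParallelism;
            counts[subtask]++;
        }
        return counts;
    }

    // Number of subtasks that own exactly `groups` key groups.
    static int subtasksOwning(int[] counts, int groups) {
        int n = 0;
        for (int c : counts) {
            if (c == groups) {
                n++;
            }
        }
        return n;
    }

    public static void main(String[] args) {
        // 128 is an assumption: the default max parallelism for parallelism 30.
        int[] counts = keyGroupsPerSubtask(128, 30);
        System.out.println("subtasks with 4 key groups: " + subtasksOwning(counts, 4));
        System.out.println("subtasks with 5 key groups: " + subtasksOwning(counts, 5));
    }
}
```

Since 128 = 30 x 4 + 8, 8 subtasks end up with 5 key groups and 22 with 4, so even perfectly uniform keys could give some subtasks about 25% more keys than others.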
>
> When I remove keyBy, I get the same results in 39 seconds, compared with 52
> seconds with keyBy. In fact, even when I lower the parallelism to 10 or
> below, I still see the same extra overhead of 9 to 13 seconds. My data is
> roughly uniformly distributed over its keys, so I can rule out skew.
> Rebalance likewise adds the same overhead as keyBy.
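
A back-of-the-envelope check on those timings, assuming both runs consume all 170,000,000 records and the reported times are end-to-end. The class and method names are hypothetical. One plausible contributor, offered as a hypothesis rather than a diagnosis: keyBy and rebalance both insert a network exchange between operators, so map and filter can no longer be chained to the source and every record has to be serialized, sent, and deserialized.

```java
// Back-of-the-envelope throughput from the reported run times.
final class ThroughputMath {
    static double recordsPerSecond(long records, double seconds) {
        return records / seconds;
    }

    // Extra wall-clock nanoseconds per record between a slower and a faster run.
    static double extraNanosPerRecord(long records, double slowSeconds, double fastSeconds) {
        return (slowSeconds - fastSeconds) * 1e9 / records;
    }

    public static void main(String[] args) {
        long records = 170_000_000L;
        System.out.printf("without keyBy: %.0f records/s%n", recordsPerSecond(records, 39));
        System.out.printf("with keyBy:    %.0f records/s%n", recordsPerSecond(records, 52));
        System.out.printf("extra time:    %.1f ns/record%n", extraNanosPerRecord(records, 52, 39));
    }
}
```

By this arithmetic the keyed run drops from about 4.36 million to about 3.27 million records per second, roughly 76 ns of extra wall-clock time per record across the whole job.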
>
> What might be causing this overhead, and is there any way to reduce
> it?
>
> Here's the script I'm running for testing purposes:
> --------------
> DataStream<ObjectNode> JSONStream = env.addSource(
>     new FlinkKafkaConsumer<>("data",
>         new JSONKeyValueDeserializationSchema(false), properties)
>     .setStartFromEarliest());
>
> DataStream<Point> myPoints = JSONStream.map(new jsonToPoint());
>
> // Partition by the "oID" field, then keep points by distance from the POI
> myPoints.keyBy("oID").filter(new findDistancefromPOI());
>
> public class findDistancefromPOI extends RichFilterFunction<Point> {
>     @Override
>     public boolean filter(Point input) throws Exception {
>         double distance = computeEuclideanDist(
>             16.4199, 89.974, input.X(), input.Y());
>         return distance > 0;
>     }
> }
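
computeEuclideanDist is called above but not shown. A hypothetical plain-Java stand-in, assuming planar (x, y) coordinates rather than geographic ones:

```java
// Hypothetical stand-in for computeEuclideanDist, assuming planar (x, y)
// coordinates; the original helper is not shown in the snippet.
final class Geometry {
    // Straight-line distance between (x1, y1) and (x2, y2).
    static double computeEuclideanDist(double x1, double y1, double x2, double y2) {
        return Math.hypot(x2 - x1, y2 - y1);
    }

    public static void main(String[] args) {
        // Distance from the POI used in the filter to an arbitrary sample point.
        System.out.println(computeEuclideanDist(16.4199, 89.974, 20.0, 90.0));
    }
}
```

With any non-negative distance like this, distance > 0 is false only for a point exactly at (16.4199, 89.974), so the filter passes nearly every record.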
>
> Best Regards,
> Komal
>