Thank you @vino yang <yanghua1...@gmail.com>  for the reply. I suspect
keyBy will be beneficial in those cases where my subsequent operators are
computationally intensive, i.e. when their computation time exceeds the
network reshuffling cost.
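For context, the per-record work in my filter is just a Euclidean distance
computation, which is cheap, so the shuffle cost dominates. Roughly this
standalone sketch (the helper signature here is assumed to mirror the
computeEuclideanDist used in my job below):

```java
// Standalone sketch of the per-record work done in findDistancefromPOI:
// a plain Euclidean distance against a fixed point of interest.
// Cheap per record, so keyBy's network shuffle dominates the runtime.
public class DistanceSketch {

    // Assumed to match the computeEuclideanDist helper in the job.
    static double computeEuclideanDist(double x1, double y1,
                                       double x2, double y2) {
        double dx = x2 - x1;
        double dy = y2 - y1;
        return Math.sqrt(dx * dx + dy * dy);
    }

    public static void main(String[] args) {
        // Distance of a sample point from the fixed POI (16.4199, 89.974).
        System.out.println(computeEuclideanDist(16.4199, 89.974, 17.0, 90.0));
    }
}
```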

Regards,
Komal

On Mon, 9 Dec 2019 at 15:23, vino yang <yanghua1...@gmail.com> wrote:

> Hi Komal,
>
> KeyBy (hash partitioning, a logical partition) and rebalance (a physical
> partition) are both partitioning strategies supported by Flink.[1]
>
> Generally speaking, partitioning may incur network communication (network
> shuffle) costs, which can add to the total time. The example you provided
> may benefit from operator chaining[2] if you remove the keyBy operation.
>
> Best,
> Vino
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/#datastream-transformations
> [2]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/concepts/runtime.html#tasks-and-operator-chains
>
> Komal Mariam <komal.mar...@gmail.com> 于2019年12月9日周一 上午9:11写道:
>
>> Anyone?
>>
>> On Fri, 6 Dec 2019 at 19:07, Komal Mariam <komal.mar...@gmail.com> wrote:
>>
>>> Hello everyone,
>>>
>>> I want to get some insights on the keyBy (and rebalance) operations, as
>>> according to my understanding they partition our data across the defined
>>> parallelism and thus should make our pipeline faster.
>>>
>>> I am reading a topic containing 170,000,000 pre-stored records, with
>>> 11 Kafka partitions and a replication factor of 1. Hence I use
>>> .setStartFromEarliest() to read the stream.
>>> My Flink setup is a 4-node cluster: 3 task managers, each with 10 cores,
>>> and 1 job manager with 6 cores (10 task slots per TM, hence I set the
>>> environment parallelism to 30).
>>>
>>> There are about 10,000 object IDs, hence 10,000 keys. Right now I'm
>>> keeping the number of records fixed to get a handle on how fast they're
>>> being processed.
>>>
>>> When I remove keyBy, I get the same results in 39 secs as opposed to 52
>>> secs with keyBy. In fact, even when I vary the parallelism down to 10 or
>>> below, I still see the same extra overhead of 9 to 13 secs. My data is
>>> mostly uniformly distributed on its key, so I can rule out skew.
>>> Rebalance likewise has the same latency as keyBy.
>>>
>>> What I want to know is: what may be causing this overhead, and is there
>>> any way to decrease it?
>>>
>>> Here's the script I'm running for testing purposes:
>>> --------------
>>> DataStream<ObjectNode> JSONStream = env.addSource(
>>>     new FlinkKafkaConsumer<>("data",
>>>         new JSONKeyValueDeserializationSchema(false),
>>>         properties).setStartFromEarliest());
>>>
>>> DataStream<Point> myPoints = JSONStream.map(new jsonToPoint());
>>>
>>> myPoints.keyBy("oID").filter(new findDistancefromPOI());
>>>
>>> public class findDistancefromPOI extends RichFilterFunction<Point> {
>>>     public boolean filter(Point input) throws Exception {
>>>         Double distance = computeEuclideanDist(
>>>             16.4199, 89.974, input.X(), input.Y());
>>>         return distance > 0;
>>>     }
>>> }
>>>
>>> Best Regards,
>>> Komal
>>>
>>
