Hi Tai, I was referring to co-partitioning, not co-location of leaders,
i.e. multiple topics that share the same partitioning scheme. For example,
say I have 2 topics which share the same keyspace and which are produced
by something other than Flink using an identical partitioner. The data in
these 2 topics is already co-partitioned and shouldn't require any shuffle
for aggregations by key. From the partition assignment code and your
explanation (and the docs) I understand that the Kafka connector assigns
partitions to operator subtasks by iterating over the list, which most
likely breaks the existing co-partitioning. So even if I provide
partitionCustom with the exact same code that was used to partition the
data in the external producer, any aggregation by the existing message key
will still incur an unnecessary shuffle. Correct? Or does partitionCustom
on the data stream somehow override the behaviour of assignPartitions on
the source?
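
For illustration, this is roughly what I have in mind on the Flink side
(only a sketch: it assumes the external producer uses Kafka's
DefaultPartitioner, and "Event" / getKey() / consumedStream are
placeholders for the real records and source):

    import java.nio.charset.StandardCharsets;
    import org.apache.flink.api.common.functions.Partitioner;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.kafka.common.utils.Utils;

    // Mimic Kafka's DefaultPartitioner: murmur2 hash of the key bytes,
    // taken modulo the number of downstream partitions.
    Partitioner<String> sameAsProducer = new Partitioner<String>() {
        @Override
        public int partition(String key, int numPartitions) {
            byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    };

    // Re-apply the producer's scheme inside Flink before the keyed work.
    DataStream<Event> repartitioned = consumedStream.partitionCustom(
        sameAsProducer,
        new KeySelector<Event, String>() {
            @Override
            public String getKey(Event event) {
                return event.getKey();
            }
        });

As far as I can tell this still introduces a physical redistribution
between the source and the aggregation, which is exactly the shuffle I'd
like to avoid given the data already arrives co-partitioned.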


On Mon, Jun 20, 2016 at 11:01 AM, Tai Gordon <tzuli...@gmail.com> wrote:

> Hi Michal,
>
> Whether or not the external system's partitioning scheme is referenced
> when assigning partitions to the consumer parallel subtasks depends on
> the implementation of each connector / source.
>
> First, a clarification on “co-partitioning”: from your context I’m
> assuming you’re referring to co-location of Kafka partition leaders? If
> so, as you correctly identified, the current Kafka consumer connector
> does not take the co-location of Kafka partitions into account when
> assigning partitions. To exploit this, though, even if the deterministic
> assignment took leader location into account, the subtasks would also
> need to be co-located with the leaders. As far as I know, there’s
> currently no way for subtasks to access their location info at runtime.
> On the other hand, if you’re referring to “co-partitioning the data”
> consumed from Kafka, you can use a custom partitioner on the consumed
> data stream:
>
> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/#physical-partitioning
> .
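> For example, something along these lines (just a sketch; MyPartitioner
> and the "userId" field are placeholders for your own partitioning logic
> and key field):
>
>     DataStream<MyEvent> partitioned =
>         consumedStream.partitionCustom(new MyPartitioner(), "userId");
>
> The operator immediately downstream of partitionCustom then receives the
> records distributed according to that scheme rather than Flink’s default
> partitioning.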
>
> Hope this helps!
>
> Regards,
> Gordon
>
> On June 16, 2016 at 7:50:05 PM, Michal Hariš (michal.har...@gmail.com)
> wrote:
>
> Hi, I was recently looking into a Kafka connector issue (FLINK-4023 /
> FLINK-4069), when it was pointed out that partition assignment will not
> be deterministic if the partition discovery is simply moved to the open()
> method. In the assignPartitions method of FlinkKafkaConsumerBase, a
> modulo on the *index* in the total list of topic-partitions subscribed to
> is used. It is clear that calling it from the open() method in each task
> can produce different lists, so some partitions can be consumed multiple
> times while others are not consumed at all. In a related discussion it
> was suggested to build this list in a deterministic way so that each
> subtask sees the same index for the same topic-partition. This would work
> for the issues above, but it highlighted for me another issue which
> relates to partition assignment itself - hence this separate thread.
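>
> Roughly, the current assignment looks like this (paraphrased, variable
> names are mine rather than the exact code):
>
>     // sketch of the current index-based assignment in assignPartitions
>     List<KafkaTopicPartition> assigned = new ArrayList<>();
>     for (int i = 0; i < allSubscribedPartitions.size(); i++) {
>         if (i % numConsumerSubtasks == thisSubtaskIndex) {
>             assigned.add(allSubscribedPartitions.get(i));
>         }
>     }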
>
> I don't understand at this point how Flink does co-group on multiple
> topics, but having worked with Kafka for a number of years, ignoring the
> physical partition id (which is deterministic at the Kafka cluster level)
> and using a transient list (even if it is constructed deterministically)
> means that co-partitioning cannot be exploited for a straight co-group,
> and Flink always has to do its own shuffle. I think using getPartition()
> on each topic-partition instead of the list index in assignPartitions is
> necessary, even if it may result in an unbalanced work distribution among
> Flink consumer instances. But it seems to me that in Flink, partitioning
> schemes that exist outside its runtime are ignored. Is that because any
> source outside Flink's realm is treated as data to be imported, with no
> partitioning assumed for simplicity/control, or because this is expected
> to produce even load balancing of work? What else am I missing?
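>
> In code, the change I'm suggesting would look roughly like this (again
> just a sketch, not a patch):
>
>     // sketch: drive the assignment by the Kafka partition id itself
>     List<KafkaTopicPartition> assigned = new ArrayList<>();
>     for (KafkaTopicPartition ktp : allSubscribedPartitions) {
>         if (ktp.getPartition() % numConsumerSubtasks == thisSubtaskIndex) {
>             assigned.add(ktp);
>         }
>     }
>
> With several topics this can skew the load (e.g. partition 0 of every
> topic lands on the same subtask), but it preserves the co-partitioning
> that already exists in Kafka.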
>
> Michal
>
