[ https://issues.apache.org/jira/browse/FLINK-15670?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17019053#comment-17019053 ]

Stephan Ewen commented on FLINK-15670:
--------------------------------------

*Prerequisites*

The user would have to specify a Kafka topic that has the same number of 
partitions as the maximum parallelism of the receiving operator.

*Implementation Outline Sink*

The sink would invoke Flink's partitioning hash function and key group 
assignment function to compute the key group of each record.
The sink then calls the Kafka Producer to send the record to the partition 
with that key group index.
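As a minimal sketch, the routing step could look like the following. It re-implements outside Flink what we understand `KeyGroupRangeAssignment.assignToKeyGroup` and `MathUtils.murmurHash` to compute (a murmur hash of `key.hashCode()`, modulo the max parallelism); a real sink should call Flink's own `KeyGroupRangeAssignment` instead of copying it, and the result should be checked against the Flink version in use. The class name `KeyGroupPartitioner` is hypothetical. Because the topic is assumed to have exactly `maxParallelism` partitions, the key group index is used directly as the Kafka partition number.

```java
import java.util.Objects;

/** Hypothetical helper: maps a record key to the Kafka partition equal to its Flink key group. */
final class KeyGroupPartitioner {

    private KeyGroupPartitioner() {}

    /** Finalization step of the 32-bit murmur3 hash (fmix32). */
    static int bitMix(int in) {
        in ^= in >>> 16;
        in *= 0x85ebca6b;
        in ^= in >>> 13;
        in *= 0xc2b2ae35;
        in ^= in >>> 16;
        return in;
    }

    /** Scrambles a hash code into a non-negative int (intended to mirror Flink's MathUtils.murmurHash). */
    static int murmurHash(int code) {
        code *= 0xcc9e2d51;
        code = Integer.rotateLeft(code, 15);
        code *= 0x1b873593;
        code = Integer.rotateLeft(code, 13);
        code = code * 5 + 0xe6546b64;
        code ^= 4;
        code = bitMix(code);
        if (code >= 0) {
            return code;
        } else if (code != Integer.MIN_VALUE) {
            return -code;
        } else {
            return 0;
        }
    }

    /** Key group of a key: murmurHash(key.hashCode()) % maxParallelism. */
    static int assignToKeyGroup(Object key, int maxParallelism) {
        Objects.requireNonNull(key, "key must not be null");
        return murmurHash(key.hashCode()) % maxParallelism;
    }

    /**
     * Kafka partition for a key. Only valid under the prerequisite that the topic has
     * exactly maxParallelism partitions, so key group i is written to partition i.
     * The result would be passed as the partition argument of
     * ProducerRecord(topic, partition, key, value).
     */
    static int partitionFor(Object key, int maxParallelism, int numPartitions) {
        if (numPartitions != maxParallelism) {
            throw new IllegalStateException("Topic has " + numPartitions
                    + " partitions but max parallelism is " + maxParallelism);
        }
        return assignToKeyGroup(key, maxParallelism);
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        for (String key : new String[] {"user-1", "user-2", "user-3"}) {
            System.out.println(key + " -> partition " + partitionFor(key, maxParallelism, maxParallelism));
        }
    }
}
```

Failing fast when the partition count differs from the max parallelism keeps the sink from silently writing a layout the aligned source cannot read.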

*Implementation Outline Source*

A source operator based on the current Kafka Source would simply read the 
partitions whose indices equal the key group indices assigned to it. 
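A minimal sketch of that selection, assuming the key group range splitting works as we understand Flink's `KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex` and `computeOperatorIndexForKeyGroup`; the class `AlignedPartitionSelector` and its methods are hypothetical, and a real source would reuse Flink's functions directly.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: partitions a source subtask reads when #partitions == #key groups. */
final class AlignedPartitionSelector {

    private AlignedPartitionSelector() {}

    /** First key group owned by operatorIndex (key group range splitting). */
    static int rangeStart(int maxParallelism, int parallelism, int operatorIndex) {
        return (operatorIndex * maxParallelism + parallelism - 1) / parallelism;
    }

    /** Last key group (inclusive) owned by operatorIndex. */
    static int rangeEnd(int maxParallelism, int parallelism, int operatorIndex) {
        return ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
    }

    /** Operator index that owns a given key group. */
    static int operatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    /** Partitions to read: exactly the key group indices assigned to this subtask. */
    static List<Integer> partitionsForSubtask(int maxParallelism, int parallelism, int subtaskIndex) {
        List<Integer> partitions = new ArrayList<>();
        int end = rangeEnd(maxParallelism, parallelism, subtaskIndex);
        for (int kg = rangeStart(maxParallelism, parallelism, subtaskIndex); kg <= end; kg++) {
            partitions.add(kg);
        }
        return partitions;
    }

    public static void main(String[] args) {
        int maxParallelism = 10;
        int parallelism = 3;
        for (int subtask = 0; subtask < parallelism; subtask++) {
            System.out.println("subtask " + subtask + " reads "
                    + partitionsForSubtask(maxParallelism, parallelism, subtask));
        }
    }
}
```

With a max parallelism of 10 and a parallelism of 3, this prints that subtask 0 reads [0, 1, 2, 3], subtask 1 reads [4, 5, 6] and subtask 2 reads [7, 8, 9], so each record lands on the same subtask that would own its key group after a keyBy.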

For the new Sources (FLIP-27), the state will no longer be a Union State, so 
not all source tasks have access to the state of all partitions. But the list 
state partitioning among parallel tasks follows the same logic as the key group 
range splitting. So as long as the number of partitions equals the number of 
key groups, the two distributions should align. We need checks in the code to 
guard that assumption, though.
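One possible shape for those checks, as a sketch only: it assumes the guard runs when a subtask learns which partitions it was assigned (or restored), and compares them with the key group range it would own. `AlignmentGuard` and its methods are hypothetical names, not Flink APIs, and the range formula is the key group range splitting as we understand it.

```java
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical guard for the partition/key-group alignment assumption. */
final class AlignmentGuard {

    private AlignmentGuard() {}

    /** Prerequisite: exactly one Kafka partition per key group. */
    static void checkPartitionCount(int numPartitions, int maxParallelism) {
        if (numPartitions != maxParallelism) {
            throw new IllegalStateException("Expected " + maxParallelism
                    + " partitions (one per key group) but found " + numPartitions);
        }
    }

    /** Verifies that the partitions assigned to a subtask are exactly its key group range. */
    static void checkAssignedPartitions(
            Set<Integer> assigned, int maxParallelism, int parallelism, int subtaskIndex) {
        int start = (subtaskIndex * maxParallelism + parallelism - 1) / parallelism;
        int end = ((subtaskIndex + 1) * maxParallelism - 1) / parallelism;
        Set<Integer> expected = new TreeSet<>();
        for (int kg = start; kg <= end; kg++) {
            expected.add(kg);
        }
        if (!expected.equals(assigned)) {
            throw new IllegalStateException("Subtask " + subtaskIndex + " was assigned partitions "
                    + new TreeSet<>(assigned) + " but owns key groups " + start + ".." + end);
        }
    }

    public static void main(String[] args) {
        checkPartitionCount(10, 10);
        checkAssignedPartitions(Set.of(4, 5, 6), 10, 3, 1); // aligned: passes silently
        try {
            checkAssignedPartitions(Set.of(3, 4, 5), 10, 3, 1); // misaligned: rejected
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Failing on a mismatch, rather than silently reading the assigned partitions, surfaces cases where the state redistribution does not follow the key group splitting.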



> Provide a Kafka Source/Sink pair that aligns Kafka's Partitions and Flink's 
> KeyGroups
> -------------------------------------------------------------------------------------
>
>                 Key: FLINK-15670
>                 URL: https://issues.apache.org/jira/browse/FLINK-15670
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / DataStream, Connectors / Kafka
>            Reporter: Stephan Ewen
>            Priority: Major
>              Labels: usability
>             Fix For: 1.11.0
>
>
> This Source/Sink pair would serve two purposes:
> 1. You can read topics that are already partitioned by key and process them 
> without partitioning them again (avoid shuffles)
> 2. You can use this to shuffle through Kafka, thereby decomposing the job 
> into smaller jobs and independent pipelined regions that fail over 
> independently.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)