Hi all,
As part of my research, I’m currently working on a variant of Flink that
allows subpartitions to be assigned to TaskExecutors at runtime (only looking
at joins for the moment). In this variant, I made some changes to the
ResultPartitionDeploymentDescriptor that lead to the creation of many more
subpartitions than InputChannels. (E.g., usually we would have 4 subpartitions
and 4 InputChannels with 4 receivers; now imagine we have 40 subpartitions,
4 InputChannels and 4 receivers.)
Without further changes, each receiver requests its single subpartition,
receives it and is done. Only 4 of the 40 subpartitions are consumed, so only
1/10 of the data is transferred. (This setup runs without problems, but of
course reading only 10% of the data is not what we want!)
Now I would like to decide during execution to hand the additional 36
subpartitions to the receivers for consumption.
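To make the numbers concrete, here is a small self-contained Java model (not a
Flink API). It assumes receiver r initially consumes subpartition r, and it
uses a round-robin scheme (subpartition j goes to receiver j % 4) purely as an
illustrative way of distributing the 36 extra subpartitions. The class and
method names are made up for this sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, NOT a Flink API: models which subpartitions each
// receiver consumes when there are more subpartitions than receivers.
final class SubpartitionAssignment {

    // Without changes, each receiver requests exactly one subpartition,
    // so only numReceivers of numSubpartitions are read.
    static double defaultCoverage(int numSubpartitions, int numReceivers) {
        return (double) numReceivers / numSubpartitions;
    }

    // Assumed scheme: receiver r keeps its initial subpartition r and is
    // additionally given every subpartition j with j % numReceivers == r.
    static List<Integer> roundRobin(int receiver, int numSubpartitions, int numReceivers) {
        List<Integer> assigned = new ArrayList<>();
        for (int j = receiver; j < numSubpartitions; j += numReceivers) {
            assigned.add(j);
        }
        return assigned;
    }

    public static void main(String[] args) {
        int subpartitions = 40;
        int receivers = 4;
        System.out.println(defaultCoverage(subpartitions, receivers)); // 0.1
        for (int r = 0; r < receivers; r++) {
            // Each receiver ends up with 10 subpartitions: its own plus 9 extra.
            System.out.println(r + " -> " + roundRobin(r, subpartitions, receivers));
        }
    }
}
```

With 4 receivers and 40 subpartitions every receiver gets 9 extra
subpartitions, which together cover exactly the 36 that are otherwise never
read.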
My idea was to let the receivers still believe they are receiving only their
initial subpartition index, and to add the data of the other subpartitions on
the sender side.
I know that we have to distinguish between local and remote subpartitions.
For local subpartitions, the relevant class is the SingleInputGate, which
usually holds multiple InputChannels. In the local case, each InputChannel is a
LocalInputChannel, which itself holds one PipelinedSubpartitionView. This is
where I add more views for the additional subpartition indices on the fly.
In the end, buffers from different subpartitions arrive and are deserialized by 
the AbstractRecordReader.
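The following is a minimal, hypothetical model of that setup, not Flink's
LocalInputChannel or PipelinedSubpartitionView API: one channel drains several
subpartition "views" (plain queues here) that can be added at runtime, polling
them round-robin. The names and the round-robin order are assumptions made for
illustration. It shows that the consumer sees a single channel while the
buffers it receives alternate between subpartitions.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

// Hypothetical model, NOT Flink's LocalInputChannel: one channel that drains
// several subpartition views, each modelled as a simple queue of buffers.
final class MultiViewChannel {

    // A buffer tagged with the subpartition it was read from.
    static final class TaggedBuffer {
        final int subpartition;
        final String payload;

        TaggedBuffer(int subpartition, String payload) {
            this.subpartition = subpartition;
            this.payload = payload;
        }
    }

    private final List<Integer> subpartitions = new ArrayList<>();
    private final List<Queue<String>> views = new ArrayList<>();
    private int next = 0; // index of the view to try first on the next poll

    // Views may be added while another thread is polling, hence synchronized.
    // (In this model the queues are pre-filled; a real producer would need a
    // thread-safe queue.)
    synchronized void addView(int subpartition, Queue<String> view) {
        subpartitions.add(subpartition);
        views.add(view);
    }

    // Returns the next available buffer, visiting views round-robin,
    // or null if every view is currently empty.
    synchronized TaggedBuffer poll() {
        for (int i = 0; i < views.size(); i++) {
            int idx = (next + i) % views.size();
            String payload = views.get(idx).poll();
            if (payload != null) {
                next = (idx + 1) % views.size();
                return new TaggedBuffer(subpartitions.get(idx), payload);
            }
        }
        return null;
    }

    public static void main(String[] args) {
        MultiViewChannel channel = new MultiViewChannel();
        channel.addView(0, new ArrayDeque<>(Arrays.asList("a1", "a2")));
        channel.addView(4, new ArrayDeque<>(Arrays.asList("b1", "b2")));
        // The consumer sees one channel, but buffers alternate between subpartitions 0 and 4.
        for (TaggedBuffer b = channel.poll(); b != null; b = channel.poll()) {
            System.out.println(b.subpartition + ": " + b.payload);
        }
    }
}
```

Running `main` prints the buffers in the order `0: a1`, `4: b1`, `0: a2`,
`4: b2`, so anything downstream that keeps per-channel state sees data from
two subpartitions interleaved under one channel.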
Unfortunately, I’m losing data when consuming concurrently from several
subpartitions with a low over-partitioning factor. I haven’t identified why
yet…
But more problematic: when reading records that are larger than just a few
Integer values, I get exceptions during deserialization. I suspect this happens
as soon as a single record no longer fits into one buffer. Am I right?
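To make that hypothesis concrete, here is a small self-contained Java model. It
is not Flink's deserializer or serialization format. It assumes records are
length-prefixed and that the reader keeps one stateful deserializer per input
channel (an assumption about how AbstractRecordReader behaves). When a record
spans buffers, the deserializer carries the partial bytes over to the next
buffer; if a buffer from another subpartition arrives in between on the same
channel, those bytes get glued onto the wrong record. `SpanningRecords`,
`toBuffers`, `decode` and `decodeShared` are made-up names.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified model, NOT Flink's record deserializer: records are written as
// [4-byte length][UTF-8 bytes] and the byte stream is cut into fixed-size buffers.
final class SpanningRecords {

    static List<byte[]> toBuffers(List<String> records, int bufferSize) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (String r : records) {
            byte[] bytes = r.getBytes(StandardCharsets.UTF_8);
            out.write(ByteBuffer.allocate(4).putInt(bytes.length).array(), 0, 4);
            out.write(bytes, 0, bytes.length);
        }
        byte[] all = out.toByteArray();
        List<byte[]> buffers = new ArrayList<>();
        for (int off = 0; off < all.length; off += bufferSize) {
            buffers.add(Arrays.copyOfRange(all, off, Math.min(off + bufferSize, all.length)));
        }
        return buffers;
    }

    // Stateful deserializer: bytes of a record that is not yet complete are kept
    // in 'pending' and prepended to the next buffer it receives.
    static final class Deserializer {
        private byte[] pending = new byte[0];

        List<String> add(byte[] buffer) {
            byte[] data = Arrays.copyOf(pending, pending.length + buffer.length);
            System.arraycopy(buffer, 0, data, pending.length, buffer.length);
            List<String> records = new ArrayList<>();
            int pos = 0;
            while (data.length - pos >= 4) {
                int len = ByteBuffer.wrap(data, pos, 4).getInt();
                if (len < 0) throw new IllegalStateException("corrupt record length " + len);
                if (data.length - pos - 4 < len) break; // record continues in a later buffer
                records.add(new String(data, pos + 4, len, StandardCharsets.UTF_8));
                pos += 4 + len;
            }
            pending = Arrays.copyOfRange(data, pos, data.length);
            return records;
        }
    }

    // Feeds all buffers of one source into its own deserializer.
    static List<String> decode(List<byte[]> buffers) {
        Deserializer d = new Deserializer();
        List<String> out = new ArrayList<>();
        for (byte[] b : buffers) out.addAll(d.add(b));
        return out;
    }

    // Alternates buffers of two sources into ONE shared deserializer,
    // mimicking two subpartitions multiplexed onto one input channel.
    static List<String> decodeShared(List<byte[]> a, List<byte[]> b) {
        Deserializer d = new Deserializer();
        List<String> out = new ArrayList<>();
        for (int i = 0; i < Math.max(a.size(), b.size()); i++) {
            if (i < a.size()) out.addAll(d.add(a.get(i)));
            if (i < b.size()) out.addAll(d.add(b.get(i)));
        }
        return out;
    }

    public static void main(String[] args) {
        List<byte[]> a = toBuffers(Arrays.asList("hello world", "x"), 6);
        List<byte[]> b = toBuffers(Arrays.asList("abcdefghijklmnop"), 6);
        System.out.println(decode(a)); // [hello world, x]
        System.out.println(decode(b)); // [abcdefghijklmnop]
        // Records span buffers, so the shared deserializer mixes partial records.
        System.out.println(decodeShared(a, b).contains("hello world")); // false
        // Each 8-byte buffer holds exactly one whole record, so interleaving is harmless.
        List<byte[]> c = toBuffers(Arrays.asList("1111", "2222"), 8);
        List<byte[]> d = toBuffers(Arrays.asList("3333", "4444"), 8);
        System.out.println(decodeShared(c, d)); // [1111, 3333, 2222, 4444]
    }
}
```

In this model, records that end exactly on buffer boundaries decode correctly
even when interleaved, which would be consistent with small Integer records
working, while spanning records are corrupted. Giving every subpartition its
own deserializer (as `decode` does) avoids the mixing; whether the real cause in
Flink is the same would still need to be checked against the actual reader code.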
If that is the case, then instead of adding PipelinedSubpartitionViews, I
should add completely new InputGates for the additional subpartition
assignments (with their own InputChannels, deserializers, …).
The InputGates are already created when the task is instantiated and are
described by the InputGateDeploymentDescriptor. The challenge now is to add
SingleInputGates to a running task.

Maybe someone has an idea where to start with this or has any other thoughts;
maybe there is another solution I’m missing.

Thank you.
Benjamin Burkhardt
