What you could try is using a KeySelector that maps the input records to
a range of 0-N. This should effectively behave like partitionCustom,
except that you get a KeyedStream back.
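A rough sketch of that mapping in plain Java, with no Flink dependency. In a real job this logic would presumably sit inside the `getKey` method of a `KeySelector<T, Integer>`; the names `BucketKey` and `bucketFor` are made up for illustration. Note that `keyBy` still hashes the returned bucket value into key groups, so this alone does not guarantee one bucket per subtask.

```java
/** Plain-Java stand-in for the body of a KeySelector that returns a bucket id. */
final class BucketKey {

    /** Maps a record to a bucket in the range [0, numBuckets). */
    static int bucketFor(String record, int numBuckets) {
        // floorMod keeps the result non-negative even when hashCode() is negative
        return Math.floorMod(record.hashCode(), numBuckets);
    }

    public static void main(String[] args) {
        for (String record : new String[] {"alpha", "beta", "gamma"}) {
            System.out.println(record + " -> bucket " + bucketFor(record, 4));
        }
    }
}
```

The same record always lands in the same bucket, which is the property a key selector needs.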
On 02.01.2019 22:13, Ken Krugler wrote:
Hi Jozef,
Processing just a few keys (# of keys ≅ # of operators) in Flink isn’t common,
from what I’ve seen.
Another possible option is to broadcast all records, and then in each operator
decide what records to process, based on the operator index and the key value.
Something like this in your operator's open() method:
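public void open(Configuration parameters) throws Exception {
    super.open(parameters);
    // Index of this parallel instance, in the range [0, numOperators)
    this.operatorIndex = getRuntimeContext().getIndexOfThisSubtask();
    // Total number of parallel instances of this operator
    this.numOperators = getRuntimeContext().getNumberOfParallelSubtasks();
}
And in your operator’s processing method...
int hash = calcPositiveHashCode(key);
// Process the record only if this subtask owns the key
if ((hash % this.numOperators) == this.operatorIndex) { … }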
— Ken
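To illustrate the ownership check in isolation, here is a minimal plain-Java sketch without any Flink classes. `SubtaskFilter`, `positiveHash` and `owns` are hypothetical names; `positiveHash` is only one guess at what `calcPositiveHashCode` might do (it clears the sign bit). The point is that, when every subtask sees every record, exactly one subtask index accepts a given key.

```java
/** Simulates the "broadcast, then filter by subtask index" idea outside Flink. */
final class SubtaskFilter {

    /** Hypothetical stand-in for calcPositiveHashCode(): clears the sign bit. */
    static int positiveHash(Object key) {
        return key.hashCode() & Integer.MAX_VALUE;
    }

    /** True if the subtask with the given index should process this key. */
    static boolean owns(Object key, int subtaskIndex, int numSubtasks) {
        return positiveHash(key) % numSubtasks == subtaskIndex;
    }

    public static void main(String[] args) {
        int numSubtasks = 3;
        for (String key : new String[] {"a", "b", "c", "d"}) {
            for (int index = 0; index < numSubtasks; index++) {
                if (owns(key, index, numSubtasks)) {
                    System.out.println(key + " -> subtask " + index);
                }
            }
        }
    }
}
```

The price of this approach is that every record is shipped to every subtask, so it probably only makes sense when the input volume is modest.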
On Jan 2, 2019, at 11:32 AM, Jozef Vilcek <jozo.vil...@gmail.com> wrote:
Thanks Ken. Yes, a similar approach is suggested in the post I shared in my
question. But to me it feels a bit hack-ish.
I would like to know if this is the only solution with Flink, or am I missing
something?
Can there be more API-ish support for such a use case from Flink? Is there a
reason why there is none? Or is there?
On Wed, Jan 2, 2019 at 5:29 PM Ken Krugler <kkrugler_li...@transpac.com>
wrote:
FWIW, if you want exactly one record per operator, then this code <
https://github.com/ScaleUnlimited/flink-crawler/blob/ba06aa87226b4c44e30aba6df68984d53cc15519/src/main/java/com/scaleunlimited/flinkcrawler/utils/FlinkUtils.java#L153>
should generate key values that will be partitioned properly.
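A sketch in the same spirit (not a copy of the linked code): search for one integer key per subtask by replaying the key-group assignment. The hash below is re-implemented from memory of Flink's `MathUtils.murmurHash`, and the two-step formula (hash modulo max parallelism, then key group times parallelism divided by max parallelism) follows `KeyGroupRangeAssignment` as I understand it; treat both as assumptions and check them against your Flink version. `KeyPerSubtask` and its method names are made up.

```java
/** Finds one integer key per subtask by replaying an assumed key-group assignment. */
final class KeyPerSubtask {

    /** Assumed re-implementation of Flink's MathUtils.murmurHash(int). */
    static int murmurHash(int code) {
        code *= 0xcc9e2d51;
        code = Integer.rotateLeft(code, 15);
        code *= 0x1b873593;
        code = Integer.rotateLeft(code, 13);
        code = code * 5 + 0xe6546b64;
        code ^= 4;
        // Final bit mixing
        code ^= code >>> 16;
        code *= 0x85ebca6b;
        code ^= code >>> 13;
        code *= 0xc2b2ae35;
        code ^= code >>> 16;
        if (code >= 0) {
            return code;
        }
        return code != Integer.MIN_VALUE ? -code : 0;
    }

    /** Subtask index that an integer key would be routed to under the assumed formula. */
    static int subtaskFor(int key, int maxParallelism, int parallelism) {
        int keyGroup = murmurHash(Integer.hashCode(key)) % maxParallelism;
        return keyGroup * parallelism / maxParallelism;
    }

    /** Returns keys[i] such that subtaskFor(keys[i], ...) == i for every subtask i. */
    static int[] keysForSubtasks(int maxParallelism, int parallelism) {
        int[] keys = new int[parallelism];
        boolean[] found = new boolean[parallelism];
        int remaining = parallelism;
        // Try candidate keys in order until every subtask has been hit once
        for (int candidate = 0; remaining > 0; candidate++) {
            int target = subtaskFor(candidate, maxParallelism, parallelism);
            if (!found[target]) {
                found[target] = true;
                keys[target] = candidate;
                remaining--;
            }
        }
        return keys;
    }

    public static void main(String[] args) {
        // 128 and 4 are example values only
        int[] keys = keysForSubtasks(128, 4);
        for (int i = 0; i < keys.length; i++) {
            System.out.println("subtask " + i + " <- key " + keys[i]);
        }
    }
}
```

The generated keys are only valid for the parallelism and max parallelism they were computed with, so they would need to be regenerated if either setting changes.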
— Ken
On Jan 2, 2019, at 12:16 AM, Jozef Vilcek <jozo.vil...@gmail.com> wrote:
Hello,
I am facing a problem where a KeyedStream is poorly parallelised across workers
in the case where the number of keys is close to the parallelism.
Some workers process zero keys, some more than one. This is because of
`KeyGroupRangeAssignment.assignKeyToParallelOperator()` in
`KeyGroupStreamPartitioner` as I found out in this post:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Use-keyBy-to-deterministically-hash-each-record-to-a-processor-task-slot-td16483.html
I would like to find out what my options are here.
* is there a reason why a custom partitioner cannot be used in a keyed
stream?
* can there be API support for creating correct KeyedStream-compatible
keys? It would also make
`DataStreamUtils.reinterpretAsKeyedStream()` more usable for certain
scenarios.
* any other options I have?
Many thanks in advance.
Best,
Jozef
--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra