Hi Jozef,

Processing just a few keys (# of keys ≅ # of operators) in Flink isn’t common, from what I’ve seen.

Another possible option is to broadcast all records, and then in each operator decide which records to process, based on the operator index and the key value.

Something like this in your operator's open() method:

    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // Zero-based index of this parallel subtask.
        this.operatorIndex = getRuntimeContext().getIndexOfThisSubtask();
        // Total number of parallel subtasks of this operator.
        this.numOperators = getRuntimeContext().getNumberOfParallelSubtasks();
    }

And in your operator's processing method...

    int hash = calcPositiveHashCode(key);
    // Process the record only if this subtask owns the key.
    if ((hash % this.numOperators) == this.operatorIndex) {
        …
    }

-- Ken

> On Jan 2, 2019, at 11:32 AM, Jozef Vilcek <jozo.vil...@gmail.com> wrote:
>
> Thanks Ken. Yes, a similar approach is suggested in the post I shared in my
> question. But to me it feels a bit hack-ish.
> I would like to know if this is the only solution with Flink, or am I missing
> something?
> Can there be more API-ish support for such a use case from Flink? Is there a
> reason why there is none? Or is there?
>
> On Wed, Jan 2, 2019 at 5:29 PM Ken Krugler <kkrugler_li...@transpac.com>
> wrote:
>
>> FWIW, if you want exactly one record per operator, then this code
>> <https://github.com/ScaleUnlimited/flink-crawler/blob/ba06aa87226b4c44e30aba6df68984d53cc15519/src/main/java/com/scaleunlimited/flinkcrawler/utils/FlinkUtils.java#L153>
>> should generate key values that will be partitioned properly.
>>
>> -- Ken
>>
>>> On Jan 2, 2019, at 12:16 AM, Jozef Vilcek <jozo.vil...@gmail.com> wrote:
>>>
>>> Hello,
>>>
>>> I am facing a problem where a KeyedStream is poorly parallelised on workers
>>> when the number of keys is close to the parallelism.
>>>
>>> Some workers process zero keys, some more than one. This is because of
>>> `KeyGroupRangeAssignment.assignKeyToParallelOperator()` in
>>> `KeyGroupStreamPartitioner`, as I found out in this post:
>>>
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Use-keyBy-to-deterministically-hash-each-record-to-a-processor-task-slot-td16483.html
>>>
>>> I would like to find out what my options are here.
>>> * is there a reason why a custom partitioner cannot be used in a keyed
>>>   stream?
>>> * can there be API support for creating correct KeyedStream-compatible
>>>   keys? It would also make
>>>   `DataStreamUtils.reinterpretAsKeyedStream()` more usable for certain
>>>   scenarios.
>>> * any other option I have?
>>>
>>> Many thanks in advance.
>>>
>>> Best,
>>> Jozef
>>
>> --------------------------
>> Ken Krugler
>> +1 530-210-6378
>> http://www.scaleunlimited.com
>> Custom big data solutions & training
>> Flink, Solr, Hadoop, Cascading & Cassandra

--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra
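
P.S. For anyone who wants to try the broadcast-and-filter ownership check from the reply at the top of this message outside of Flink, here is a minimal standalone sketch. It has no Flink dependency. The class name `SubtaskOwnership` and the helper methods are hypothetical, and `calcPositiveHashCode()` here is only an assumed stand-in (it clears the sign bit of `hashCode()`), not necessarily what the real helper does.

```java
import java.util.ArrayList;
import java.util.List;

public class SubtaskOwnership {

    // Assumed stand-in for calcPositiveHashCode(): clear the sign bit so
    // that the modulo below never yields a negative value.
    static int calcPositiveHashCode(Object key) {
        return key.hashCode() & Integer.MAX_VALUE;
    }

    // True if the subtask with the given zero-based index should process
    // this key, given the total number of parallel subtasks.
    static boolean isOwner(Object key, int operatorIndex, int numOperators) {
        return (calcPositiveHashCode(key) % numOperators) == operatorIndex;
    }

    // Simulates broadcasting one key to every subtask and collects the
    // indexes of the subtasks that would accept it.
    static List<Integer> ownersOf(Object key, int numOperators) {
        List<Integer> owners = new ArrayList<>();
        for (int index = 0; index < numOperators; index++) {
            if (isOwner(key, index, numOperators)) {
                owners.add(index);
            }
        }
        return owners;
    }

    public static void main(String[] args) {
        int numOperators = 4;
        for (String key : new String[] {"a", "b", "c", "d"}) {
            System.out.println(key + " -> subtask(s) " + ownersOf(key, numOperators));
        }
    }
}
```

Every key is accepted by exactly one subtask, because the modulo result is always in the range 0 to numOperators - 1. How evenly the keys spread still depends on their hash values, so with only a handful of keys you may still want to pick key values deliberately.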