Hi Chesnay, I do not think this will work. KeyGroupStreamPartitioner will be
involved, which calls the key selector and then KeyGroupRangeAssignment:
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitioner.java#L60
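To illustrate the assignment chain being described: Flink maps a key to a key
group via `murmurHash(key.hashCode()) % maxParallelism`, then maps the key group
to a subtask via `keyGroupId * parallelism / maxParallelism`. A minimal sketch
of that two-step shape, using a stand-in mixing function rather than Flink's
actual MurmurHash (class and method names here are illustrative, not Flink API):

```java
public class KeyToOperatorSketch {

    // Stand-in for MathUtils.murmurHash(key.hashCode()); NOT the real hash.
    static int mix(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        return h;
    }

    // Step 1: key -> key group (Flink: murmurHash(key.hashCode()) % maxParallelism).
    static int assignToKeyGroup(Object key, int maxParallelism) {
        return Math.floorMod(mix(key.hashCode()), maxParallelism);
    }

    // Step 2: key group -> operator index (Flink: keyGroupId * parallelism / maxParallelism).
    static int operatorForKeyGroup(int keyGroupId, int maxParallelism, int parallelism) {
        return keyGroupId * parallelism / maxParallelism;
    }

    static int assignKeyToOperator(Object key, int maxParallelism, int parallelism) {
        return operatorForKeyGroup(assignToKeyGroup(key, maxParallelism),
                maxParallelism, parallelism);
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        int parallelism = 4;
        // With only 4 keys and parallelism 4, nothing guarantees each key lands
        // on a different subtask -- that is the skew this thread is about.
        for (String key : new String[] {"a", "b", "c", "d"}) {
            System.out.println(key + " -> subtask "
                    + assignKeyToOperator(key, maxParallelism, parallelism));
        }
    }
}
```

This is why a plain KeySelector cannot pin one key per subtask: the key's hash
is re-hashed and bucketed before the operator index is chosen.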
KeyGroupRangeAssignment does murmurHash(key.hashCode()).

On Thu, Jan 3, 2019 at 11:39 AM Chesnay Schepler <ches...@apache.org> wrote:

> What you could try is using a KeySelector that maps the input records to
> a range of 0-N. This should effectively behave like partitionCustom,
> except that you get a KeyedStream back.
>
> On 02.01.2019 22:13, Ken Krugler wrote:
> > Hi Jozef,
> >
> > Processing just a few keys (# of keys ≅ # of operators) in Flink isn't
> > common, from what I've seen.
> >
> > Another possible option is to broadcast all records, and then in each
> > operator decide which records to process, based on the operator index
> > and the key value.
> >
> > Something like this in your operator's open() method:
> >
> > public void open(Configuration parameters) throws Exception {
> >     super.open(parameters);
> >     this.operatorIndex = getRuntimeContext().getIndexOfThisSubtask();
> >     this.numOperators = getRuntimeContext().getNumberOfParallelSubtasks();
> > }
> >
> > And in your operator's processing method:
> >
> > int hash = calcPositiveHashCode(key);
> > if ((hash % this.numOperators) == this.operatorIndex) { … }
> >
> > — Ken
> >
> >> On Jan 2, 2019, at 11:32 AM, Jozef Vilcek <jozo.vil...@gmail.com> wrote:
> >>
> >> Thanks Ken. Yes, a similar approach is suggested in the post I shared
> >> in my question. But to me it feels a bit hack-ish.
> >> I would like to know if this is the only solution with Flink, or do I
> >> miss something?
> >> Could there be more API-ish support for such a use case from Flink? Is
> >> there a reason why there is none? Or is there?
> >>
> >> On Wed, Jan 2, 2019 at 5:29 PM Ken Krugler <kkrugler_li...@transpac.com>
> >> wrote:
> >>
> >>> FWIW, if you want exactly one record per operator, then this code <
> >>> https://github.com/ScaleUnlimited/flink-crawler/blob/ba06aa87226b4c44e30aba6df68984d53cc15519/src/main/java/com/scaleunlimited/flinkcrawler/utils/FlinkUtils.java#L153 >
> >>> should generate key values that will be partitioned properly.
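The idea behind the linked FlinkUtils code is, as I understand it, to search
for key values that the key-to-operator assignment happens to map to each
desired subtask. A minimal self-contained sketch of that search, again using a
stand-in mixing function rather than Flink's real MurmurHash (all names here
are hypothetical, not the actual FlinkUtils API):

```java
public class MakeKeyForOperator {

    // Stand-in for Flink's murmurHash(key.hashCode()); NOT the real hash.
    static int mix(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        return h;
    }

    // Simplified key -> subtask assignment, same shape as Flink's:
    // key group = hash % maxParallelism; subtask = keyGroup * parallelism / maxParallelism.
    static int assignKeyToOperator(int key, int maxParallelism, int parallelism) {
        int keyGroup = Math.floorMod(mix(Integer.hashCode(key)), maxParallelism);
        return keyGroup * parallelism / maxParallelism;
    }

    // Brute-force search for an integer key that lands on the target subtask.
    static int makeKeyForOperatorIndex(int target, int maxParallelism, int parallelism) {
        for (int key = 0; key < 1_000_000; key++) {
            if (assignKeyToOperator(key, maxParallelism, parallelism) == target) {
                return key;
            }
        }
        throw new IllegalStateException("no key found for subtask " + target);
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        int parallelism = 4;
        // Each subtask owns roughly 1/parallelism of all keys, so the search
        // terminates quickly.
        for (int subtask = 0; subtask < parallelism; subtask++) {
            int key = makeKeyForOperatorIndex(subtask, maxParallelism, parallelism);
            System.out.println("subtask " + subtask + " <- key " + key);
        }
    }
}
```

Note the generated keys are only valid for a fixed maxParallelism and
parallelism; if either changes, the keys must be recomputed.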
> >>>
> >>> — Ken
> >>>
> >>>> On Jan 2, 2019, at 12:16 AM, Jozef Vilcek <jozo.vil...@gmail.com> wrote:
> >>>>
> >>>> Hello,
> >>>>
> >>>> I am facing a problem where a KeyedStream is poorly parallelised across
> >>>> workers in the case where the number of keys is close to the parallelism.
> >>>>
> >>>> Some workers process zero keys, some more than one. This is because of
> >>>> `KeyGroupRangeAssignment.assignKeyToParallelOperator()` in
> >>>> `KeyGroupStreamPartitioner`, as I found out in this post:
> >>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Use-keyBy-to-deterministically-hash-each-record-to-a-processor-task-slot-td16483.html
> >>>>
> >>>> I would like to find out what my options are here.
> >>>> * Is there a reason why a custom partitioner cannot be used on a keyed
> >>>>   stream?
> >>>> * Could there be API support for creating correct, KeyedStream-compatible
> >>>>   keys? It would also make `DataStreamUtils.reinterpretAsKeyedStream()`
> >>>>   more usable for certain scenarios.
> >>>> * Any other options?
> >>>>
> >>>> Many thanks in advance.
> >>>>
> >>>> Best,
> >>>> Jozef
> >>>
> >>> --------------------------
> >>> Ken Krugler
> >>> +1 530-210-6378
> >>> http://www.scaleunlimited.com
> >>> Custom big data solutions & training
> >>> Flink, Solr, Hadoop, Cascading & Cassandra
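The broadcast-and-filter approach quoted above can be checked outside Flink: if
every subtask applies `hash % numOperators == operatorIndex`, each record is
processed by exactly one subtask. A minimal sketch (plain Java, no Flink
dependency; `calcPositiveHashCode` is a hypothetical helper mirroring the one
in the quoted snippet):

```java
public class BroadcastFilterCheck {

    // Hypothetical helper from the quoted snippet: a non-negative hash code.
    static int calcPositiveHashCode(Object key) {
        return key.hashCode() & 0x7fffffff;
    }

    // The filter each subtask applies to the broadcast stream.
    static boolean shouldProcess(Object key, int operatorIndex, int numOperators) {
        return calcPositiveHashCode(key) % numOperators == operatorIndex;
    }

    public static void main(String[] args) {
        int numOperators = 4;
        String[] keys = {"alpha", "beta", "gamma", "delta", "epsilon"};
        for (String key : keys) {
            int owners = 0;
            for (int i = 0; i < numOperators; i++) {
                if (shouldProcess(key, i, numOperators)) {
                    owners++;
                }
            }
            // hash % numOperators yields a single value in [0, numOperators),
            // so exactly one subtask accepts each key.
            System.out.println(key + " owners=" + owners);
        }
    }
}
```

The trade-off versus keyBy is that every subtask receives and hashes every
record, so this only pays off when the record volume is modest or the skew from
key-group assignment is worse than the broadcast cost.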