Hi Chesnay, I do not think this will work. A KeyGroupStreamPartitioner will
still be involved; it calls the key selector and then applies
KeyGroupRangeAssignment:
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitioner.java#L60

KeyGroupRangeAssignment does murmurHash(key.hashCode()).
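For reference, the routing can be sketched roughly as below. This is a simplified stand-in, not Flink's actual code: the `mix` function only imitates the spirit of `MathUtils.murmurHash(int)` (the real constants and negative-value handling differ), while the key-group and operator-index formulas follow the shape of `KeyGroupRangeAssignment`.

```java
// Rough sketch of how a key reaches an operator subtask
// (simplified; see KeyGroupRangeAssignment for the real code).
public class KeyGroupSketch {

    // Stand-in integer bit mixer; illustrative only, NOT Flink's
    // MathUtils.murmurHash implementation.
    static int mix(int code) {
        code ^= code >>> 16;
        code *= 0x85ebca6b;
        code ^= code >>> 13;
        code *= 0xc2b2ae35;
        code ^= code >>> 16;
        return code & 0x7fffffff; // force non-negative
    }

    // keyGroup = murmurHash(key.hashCode()) % maxParallelism
    static int assignToKeyGroup(Object key, int maxParallelism) {
        return mix(key.hashCode()) % maxParallelism;
    }

    // operatorIndex = keyGroup * parallelism / maxParallelism
    static int computeOperatorIndex(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128, parallelism = 4;
        // With only a handful of keys, the key groups they hash to are
        // effectively random, so several keys can collapse onto the same
        // subtask while other subtasks receive nothing.
        for (String key : new String[] {"a", "b", "c", "d"}) {
            int group = assignToKeyGroup(key, maxParallelism);
            int op = computeOperatorIndex(maxParallelism, parallelism, group);
            System.out.println(key + " -> keyGroup " + group + " -> subtask " + op);
        }
    }
}
```

The point is that the user-supplied key only feeds the hash; the key group and subtask index are derived from it, which is why a KeySelector returning 0..N does not by itself guarantee one key per subtask.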

On Thu, Jan 3, 2019 at 11:39 AM Chesnay Schepler <ches...@apache.org> wrote:

> What you could try is using a KeySelector that maps the input records to
> a range of 0-N. This should effectively behave like partitionCustom,
> except that you get a keyedStream back.
>
> On 02.01.2019 22:13, Ken Krugler wrote:
> > Hi Jozef,
> >
> > Processing just a few keys (# of keys ≅ # of operators) in Flink isn’t
> > common, from what I’ve seen.
> >
> > Another possible option is to broadcast all records, and then in each
> > operator decide what records to process, based on the operator index and
> > the key value.
> >
> > Something like this in your operator's open() method:
> >
> >      public void open(Configuration parameters) throws Exception {
> >          super.open(parameters);
> >          this.operatorIndex = getRuntimeContext().getIndexOfThisSubtask();
> >          this.numOperators = getRuntimeContext().getNumberOfParallelSubtasks();
> >      }
> >
> > And in your operator’s processing method...
> >
> >      int hash = calcPositiveHashCode(key);
> >      if ((hash % this.numOperators) == this.operatorIndex) { … }
> >
> > — Ken
> >
> >> On Jan 2, 2019, at 11:32 AM, Jozef Vilcek <jozo.vil...@gmail.com> wrote:
> >>
> >> Thanks Ken. Yes, a similar approach is suggested in the post I shared in
> >> my question. But to me it feels a bit hack-ish.
> >> I would like to know if this is the only solution with Flink, or am I
> >> missing something?
> >> Can there be more API-ish support for such a use case in Flink? Is there
> >> a reason why there is none? Or is there?
> >>
> >> On Wed, Jan 2, 2019 at 5:29 PM Ken Krugler <kkrugler_li...@transpac.com> wrote:
> >>
> >>> FWIW, if you want exactly one record per operator, then this code
> >>> <https://github.com/ScaleUnlimited/flink-crawler/blob/ba06aa87226b4c44e30aba6df68984d53cc15519/src/main/java/com/scaleunlimited/flinkcrawler/utils/FlinkUtils.java#L153>
> >>> should generate key values that will be partitioned properly.
> >>>
> >>> — Ken
> >>>
> >>>> On Jan 2, 2019, at 12:16 AM, Jozef Vilcek <jozo.vil...@gmail.com> wrote:
> >>>>
> >>>> Hello,
> >>>>
> >>>> I am facing a problem where a KeyedStream is poorly parallelised across
> >>>> workers when the number of keys is close to the parallelism.
> >>>>
> >>>> Some workers process zero keys, some more than one. This is because of
> >>>> `KeyGroupRangeAssignment.assignKeyToParallelOperator()` in
> >>>> `KeyGroupStreamPartitioner` as I found out in this post:
> >>>>
> >>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Use-keyBy-to-deterministically-hash-each-record-to-a-processor-task-slot-td16483.html
> >>>> I would like to find out what are my options here.
> >>>> * is there a reason why a custom partitioner cannot be used on a keyed
> >>>> stream?
> >>>> * can there be API support for creating correct KeyedStream-compatible
> >>>> keys? It would also make
> >>>> `DataStreamUtils.reinterpretAsKeyedStream()` more usable for certain
> >>>> scenarios.
> >>>> * any other option I have?
> >>>>
> >>>> Many thanks in advance.
> >>>>
> >>>> Best,
> >>>> Jozef
> >>> --------------------------
> >>> Ken Krugler
> >>> +1 530-210-6378
> >>> http://www.scaleunlimited.com
> >>> Custom big data solutions & training
> >>> Flink, Solr, Hadoop, Cascading & Cassandra
> >>>
> >>>
> > --------------------------
> > Ken Krugler
> > +1 530-210-6378
> > http://www.scaleunlimited.com
> > Custom big data solutions & training
> > Flink, Solr, Hadoop, Cascading & Cassandra
> >
> >
>
>
