So, the context here is that I am running an Apache Beam application on
Flink and Keyed stream is used e.g. to provide sharding for writing to
filesystem.
E.g. each worker is owning one or two shards and writing GBK windowed
values into files. So any custom IO which needs too shape parallelism could
need this, e.g. interact / query external services.

On Wed, Jan 2, 2019 at 10:13 PM Ken Krugler <kkrugler_li...@transpac.com>
wrote:

> Hi Jozef,
>
> Processing just a few keys (# of keys ≅ # of operators) in Flink isn’t
> common, from what I’ve seen.
>
> Another possible option is to broadcast all records, and then in each
> operator decide what records to process, based on the operator index and
> the key value.
>
> Something like this in your operator's open() method:
>
>     public void open(Configuration parameters) throws Exception {
>         super.open(parameters);
>         this.operatorIndex = getRuntimeContext().getIndexOfThisSubtask();
>         this.numOperators = getRuntimeContext().getIndexOfThisSubtask();
>     }
>
> And in your operator’s processing method...
>
>     int hash = calcPositiveHashCode(key);
>     if ((hash % this.operatorIndex) == this.numOperators) { … }
>
> — Ken
>
> > On Jan 2, 2019, at 11:32 AM, Jozef Vilcek <jozo.vil...@gmail.com> wrote:
> >
> > Thanks Ken. Yes, similar approach is suggested in post I shared in my
> > question. But to me it feels a bit hack-ish.
> > I would like to know if this is only solution with Flink or do I miss
> > something?
> > Can there be more API-ish support for such use-case from Flink? Is there
> a
> > reason why there is none? Or is there?
> >
> > On Wed, Jan 2, 2019 at 5:29 PM Ken Krugler <kkrugler_li...@transpac.com>
> > wrote:
> >
> >> FWIW, if you want exactly one record per operator, then this code <
> >>
> https://github.com/ScaleUnlimited/flink-crawler/blob/ba06aa87226b4c44e30aba6df68984d53cc15519/src/main/java/com/scaleunlimited/flinkcrawler/utils/FlinkUtils.java#L153
> >
> >> should generate key values that will be partitioned properly.
> >>
> >> — Ken
> >>
> >>> On Jan 2, 2019, at 12:16 AM, Jozef Vilcek <jozo.vil...@gmail.com>
> wrote:
> >>>
> >>> Hello,
> >>>
> >>> I am facing a problem where KeyedStream is purely parallelised on
> workers
> >>> for case where number of keys is close to parallelism.
> >>>
> >>> Some workers process zero keys, some more than one. This is because of
> >>> `KeyGroupRangeAssignment.assignKeyToParallelOperator()` in
> >>> `KeyGroupStreamPartitioner` as I found out in this post:
> >>>
> >>
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Use-keyBy-to-deterministically-hash-each-record-to-a-processor-task-slot-td16483.html
> >>>
> >>> I would like to find out what are my options here.
> >>> * is there a reason why custom partitioner can not be used in keyed
> >> stream?
> >>> * can there be an API support for creating keys correct KeyedStream
> >>> compatible keys? It would also make
> >>> `DataStreamUtils.reinterpretAsKeyedStream()` more useable for certain
> >>> scenarios.
> >>> * any other option I have?
> >>>
> >>> Many thanks in advance.
> >>>
> >>> Best,
> >>> Jozef
> >>
> >> --------------------------
> >> Ken Krugler
> >> +1 530-210-6378
> >> http://www.scaleunlimited.com
> >> Custom big data solutions & training
> >> Flink, Solr, Hadoop, Cascading & Cassandra
> >>
> >>
>
> --------------------------
> Ken Krugler
> +1 530-210-6378
> http://www.scaleunlimited.com
> Custom big data solutions & training
> Flink, Solr, Hadoop, Cascading & Cassandra
>
>

Reply via email to