Hi Till,

thanks for your reply!

The problem with that is that I sometimes combine two elements:

So from x0 -> x1 -> x2 I join (x0, x1), which might become x0 -> x2 in the end.

The indices from zipWithIndex are then 0 and 2, resulting in equal join flags. Sequential elements always have to have alternating flags, and that is violated here.
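
To make the collision concrete, here is a minimal sketch using plain Scala values instead of a Flink DataSet. The `Elem` type, its field names, and the `combine` semantics (the merged element keeps the left key and index and takes over the right next-pointer) are assumptions for illustration only.

```scala
object StaleIndexDemo {
  // Hypothetical element type: key, next-pointer, and an index assigned once up front.
  final case class Elem(key: Int, next: Option[Int], index: Long)

  // Join flag derived from the precomputed index.
  def flag(e: Elem): Boolean = e.index % 2 == 0

  // Assumed combine step for a pair (a, b) with a.next == b.key:
  // keep a's key and index, take over b's next-pointer.
  def combine(a: Elem, b: Elem): Elem = a.copy(next = b.next)

  val x0 = Elem(key = 2, next = Some(10), index = 0L)
  val x1 = Elem(key = 10, next = Some(11), index = 1L)
  val x2 = Elem(key = 11, next = None, index = 2L)

  // After joining (x0, x1) the chain is x0 -> x2.
  val merged: Elem = combine(x0, x1)

  // The two neighbours now carry indices 0 and 2, so their flags are equal.
  val flagsCollide: Boolean = flag(merged) == flag(x2)
}
```

With these assumptions, `StaleIndexDemo.flagsCollide` is true: the merged x0 and x2 both land on the same side of the join.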

Best
Fridtjof

On 01.02.16 at 12:26, Till Rohrmann wrote:

Hi Fridtjof,

I might be missing something, but can't you assign the ids once before starting the iteration and then reuse them throughout the iterations? Of course you would have to add another field to your input data, but then you don't have to run zipWithIndex in every iteration.
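
A minimal sketch of this suggestion, using plain Scala collections rather than Flink DataSets: the ids are assigned once and carried along as an extra field, and each round derives the flags from the stored id only. `Element`, `Indexed`, `assignIds`, and `flagsFor` are hypothetical names introduced for illustration.

```scala
object AssignIdsOnce {
  // Hypothetical element type with a key and a next-pointer.
  final case class Element(key: Int, next: Option[Int])

  // The extra field: an id assigned once, before the iteration starts.
  final case class Indexed(id: Long, elem: Element)

  // Run zipWithIndex a single time on the sorted input.
  def assignIds(sorted: Seq[Element]): Seq[Indexed] =
    sorted.zipWithIndex.map { case (e, i) => Indexed(i.toLong, e) }

  // Inside an iteration, only the stored id is read; no re-indexing is needed.
  // Even rounds flag even ids, odd rounds flag odd ids.
  def flagsFor(round: Int, data: Seq[Indexed]): Seq[Boolean] =
    data.map(d => d.id % 2 == round % 2)
}
```

This only works as long as the stored ids still alternate between neighbouring elements.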

Cheers,
Till

On Mon, Feb 1, 2016 at 11:37 AM, Fridtjof Sander <fsan...@mailbox.tu-berlin.de> wrote:

    (tried to reformat)


    Hi,

    I have a problem which seems to be unsolvable in Flink at the
    moment (1.0-Snapshot, current master branch)
    and I would kindly ask for some input, ideas on alternative
    approaches or just a confirmatory "yup, that doesn't work".

    ### Here's the situation:

    I have a dataset whose elements are totally ordered, ascending, by
    some key (Int). Each element has a "next-pointer" to its
    successor, which is just another field holding the key of the
    following element:

    x0 -> x1 -> x2 -> x3 -> ... -> xn

    The keys are not necessarily increasing by 1, so it may be that:
    x0 has key 2 and x1 has key 10, x2 has 11, x3 has 25 and so on. I
    need to process that set in the following way:

    iterate:

    find all pairs of elements where "next == key" BUT make sure no
    element appears in multiple pairs

    example: do pair (x0, x1), (x2, x3), (x4, x5), ... but don't pair
    (x1, x2), (x3, x4), ...

    then, if some condition is met, combine a pair

    run above procedure again with switched pairing-condition:

    example: do pair (x1, x2), (x3, x4), (x5, x6), ... but don't pair
    (x0, x1), (x2, x3), ...

    I hope the problem is clear...
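
The pairing rule described above can be sketched on an in-memory, already sorted sequence. This uses plain Scala collections, not Flink; `Element`, `pairs`, and `offset` are hypothetical names, and the sketch only shows which pairs are formed in each round, not the combine step.

```scala
object AlternatingPairs {
  // Hypothetical element type with a key and a next-pointer.
  final case class Element(key: Int, next: Option[Int])

  // offset == 0 pairs (x0, x1), (x2, x3), ...
  // offset == 1 pairs (x1, x2), (x3, x4), ...
  // A pair is kept only if "next == key" holds; a trailing single element is skipped.
  def pairs(sorted: Seq[Element], offset: Int): List[(Element, Element)] =
    sorted.drop(offset).grouped(2).collect {
      case Seq(a, b) if a.next.contains(b.key) => (a, b)
    }.toList
}
```

Because each round walks the sequence in steps of two, no element can appear in more than one pair within a round.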


    ### Now my approach: pseudo-scala-code:


    // zipWithIndex yields (index, element) pairs
    val indexed = input.zipWithIndex
    // even index => flag is true
    val flagged = indexed.map { case (i, el) => el.setFlag(i % 2 == 0) }
    val left = flagged.filter(el => el.flag)
    val right = flagged.filter(el => !el.flag)
    left.fullOuterJoin(right)
     .where(_.next)
     .equalTo(_.key)
     ...


    I attach a temporary key to my elements, increasing by 1, using
    zipWithIndex. Then I map that tempKey to a boolean joinFlag: true
    if tempKey is even, false if it is odd. Then I filter all elements
    with flag == true and put them in a dataset that is the left side
    of the next == key join. The right side consists of all elements
    with flag == false. In the second run, I switch the flag
    construction to el.setFlag(i % 2 != 0).
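
For readers who want to trace the approach without a cluster, here is a minimal sketch that imitates it on plain Scala collections. It is not Flink code: the full outer join is emulated with a `Map` lookup, and `Element`, `joinRound`, and `round` are hypothetical names.

```scala
object FlagJoinSketch {
  // Hypothetical element type; flag is the boolean joinFlag.
  final case class Element(key: Int, next: Option[Int], flag: Boolean = false)

  // One round: flag by index parity, split into left/right, join on next == key.
  // Round 0 flags even indices, round 1 flags odd indices.
  def joinRound(sorted: Seq[Element], round: Int): Seq[(Option[Element], Option[Element])] = {
    val flagged = sorted.zipWithIndex.map { case (el, i) => el.copy(flag = i % 2 == round % 2) }
    val (left, right) = flagged.partition(_.flag)
    val rightByKey = right.map(r => r.key -> r).toMap

    // Every left element appears once, with its matching right element if any.
    val matched = left.map(l => (Option(l), l.next.flatMap(rightByKey.get)))

    // Right elements without a partner are kept too, as in a full outer join.
    val matchedKeys = matched.flatMap(_._2).map(_.key).toSet
    val unmatchedRight =
      right.filterNot(r => matchedKeys(r.key)).map(r => (Option.empty[Element], Option(r)))

    matched ++ unmatchedRight
  }
}
```

The sketch relies on the input being in sorted order when the indices are assigned.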

    That actually works, there is only one problem:


    ### The problem:


    In my approach, I must not lose the total ordering of the data,
    because the assignment of alternating join flags only works if
    that ordering is preserved. Initially the ordering is established
    by range-partitioning and partition-sorting. However, it is
    destroyed when the data is shuffled for the join. And I cannot
    restore it, because I have to run the whole thing in an iteration,
    and range-partitioning is not supported within iterations.


    ### Help?

    It all sounds very complicated, but the only thing I really have
    to solve is that join without any element appearing in multiple
    pairs (as described in "the situation"). If anyone has any idea
    how to solve this, that person would make my day so hard...

    Anyways, thanks for your time!

    Best, Fridtjof



    On 01.02.16 at 11:32, Fridtjof Sander wrote:



