I still have them.

To be clear: I do need to compare each element with its successor; I just can't have one element paired multiple times in the same step. That's why I divide the join into two steps:

data: x0 -> x1 -> x2

The first join will only pair (x0, x1). These may or may not be combined into one element. However, x2 will be in the result set (of the first join) anyway: elements are only removed if they are paired with others and combined with them.

If x0 and x1 are combined into x01, then the second join will pair (x01, x2). If not, it will pair (x1, x2) and x0 will be in the result set (of the second join).

This two-step join happens inside of an iteration.

Without this two-step join, I would pair (x0, x1) and (x1, x2). If both pairs were combined, x1 would be used twice to build a new element, and that must not happen.
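
To illustrate the difference between the two pairing schemes, here is a minimal sketch on plain Scala collections rather than Flink DataSets. The object `Pairing` and both method names are made up for this illustration; it only shows which elements end up in a pair, not how pairs get combined.

```scala
object Pairing {
  // Every adjacent pair: each inner element of the chain shows up twice.
  def slidingPairs[A](xs: Vector[A]): Vector[(A, A)] =
    xs.zip(xs.drop(1))

  // Disjoint pairs starting at position `offset` (0 or 1):
  // each element takes part in at most one pair per step.
  def disjointPairs[A](xs: Vector[A], offset: Int): Vector[(A, A)] =
    xs.drop(offset)
      .grouped(2)
      .collect { case Vector(a, b) => (a, b) } // skip a trailing single element
      .toVector
}
```

For the chain x0, x1, x2, `slidingPairs` yields (x0, x1) and (x1, x2), so x1 takes part in two pairs. `disjointPairs` with offset 0 yields only (x0, x1), and with offset 1 only (x1, x2).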


On 1 February 2016 13:00:05 CET, Till Rohrmann <trohrm...@apache.org> wrote:

   In the described case, can it be that you still have elements with
   `id % 2 == 1` in your data set or are they filtered out? If they are
   filtered out, then you can simply shift the indices for each
   iteration to the right.

   On Mon, Feb 1, 2016 at 12:32 PM, Fridtjof Sander
   <fsan...@mailbox.tu-berlin.de> wrote:

       Hi Till,

       thanks for your reply!

       The problem with that is that I sometimes combine two elements:

       From x0 -> x1 -> x2 I join (x0, x1), which might become x0 ->
       x2 in the end.

       The indices from zipWithIndex are then 0 and 2, resulting in
       equal join flags. Sequential elements always have to have
       alternating flags, which is violated here.
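
       The parity clash can be checked with a small sketch in plain
       Scala (no Flink). The object `FlagDrift` and its members are
       hypothetical names, and the sketch assumes that the combined
       element keeps the index of x0.

       ```scala
       object FlagDrift {
         // Join flag as in the thread's pseudo-code: true for even indices.
         def flag(index: Long): Boolean = index % 2 == 0

         // Indices assigned once, up front, to x0, x1, x2.
         val initial: Vector[Long] = Vector(0L, 1L, 2L)

         // Assumption: after x0 and x1 are combined, indices 0 and 2 remain.
         val afterCombine: Vector[Long] = Vector(0L, 2L)

         // True if every two neighbouring elements carry different flags.
         def alternates(indices: Vector[Long]): Boolean =
           indices.map(flag).sliding(2).forall {
             case Seq(a, b) => a != b
             case _         => true // fewer than two elements
           }
       }
       ```

       `FlagDrift.alternates(FlagDrift.initial)` holds, while
       `FlagDrift.alternates(FlagDrift.afterCombine)` does not: both
       remaining elements get the flag true.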

       Best
       Fridtjof

       On 01.02.16 at 12:26, Till Rohrmann wrote:

        Hi Fridtjof,

        I might be missing something, but can’t you assign the ids once
        before starting the iteration and then reuse them throughout
        the iterations? Of course, you would have to add another field
        to your input data, but then you don’t have to run
        `zipWithIndex` for every iteration.

        Cheers,
        Till

        On Mon, Feb 1, 2016 at 11:37 AM, Fridtjof Sander
        <fsan...@mailbox.tu-berlin.de> wrote:

            (tried to reformat)


            Hi,

            I have a problem which seems to be unsolvable in Flink at
            the moment (1.0-Snapshot, current master branch)
            and I would kindly ask for some input, ideas on
            alternative approaches or just a confirmatory "yup, that
            doesn't work".

            ### Here's the situation:

            I have a dataset whose elements are totally ordered,
            ascending by some key (Int). Each element has a
            "next-pointer" to its successor, which is just another
            field with the key of the following element:

            x0 -> x1 -> x2 -> x3 -> ... -> xn

            The keys are not necessarily increasing by 1, so it may be
            that: x0 has key 2 and x1 has key 10, x2 has 11, x3 has 25
            and so on. I need to process that set in the following way:

            iterate:

            find all pairs of elements where "next == key" BUT make
            sure no element appears in multiple pairs

            example: do pair (x0, x1), (x2, x3), (x4, x5), ... but
            don't pair (x1, x2), (x3, x4), ...

            then, if some condition is met, combine a pair

            run above procedure again with switched pairing-condition:

            example: do pair (x1, x2), (x3, x4), (x5, x6), ... but
            don't pair (x0, x1), (x2, x3), ...

            I hope the problem is clear...


            ### Now my approach: pseudo-scala-code:


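            // attach a temporary index 0, 1, 2, ... to every element
            val indexed = input.zipWithIndex
            // even index => flag true (left side), odd index => flag false (right side)
            val flagged = indexed.map { case (i, el) => el.setFlag(i % 2 == 0) }
            val left = flagged.filter(el => el.flag)
            val right = flagged.filter(el => !el.flag)
            // pair each left element with the right element its next-pointer refers to
            left.fullOuterJoin(right)
              .where(_.next)
              .equalTo(_.key)
              ...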


            I attach a temporary key to my elements with zipWithIndex;
            it increases by 1 from element to element. Then I map that
            tempKey to a boolean joinFlag: true if the key is even, false
            if it is odd. Then I filter all elements with flag == true and
            put them in a dataset that is the left side of the next ==
            key join. The right side consists of all elements with flag ==
            false. In the second run, I switch the flag construction to
            el.setFlag(i % 2 != 0).

            That actually works; there is only one problem:


            ### The problem:


            In my approach, I must not lose the total ordering of the
            data, because the assignment of alternating join flags only
            works if that ordering is preserved. Initially, the ordering
            is established by range-partitioning and partition-sorting.
            However, it is destroyed when the data is shuffled
            for the join, and I cannot restore it, because I have to
            run the whole thing in an iteration, and
            range-partitioning is not supported within iterations.


            ### Help?

            It all sounds very complicated, but the only thing I
            really have to solve is that join without any element
            appearing in multiple pairs (as described in "the
            situation"). If anyone has any idea how to solve this,
            that person would make my day so hard...

            Anyways, thanks for your time!

            Best, Fridtjof







