Hi Fridtjof,
I might be missing something, but can't you assign the ids once
before starting the iteration and then reuse them throughout
the iterations? Of course you would have to add another field
to your input data, but then you don't have to run
zipWithIndex in every iteration.
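A rough sketch of what I mean, on plain Scala collections rather than a Flink DataSet. The names Elem, idx, assignIndex and isLeft are made up for illustration, and -1 is an assumed sentinel for "no successor". It only shows how the flag could be derived from a stored index plus the round number; it does not address what should happen to idx once a pair gets combined.

```scala
// Hypothetical element type: `idx` is the extra field that stores the
// position assigned once, before the iteration starts.
final case class Elem(key: Int, next: Int, idx: Long)

object PrecomputedIndex {
  // Assign positions once. Assumes `sorted` is already ordered by key and
  // holds (key, next) tuples.
  def assignIndex(sorted: Seq[(Int, Int)]): Seq[Elem] =
    sorted.zipWithIndex.map { case ((key, next), i) => Elem(key, next, i.toLong) }

  // In round r, elements whose index parity matches r % 2 form the left
  // side of the join; all others form the right side.
  def isLeft(e: Elem, round: Int): Boolean = e.idx % 2 == round % 2
}
```

Inside the iteration you would then only filter on isLeft(el, round) instead of recomputing the index.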
Cheers,
Till
On Mon, Feb 1, 2016 at 11:37 AM, Fridtjof Sander
<fsan...@mailbox.tu-berlin.de> wrote:
(tried to reformat)
Hi,
I have a problem which seems to be unsolvable in Flink at
the moment (1.0-Snapshot, current master branch)
and I would kindly ask for some input, ideas on
alternative approaches or just a confirmatory "yup, that
doesn't work".
### Here's the situation:
I have a dataset whose elements are totally ordered,
ascending by some key (Int). Each element has a
"next-pointer" to its successor, which is just another
field holding the key of the following element:
x0 -> x1 -> x2 -> x3 -> ... -> xn
The keys are not necessarily increasing by 1, so it may be
that: x0 has key 2 and x1 has key 10, x2 has 11, x3 has 25
and so on. I need to process that set in the following way:
iterate:
    find all pairs of elements where "next == key" BUT make
    sure no element appears in multiple pairs
        example: do pair (x0, x1), (x2, x3), (x4, x5), ... but
        don't pair (x1, x2), (x3, x4), ...
    then, if some condition is met, combine a pair
    run the above procedure again with the pairing condition switched:
        example: do pair (x1, x2), (x3, x4), (x5, x6), ... but
        don't pair (x0, x1), (x2, x3), ...
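For illustration only, the pairing rule above can be sketched on a local, already sorted Scala collection. This is not a Flink program; Node, pairs and offset are hypothetical names, and -1 is an assumed sentinel for "no successor". An offset of 0 gives the first pairing and an offset of 1 gives the switched one.

```scala
// Hypothetical element type: `next` holds the key of the successor.
final case class Node(key: Int, next: Int)

object AlternatingPairs {
  // Pair up neighbours starting at position `offset` (0 or 1), so that no
  // element can land in two pairs. A pair is kept only if the left
  // element's next-pointer matches the right element's key.
  def pairs(sorted: Vector[Node], offset: Int): Vector[(Node, Node)] =
    sorted.drop(offset).grouped(2).collect {
      case Vector(a, b) if a.next == b.key => (a, b)
    }.toVector
}
```

A trailing element without a partner is dropped by the pattern match; in the Flink version a full outer join would keep it as an unmatched row instead.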
I hope the problem is clear...
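### Now my approach: pseudo-scala-code:
// attach a consecutive index (0, 1, 2, ...) to every element
val indexed = input.zipWithIndex
// even index => flag true, odd index => flag false
val flagged = indexed.map { case (i, el) => el.setFlag(i % 2 == 0) }
val left = flagged.filter(el => el.flag)
val right = flagged.filter(el => !el.flag)
// join each left element with the right element it points to
left.fullOuterJoin(right)
  .where(_.next)
  .equalTo(_.key)
...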
I use zipWithIndex to attach a temporary key to my elements,
which increases by 1. Then I map that tempKey to a boolean
joinFlag: true if the key is even, false if it is odd. Then
I filter all elements with flag == true and put them in a
dataset that is the left side of the next == key join. The
right side consists of all elements with flag == false. In
the second run, I switch the flag construction to
el.setFlag(i % 2 != 0).
That actually works, there is only one problem:
### The problem:
In my approach, I must not lose the total ordering of the
data, because the assignment of alternating join-flags only
works if that ordering is preserved. Initially, the ordering
is established by range-partitioning and partition-sorting.
However, it is destroyed when the data is shuffled for the
join. And I cannot restore it, because I have to run the
whole thing in an iteration, and range-partitioning is not
supported within iterations.
### Help?
It all sounds very complicated, but the only thing I
really have to solve is that join without any element
appearing in multiple pairs (as described in "the
situation"). If anyone has any idea how to solve this,
that person would make my day so hard...
Anyways, thanks for your time!
Best, Fridtjof