Hi,
Thanks for the suggestion. I'm trying to use the delta iteration so that I
can get the empty-workset convergence criterion for free. But since an outer
join between the workset and the solution set is not possible using
coGroup, I will try to adapt my algorithm to use a bulk iteration.
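The plan above can be sketched as a bulk iteration whose step visits every record and also emits the records it changed. That second output acts as a termination criterion, so the loop stops once nothing changed, which mimics the empty-workset check. This is a plain-Scala, in-memory model under those assumptions. The loop only mirrors the semantics of Flink's `iterateWithTermination` and does not call it, and `halveStep` is a hypothetical step function chosen for illustration.

```scala
// Plain-Scala model (no Flink dependency) of a bulk iteration with a
// termination criterion. The name mirrors Flink's iterateWithTermination,
// but this is only an in-memory sketch of its semantics.
object BulkIterationModel {
  type Rec = (Int, Int)

  // Runs `step` until its termination-criterion output is empty or
  // `maxIterations` supersteps have run. Returns the final data set and
  // the number of supersteps executed.
  def iterateWithTermination[T](initial: Seq[T], maxIterations: Int)
                               (step: Seq[T] => (Seq[T], Seq[Any])): (Seq[T], Int) = {
    var current = initial
    var iterations = 0
    var converged = false
    while (!converged && iterations < maxIterations) {
      val (next, criterion) = step(current)
      current = next
      iterations += 1
      converged = criterion.isEmpty
    }
    (current, iterations)
  }

  // Hypothetical step function: every record is visited (bulk style), and
  // the records that changed play the role of the delta iteration's workset.
  def halveStep(state: Seq[Rec]): (Seq[Rec], Seq[Rec]) = {
    val next = state.map { case (k, v) => (k, v / 2) }
    val changed = state.zip(next).collect {
      case (before, after) if before != after => after
    }
    (next, changed)
  }

  def main(args: Array[String]): Unit = {
    val (result, supersteps) =
      iterateWithTermination(Seq((1, 8), (2, 3), (3, 0)), maxIterations = 10)(halveStep)
    println(s"result=$result after $supersteps supersteps")
  }
}
```

Because the step sees the whole data set, any outer-join-like combination happens inside an ordinary operator. Outside a delta iteration, a regular coGroup keeps its documented outer behavior.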
Best, Kien Truong
On Mon, Nov 16, 2015 at 11:02 PM, Stephan Ewen <se...@apache.org> wrote:
It is actually very important that the CoGroup in delta iterations works like
that. If the CoGroup touched every element in the solution set, the
"decreasing work" effect would not happen.
The delta iterations are designed for cases where specific updates to the
solution are made, driven by the workset. Driving an operator by the solution
set contents would result in a "bulk iteration" style pattern, so the idea
would be to use a proper bulk iteration for those cases.
Does that make sense?
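To make the "decreasing work" effect concrete, here is a plain-Scala, in-memory model of a delta iteration with no Flink involved. The connected-components workload on a four-vertex chain is a hypothetical example chosen for illustration. Each superstep reads only the workset and looks up solution-set entries only for the keys it touches. As a result, the workset shrinks from 4 to 3 to 2 to 1 record before it becomes empty and the iteration ends.

```scala
// Plain-Scala model of a delta iteration (no Flink dependency).
// Hypothetical workload: connected components on the chain 1 - 2 - 3 - 4.
object DeltaIterationModel {
  val edges: Seq[(Int, Int)] = Seq((1, 2), (2, 3), (3, 4))

  // Undirected adjacency lists built from the edge list.
  val neighbours: Map[Int, Seq[Int]] =
    (edges ++ edges.map(_.swap)).groupBy(_._1).map { case (v, es) => v -> es.map(_._2) }

  // Returns the final solution set and the workset size of each superstep.
  def run(maxIterations: Int = 100): (Map[Int, Int], List[Int]) = {
    // Solution set: vertex -> component id, one entry per key.
    var solution: Map[Int, Int] = neighbours.keys.map(v => v -> v).toMap
    // The initial workset contains every vertex.
    var workset: Seq[(Int, Int)] = solution.toSeq
    val worksetSizes = scala.collection.mutable.ListBuffer.empty[Int]
    var iteration = 0
    while (workset.nonEmpty && iteration < maxIterations) {
      worksetSizes += workset.size
      // The step is driven by the workset: only changed vertices send messages.
      val candidates = workset.flatMap { case (v, c) => neighbours(v).map(n => n -> c) }
      val best = candidates.groupBy(_._1).map { case (v, cs) => v -> cs.map(_._2).min }
      // Solution-set entries are looked up only for keys that received a message.
      val delta = best.filter { case (v, c) => c < solution(v) }
      solution = solution ++ delta // merge the delta into the solution set
      workset = delta.toSeq        // next workset = records that changed
      iteration += 1
    }
    (solution, worksetSizes.toList)
  }

  def main(args: Array[String]): Unit = {
    val (solution, sizes) = run()
    println(s"components=$solution, workset sizes per superstep=$sizes")
  }
}
```

If the step instead had to visit every solution-set entry, each superstep would cost four lookups regardless of how few vertices changed. That is the bulk-iteration pattern.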
On Mon, Nov 16, 2015 at 10:54 PM, Fabian Hueske <fhue...@gmail.com> wrote:
Hi,
this is an artifact of how the solution set is internally implemented.
Usually, a CoGroup is executed using a sort-merge strategy, i.e., both inputs
are sorted, merged, and handed to the CoGroup function in a streaming fashion.
Both inputs are treated equally, and if one input does not contain a key that
is present in the other input, the CoGroup function is called with an empty
iterator.
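The sort-merge strategy described above can be modelled in a few lines of plain Scala. This is an in-memory sketch, not Flink's actual implementation. `sortMergeCoGroup` and `demo` are hypothetical names, and `demo` reuses the inputs from Kien's example at the bottom of this thread. Because the keys of both sides are merged, key 1, which exists only in the first input, still produces a call with an empty second group.

```scala
// Plain-Scala model (no Flink dependency) of a sort-merge CoGroup.
object SortMergeCoGroupModel {
  type Rec = (Int, Int)

  def sortMergeCoGroup(left: Seq[Rec], right: Seq[Rec])
                      (f: (Iterator[Rec], Iterator[Rec]) => Unit): Unit = {
    // Sort both inputs on the key (field 0).
    val l = left.sortBy(_._1).toVector
    val r = right.sortBy(_._1).toVector
    var i = 0
    var j = 0
    while (i < l.length || j < r.length) {
      // The next key is the smaller of the two current head keys.
      val key =
        if (i >= l.length) r(j)._1
        else if (j >= r.length) l(i)._1
        else math.min(l(i)._1, r(j)._1)
      // Consume this key's group from each side; either group may be empty.
      val lStart = i
      while (i < l.length && l(i)._1 == key) i += 1
      val rStart = j
      while (j < r.length && r(j)._1 == key) j += 1
      f(l.slice(lStart, i).iterator, r.slice(rStart, j).iterator)
    }
  }

  // Records (key, secondGroupNonEmpty) for every call of the CoGroup function.
  def demo(): List[(Int, Boolean)] = {
    val calls = scala.collection.mutable.ListBuffer.empty[(Int, Boolean)]
    val d1 = Seq((1, 1), (2, 1), (3, 1))
    val d2 = d1.filter(_._1 != 1)
    sortMergeCoGroup(d1, d2) { (first, second) =>
      val firstGroup = first.toList
      val secondGroup = second.toList
      val key = (firstGroup ++ secondGroup).head._1
      calls += ((key, secondGroup.nonEmpty))
    }
    calls.toList
  }

  def main(args: Array[String]): Unit =
    println(demo()) // prints List((1,false), (2,true), (3,true))
}
```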
The solution set of a delta iteration is stored in a hash table (with only one
entry per key). When a solution set is coGrouped with another data set, the
other input is sorted and probed against the hash table. The solution set
iterator of the CoGroup function will contain one element if the hash table
contains an entry for the key and will be empty if it doesn't. However, the
hash table does not check whether all of its elements have been looked up, so
elements of the solution set for which no corresponding element was present
in the other data set are never identified.
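The probe-only behavior can be modelled in the same spirit. In this plain-Scala sketch, a Scala `Map` stands in for the solution-set hash table. That substitution is an assumption for illustration, since the real solution set is a Flink-managed hash table. `solutionSetCoGroup` and `demo` are hypothetical names, and the inputs are those of Kien's example. Only the other input drives the calls, so key 1 never reaches the function.

```scala
// Plain-Scala model (no Flink dependency) of a CoGroup against a
// hash-table solution set, as described in the paragraph above.
object SolutionSetCoGroupModel {
  type Rec = (Int, Int)

  def solutionSetCoGroup(solution: Seq[Rec], other: Seq[Rec])
                        (f: (Iterator[Rec], Iterator[Rec]) => Unit): Unit = {
    // Solution set as a hash table with at most one entry per key.
    val table: Map[Int, Rec] = solution.map(rec => rec._1 -> rec).toMap
    // Only the other input is sorted and grouped; each group probes the table.
    val sorted = other.sortBy(_._1).toVector
    var i = 0
    while (i < sorted.length) {
      val key = sorted(i)._1
      val start = i
      while (i < sorted.length && sorted(i)._1 == key) i += 1
      // The solution-set iterator holds one element on a hit and none on a miss.
      f(table.get(key).iterator, sorted.slice(start, i).iterator)
    }
    // No final pass over the table: solution-set keys that were never
    // probed never reach f, so the CoGroup only works in one direction.
  }

  // Records (key, otherGroupNonEmpty) for every call of the CoGroup function.
  def demo(): List[(Int, Boolean)] = {
    val calls = scala.collection.mutable.ListBuffer.empty[(Int, Boolean)]
    val d1 = Seq((1, 1), (2, 1), (3, 1))
    val d2 = d1.filter(_._1 != 1)
    solutionSetCoGroup(d1, d2) { (first, second) =>
      val firstGroup = first.toList
      val secondGroup = second.toList
      val key = (firstGroup ++ secondGroup).head._1
      calls += ((key, secondGroup.nonEmpty))
    }
    calls.toList
  }

  def main(args: Array[String]): Unit =
    println(demo()) // prints List((2,true), (3,true))
}
```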
So, the CoGroup with a solution set works only in one direction, as stated in
the documentation. This is kind of intended by the way the solution set
CoGroup is implemented, but we should definitely update the documentation to
cover this case!
If you have a use case that requires a solution set CoGroup with the missing
behavior, you should open a JIRA issue.
Otherwise, it would be great if you could also open a JIRA issue to extend the
documentation.
Thank you, Fabian
2015-11-16 1:02 GMT+01:00 Truong Duc Kien <duckientru...@gmail.com>:
Hi,
When running CoGroup between the solution set and a different dataset
inside a DeltaIteration, the CoGroupFunction only gets called for items
that exist in the other dataset, similar to an inner join. This is not
the documented behavior for CoGroup:
If a DataSet has a group with no matching key in the other DataSet,
the CoGroupFunction is called with an empty group for the non-existing
group.
The following code shows the problem.
import org.apache.flink.api.scala._
import org.apache.flink.util.Collector

object CoGroupExample {
  def coGroupFunction(first: Iterator[(Int, Int)],
                      second: Iterator[(Int, Int)],
                      out: Collector[(Int, Int)]): Unit = {
    if (second.hasNext) {
      // Key present in the second input: forward its element.
      out.collect(second.next())
    } else if (first.hasNext) {
      // Key present only in the first input; per the documented CoGroup
      // semantics this call should still happen.
      printf("Not in second set: %s\n", first.next())
      println("These two lines don't appear when " +
        "running coGroup on the solution set")
    }
  }

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    env.getConfig.disableSysoutLogging()

    val d1 = env.fromElements((1, 1), (2, 1), (3, 1))

    // Delta iteration keyed on field 0, one superstep, workset = initial solution set.
    d1.iterateDelta(d1, 1, Array(0)) {
      (solutionSet, workSet) => {
        val f = workSet.filter(_._1 != 1)
        println("CoGroup on solution set with delta iteration")
        val newSolutionSet = solutionSet.coGroup(f)
          .where(0)
          .equalTo(0)
          .apply(coGroupFunction _)
        (newSolutionSet, newSolutionSet)
      }
    }.print()

    // The same CoGroup outside an iteration, for comparison.
    println("Normal coGroup")
    val d2 = d1.filter(_._1 != 1)
    d1.coGroup(d2).where(0).equalTo(0).apply(coGroupFunction _).print()
  }
}
Is this the expected behavior, or should I file a bug about this?
Best regards,
Kien Truong