Hi all!
While working on FLINK-2905, I was wondering what a good (and fast) way
to compute the intersection of two data sets (Gelly vertices in my
case) of unknown size would be.
I came up with three ways to solve this:
Consider two sets:
DataSet<Vertex<K, VV>> verticesLeft = this.getVertices();
DataSet<Vertex<K, VV>> verticesRight = graph.getVertices();
Way 1 (join)
intersectVertices = verticesLeft.join(verticesRight)
    .where(0)
    .equalTo(0)
    .with(new JoinFunction<Vertex<K, VV>, Vertex<K, VV>, Vertex<K, VV>>() {
        @Override
        public Vertex<K, VV> join(Vertex<K, VV> first, Vertex<K, VV> second)
                throws Exception {
            // keep the left vertex; its ID also exists on the right
            return first;
        }
    });
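To make the semantics of Way 1 concrete, here is a minimal local model in plain Java (no Flink). The class JoinIntersectSketch and the helper v() are hypothetical names, and vertices are modelled as Map.Entry<id, value>. A join calls the JoinFunction once per matching (left, right) pair, so it behaves like an intersection only if vertex IDs are unique within each input; a duplicate ID on the right would emit the left vertex twice.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical local model of Way 1 (not Flink): an equi-join on the vertex ID
// whose join function returns the left vertex. Vertices are Map.Entry<id, value>.
final class JoinIntersectSketch {

    // Hypothetical helper that builds an (id, value) vertex.
    static <K, VV> Map.Entry<K, VV> v(K id, VV value) {
        return new SimpleImmutableEntry<>(id, value);
    }

    static <K, VV> List<Map.Entry<K, VV>> intersect(
            List<Map.Entry<K, VV>> left, List<Map.Entry<K, VV>> right) {
        // Index the right input by ID.
        Map<K, List<Map.Entry<K, VV>>> rightById = new HashMap<>();
        for (Map.Entry<K, VV> vertex : right) {
            rightById.computeIfAbsent(vertex.getKey(), id -> new ArrayList<>()).add(vertex);
        }
        // Emit one result per matching (first, second) pair, like a JoinFunction.
        List<Map.Entry<K, VV>> out = new ArrayList<>();
        for (Map.Entry<K, VV> first : left) {
            List<Map.Entry<K, VV>> matches = rightById.get(first.getKey());
            if (matches == null) {
                continue; // ID not present on the right: no join partner
            }
            for (Map.Entry<K, VV> second : matches) {
                out.add(first); // join(first, second) returns first
            }
        }
        return out;
    }
}
```

The hash index on the right side is only a stand-in here; it says nothing about which join strategy Flink's optimizer actually picks.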
Way 2 (coGroup)
intersectVertices = verticesLeft.coGroup(verticesRight)
    .where(0)
    .equalTo(0)
    .with(new CoGroupFunction<Vertex<K, VV>, Vertex<K, VV>, Vertex<K, VV>>() {
        @Override
        public void coGroup(Iterable<Vertex<K, VV>> first,
                            Iterable<Vertex<K, VV>> second,
                            Collector<Vertex<K, VV>> out) throws Exception {
            Iterator<Vertex<K, VV>> leftIt = first.iterator();
            Iterator<Vertex<K, VV>> rightIt = second.iterator();
            // emit one left vertex only if the ID occurs in both sets
            if (leftIt.hasNext() && rightIt.hasNext()) {
                out.collect(leftIt.next());
            }
        }
    });
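The same kind of local model for Way 2, again plain Java with a hypothetical class CoGroupIntersectSketch: a coGroup invokes the function once per key with the groups from both inputs, either of which may be empty, and the function above emits at most one left vertex per key. Unlike the join, a duplicate ID on either side therefore does not produce duplicate output.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical local model of Way 2 (not Flink): coGroup on the vertex ID.
final class CoGroupIntersectSketch {

    // Hypothetical helper that builds an (id, value) vertex.
    static <K, VV> Map.Entry<K, VV> v(K id, VV value) {
        return new SimpleImmutableEntry<>(id, value);
    }

    // Group vertices by ID, keeping the order in which IDs first appear.
    private static <K, VV> Map<K, List<Map.Entry<K, VV>>> groupById(
            List<Map.Entry<K, VV>> vertices) {
        Map<K, List<Map.Entry<K, VV>>> groups = new LinkedHashMap<>();
        for (Map.Entry<K, VV> vertex : vertices) {
            groups.computeIfAbsent(vertex.getKey(), id -> new ArrayList<>()).add(vertex);
        }
        return groups;
    }

    static <K, VV> List<Map.Entry<K, VV>> intersect(
            List<Map.Entry<K, VV>> left, List<Map.Entry<K, VV>> right) {
        Map<K, List<Map.Entry<K, VV>>> leftGroups = groupById(left);
        Map<K, List<Map.Entry<K, VV>>> rightGroups = groupById(right);
        // The function is called once for every key present in either input.
        Set<K> keys = new LinkedHashSet<>(leftGroups.keySet());
        keys.addAll(rightGroups.keySet());

        List<Map.Entry<K, VV>> out = new ArrayList<>();
        for (K key : keys) {
            // A group is empty when the key is missing from that input.
            List<Map.Entry<K, VV>> first = leftGroups.getOrDefault(key, Collections.emptyList());
            List<Map.Entry<K, VV>> second = rightGroups.getOrDefault(key, Collections.emptyList());
            Iterator<Map.Entry<K, VV>> leftIt = first.iterator();
            Iterator<Map.Entry<K, VV>> rightIt = second.iterator();
            // Same check as the CoGroupFunction: at most one left vertex per key.
            if (leftIt.hasNext() && rightIt.hasNext()) {
                out.add(leftIt.next());
            }
        }
        return out;
    }
}
```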
Way 3 (union + groupBy + aggregate)
intersectVertices = verticesLeft.union(verticesRight)
    .map(new MapFunction<Vertex<K, VV>, Tuple3<K, VV, Integer>>() {
        @Override
        public Tuple3<K, VV, Integer> map(Vertex<K, VV> vertex)
                throws Exception {
            return new Tuple3<>(vertex.f0, vertex.f1, 1);
        }
    }).withForwardedFields("f0;f1")
    .groupBy(0) // vertex id
    .aggregate(Aggregations.SUM, 2)
    .filter(new FilterFunction<Tuple3<K, VV, Integer>>() {
        @Override
        public boolean filter(Tuple3<K, VV, Integer> value) {
            return value.f2 == 2;
        }
    })
    .map(new MapFunction<Tuple3<K, VV, Integer>, Vertex<K, VV>>() {
        @Override
        public Vertex<K, VV> map(Tuple3<K, VV, Integer> vertexWithAggregate) {
            return new Vertex<>(vertexWithAggregate.f0, vertexWithAggregate.f1);
        }
    }).withForwardedFields("f0;f1");
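And a local model of Way 3 (plain Java, hypothetical class CountIntersectSketch): it concatenates both inputs, counts occurrences per ID and keeps the IDs whose count is exactly 2. This is only an intersection if each ID occurs at most once per input; two vertices with the same ID in one input and none in the other also reach a count of 2. For the non-aggregated value field the sketch simply keeps the first value it sees, which is an assumption and not a statement about which value Flink's aggregate keeps.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical local model of Way 3 (not Flink): union + count per ID + filter.
final class CountIntersectSketch {

    // Hypothetical helper that builds an (id, value) vertex.
    static <K, VV> Map.Entry<K, VV> v(K id, VV value) {
        return new SimpleImmutableEntry<>(id, value);
    }

    static <K, VV> List<Map.Entry<K, VV>> intersect(
            List<Map.Entry<K, VV>> left, List<Map.Entry<K, VV>> right) {
        // union: concatenate both inputs
        List<Map.Entry<K, VV>> union = new ArrayList<>(left);
        union.addAll(right);

        // map to (id, value, 1) + groupBy(0) + aggregate(SUM, 2):
        // count occurrences per ID; keep the first value seen (an assumption)
        Map<K, VV> valueById = new LinkedHashMap<>();
        Map<K, Integer> countById = new LinkedHashMap<>();
        for (Map.Entry<K, VV> vertex : union) {
            valueById.putIfAbsent(vertex.getKey(), vertex.getValue());
            countById.merge(vertex.getKey(), 1, Integer::sum);
        }

        // filter(count == 2) + map back to a vertex
        List<Map.Entry<K, VV>> out = new ArrayList<>();
        for (Map.Entry<K, Integer> entry : countById.entrySet()) {
            if (entry.getValue() == 2) {
                out.add(v(entry.getKey(), valueById.get(entry.getKey())));
            }
        }
        return out;
    }
}
```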
Thanks for your input.
Best,
Martin