Hi all!

While working on FLINK-2905, I was wondering what a good (and fast) way to compute the intersection of two data sets (Gelly vertices in my case) of unknown size would be.

I came up with three ways to solve this. Consider two data sets:

DataSet<Vertex<K, VV>> verticesLeft =  this.getVertices();
DataSet<Vertex<K, VV>> verticesRight = graph.getVertices();

Way 1 (join)

intersectVertices = verticesLeft.join(verticesRight)
 .where(0)
 .equalTo(0)
 .with(new JoinFunction<Vertex<K, VV>, /* .. */>() {
  @Override
  public Vertex<K, VV> join(Vertex<K, VV> first, Vertex<K, VV> second)
   throws Exception {
    return first;
  }
});

Way 2 (coGroup)

intersectVertices = verticesLeft.coGroup(verticesRight)
 .where(0)
 .equalTo(0)
 .with(new CoGroupFunction<Vertex<K, VV>, /* .. */ >() {
  @Override
  public void coGroup(Iterable<Vertex<K, VV>> first,
        Iterable<Vertex<K, VV>> second,
        Collector<Vertex<K, VV>> out) throws Exception {
   Iterator<Vertex<K, VV>> leftIt = first.iterator();
   Iterator<Vertex<K, VV>> rightIt = second.iterator();
   if (leftIt.hasNext() && rightIt.hasNext()) {
    out.collect(leftIt.next());
   }
  }
 });

Way 3 (union + groupBy + aggregate)

intersectVertices = verticesLeft.union(verticesRight)
 .map(new MapFunction<Vertex<K, VV>, Tuple3<K, VV, Integer>>() {
  @Override
  public Tuple3<K, VV, Integer> map(Vertex<K, VV> vertex)
        throws Exception {
   return new Tuple3<>(vertex.f0, vertex.f1, 1);
  }
 }).withForwardedFields("f0;f1")
 .groupBy(0) // vertex id
 .aggregate(Aggregations.SUM, 2)
 .filter(new FilterFunction<Tuple3<K, VV, Integer>>() {
  @Override
  public boolean filter(Tuple3<K, VV, Integer> value) {
   return value.f2 == 2;
  }
 })
 .map(new MapFunction<Tuple3<K, VV, Integer>, Vertex<K, VV>>() {
  @Override
  public Vertex<K, VV> map(Tuple3<K, VV, Integer> vertexWithAggregate) {
   return new Vertex<>(vertexWithAggregate.f0, vertexWithAggregate.f1);
  }
 }).withForwardedFields("f0;f1");
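
One caveat when comparing the three: they only produce the same result if vertex IDs are unique within each input set. With duplicate IDs, the join emits one record per matching pair, the coGroup emits one record per key, and the union + sum variant can both miss real matches (count 3) and report false ones (an ID that occurs twice in one set and never in the other). The following is a minimal plain-Java sketch of that behaviour, not Flink code. Vertices are reduced to their Long IDs, and the class and method names (IntersectSketch, viaJoin, viaCoGroup, viaUnionCount) are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Plain-Java stand-in for the three variants; no Flink involved.
// All names here are hypothetical and only mimic the record-level semantics.
class IntersectSketch {

    // Way 1: inner join, emits the left element once per matching pair.
    static List<Long> viaJoin(List<Long> left, List<Long> right) {
        List<Long> out = new ArrayList<>();
        for (Long l : left) {
            for (Long r : right) {
                if (l.equals(r)) {
                    out.add(l);
                }
            }
        }
        return out;
    }

    // Way 2: coGroup, emits one element per key that occurs on both sides.
    static List<Long> viaCoGroup(List<Long> left, List<Long> right) {
        Set<Long> rightKeys = new HashSet<>(right);
        List<Long> out = new ArrayList<>();
        for (Long key : new TreeSet<>(left)) {
            if (rightKeys.contains(key)) {
                out.add(key);
            }
        }
        return out;
    }

    // Way 3: union both inputs, sum a 1 per record and key, keep sums equal to 2.
    static List<Long> viaUnionCount(List<Long> left, List<Long> right) {
        Map<Long, Integer> counts = new HashMap<>();
        for (Long id : left) {
            counts.merge(id, 1, Integer::sum);
        }
        for (Long id : right) {
            counts.merge(id, 1, Integer::sum);
        }
        List<Long> out = new ArrayList<>();
        for (Long key : new TreeSet<>(counts.keySet())) {
            if (counts.get(key) == 2) {
                out.add(key);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Unique IDs per side: all three variants agree.
        List<Long> left = Arrays.asList(1L, 2L, 3L);
        List<Long> right = Arrays.asList(2L, 3L, 4L);
        System.out.println(viaJoin(left, right));
        System.out.println(viaCoGroup(left, right));
        System.out.println(viaUnionCount(left, right));

        // Duplicate ID on the left side: the variants diverge.
        List<Long> dupLeft = Arrays.asList(1L, 1L);
        List<Long> dupRight = Arrays.asList(1L);
        System.out.println(viaJoin(dupLeft, dupRight));
        System.out.println(viaCoGroup(dupLeft, dupRight));
        System.out.println(viaUnionCount(dupLeft, dupRight));
    }
}
```

For Gelly graphs with unique vertex IDs this difference does not matter, so the choice between the three comes down to runtime cost.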

Thanks for your input.

Best,

Martin


