Thanks for your replies. We use Flink from within a standalone Java 8 application (no Hadoop, no clustering), so it basically boils down to running a simple piece of code like this:
import java.util.*; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.graph.*; import org.apache.flink.graph.library.CommunityDetection; public class FlinkTester { final Random random = new Random(1); final float density = 3.0F; public static void main(String[] args) throws Exception { new FlinkTester().execute(1000000, 4); } private void execute(int numEdges, int parallelism) throws Exception { final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(parallelism); final Graph<Long, Long, Double> graph = createGraph(numEdges, env); final long start = System.currentTimeMillis(); List<Vertex<Long, Long>> vertices = graph.run(new CommunityDetection<Long>(10, 0.5)).getVertices().collect(); System.out.println(vertices.size() + " vertices processed in " + (System.currentTimeMillis()-start)/1000 + " s"); } private Graph<Long, Long, Double> createGraph(int numEdges, ExecutionEnvironment env) { System.out.println("Creating new graph of " + numEdges + " edges..."); final int maxNumVertices = (int)(numEdges/density); final Map<Long, Vertex<Long, Long>> vertexMap = new HashMap<>(maxNumVertices); final Map<String, Edge<Long, Double>> edgeMap = new HashMap<>(numEdges); while (edgeMap.size() < numEdges) { long sourceId = random.nextInt(maxNumVertices) + 1; long targetId = sourceId; while (targetId == sourceId) targetId = random.nextInt(maxNumVertices) + 1; final String edgeKey = sourceId + "#" + targetId; if (!edgeMap.containsKey(edgeKey)) { edgeMap.put(edgeKey, new Edge<>(sourceId, targetId, 1D)); if (!vertexMap.containsKey(sourceId)) vertexMap.put(sourceId, new Vertex<>(sourceId, sourceId)); if (!vertexMap.containsKey(targetId)) vertexMap.put(targetId, new Vertex<>(targetId, targetId)); } } System.out.println(edgeMap.size() + " edges created between " + vertexMap.size() + " vertices."); return Graph.fromCollection(vertexMap.values(), edgeMap.values(), env); } } No matter what graph algorithm you pick for benchmarking (above it's 
CommunityDetection), the bigger the graph, the wider the performance gap (and the higher the CPU/memory consumption) you observe when comparing the execution times between the old engine (<= Flink 1.4.2) and the new one (checked on 1.5.6, 1.8.2 and 1.9.1). Just run the code yourselves (you may play with the number of edges and parallel threads). Best, Jakub