Hello Maximilian,

Thanks for the suggestion. I will use it to check the program. However, when I create a PR for the same implementation with a test, I get the same error on the Travis build as well. What would be the solution in that case?
Here is my PR: https://github.com/apache/flink/pull/923
And here is the Travis build status: https://travis-ci.org/apache/flink/builds/71695078

Also, in the IDE it works fine in Collection execution mode.

Thanks and Regards,
Shivani

On Mon, Jul 20, 2015 at 2:14 PM, Maximilian Michels <m...@apache.org> wrote:

> Hi Shivani,
>
> Flink doesn't have enough memory to perform a hash join. You need to
> provide Flink with more memory. You can either increase the
> "taskmanager.heap.mb" config variable or set "taskmanager.memory.fraction"
> to some value greater than 0.7 and smaller than 1.0. The first config
> variable allocates more overall memory for Flink; the latter changes the
> ratio between Flink managed memory (e.g. for hash join) and user memory
> (for your functions and Gelly's code).
>
> If you run this inside an IDE, the memory is configured automatically and
> you don't have control over that at the moment. You could, however, start a
> local cluster (./bin/start-local) after you adjusted your flink-conf.yaml
> and run your programs against that configured cluster. You can do that
> either through your IDE using a RemoteEnvironment or by submitting the
> packaged JAR to the local cluster using the command-line tool (./bin/flink).
>
> Hope that helps.
>
> Cheers,
> Max
>
> On Mon, Jul 20, 2015 at 2:04 PM, Shivani Ghatge <shgha...@gmail.com>
> wrote:
>
>> Hello,
>> I am working on a problem which implements the Adamic Adar algorithm
>> using Gelly.
>> I am running into this exception for all the joins (including the ones
>> that are part of the reduceOnNeighbors function):
>>
>> Too few memory segments provided. Hash Join needs at least 33 memory
>> segments.
>>
>> The problem persists even when I comment out some of the joins.
>>
>> Even after using
>>
>> edg = edg.join(graph.getEdges(),
>>         JoinOperatorBase.JoinHint.BROADCAST_HASH_SECOND)
>>     .where(0,1).equalTo(0,1).with(new JoinEdge());
>>
>> as suggested by @AndraLungu, the problem persists.
>>
>> The code is
>>
>> DataSet<Tuple2<Long, Long>> degrees = graph.getDegrees();
>>
>> //get neighbors of each vertex in the HashSet for its value
>> computedNeighbors = graph.reduceOnNeighbors(new GatherNeighbors(),
>>     EdgeDirection.ALL);
>>
>> //get vertices with updated values for the final Graph which will
>> //be used to get Adamic Edges
>> Vertices = computedNeighbors.join(degrees,
>>         JoinOperatorBase.JoinHint.BROADCAST_HASH_FIRST)
>>     .where(0).equalTo(0).with(new JoinNeighborDegrees());
>>
>> Graph<Long, Tuple3<Double, HashSet<Long>, List<Tuple3<Long, Long,
>>     Double>>>, Double> updatedGraph =
>>     Graph.fromDataSet(Vertices, edges, env);
>>
>> //configure Vertex Centric Iteration
>> VertexCentricConfiguration parameters = new VertexCentricConfiguration();
>>
>> parameters.setName("Find Adamic Adar Edge Weights");
>>
>> parameters.setDirection(EdgeDirection.ALL);
>>
>> //run Vertex Centric Iteration to get the Adamic Adar Edges into
>> //the vertex Value
>> updatedGraph = updatedGraph.runVertexCentricIteration(
>>     new GetAdamicAdarEdges<Long>(), new NeighborsMessenger<Long>(),
>>     1, parameters);
>>
>> //Extract Vertices of the updated graph
>> DataSet<Vertex<Long, Tuple3<Double, HashSet<Long>,
>>     List<Tuple3<Long, Long, Double>>>>> vertices =
>>     updatedGraph.getVertices();
>>
>> //Extract the list of Edges from the vertex values
>> DataSet<Tuple3<Long, Long, Double>> edg = vertices.flatMap(new
>>     GetAdamicList());
>>
>> //Partial weights for the edges are added
>> edg = edg.groupBy(0,1).reduce(new AdamGroup());
>>
>> //Graph is updated with the Adamic Adar Edges
>> edg = edg.join(graph.getEdges(),
>>         JoinOperatorBase.JoinHint.BROADCAST_HASH_SECOND)
>>     .where(0,1).equalTo(0,1).with(new JoinEdge());
>>
>> Any idea how I could tackle this Exception?
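
For reference, the two settings Max describes in the quoted reply would be set in the flink-conf.yaml of the local cluster roughly as sketched here. The numeric values are illustrative assumptions only (the thread gives no concrete numbers beyond the 0.7 to 1.0 range for the fraction), and according to the reply either one of the two can be changed.

```yaml
# Option 1: allocate more overall memory for Flink.
# Value is in MB; 2048 is an assumed example, not a recommendation.
taskmanager.heap.mb: 2048

# Option 2: shift the ratio toward Flink managed memory (used e.g. for hash joins)
# and away from user memory. Per the quoted advice it should be greater than 0.7
# and smaller than 1.0; 0.8 is an assumed example.
taskmanager.memory.fraction: 0.8
```

These settings only apply to a cluster started with this configuration (for example the local cluster started via ./bin/start-local), not to execution inside the IDE, where the memory is configured automatically.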