BTW, we should add an entry for this to the FAQ and point to the configuration or FAQ entry in the exception message.
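
For the FAQ entry, the two settings discussed in this thread would go into flink-conf.yaml roughly as sketched here. The numbers are placeholders I picked for illustration, not recommendations; the only constraint taken from the thread is that the fraction should be above 0.7 and below 1.0.

```yaml
# flink-conf.yaml (sketch; values are illustrative placeholders)

# Total JVM heap per TaskManager, in megabytes.
# Raising it gives Flink more memory overall.
taskmanager.heap.mb: 1024

# Share of that heap which Flink manages itself (e.g. for hash joins).
# The rest is left to user functions and library code such as Gelly.
taskmanager.memory.fraction: 0.8
```

These settings only take effect on a cluster started with this configuration, not when the job runs inside the IDE.
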
On 20 Jul 2015, at 15:15, Vasiliki Kalavri <vasilikikala...@gmail.com> wrote:

> Hi Shivani,
>
> why are you using a vertex-centric iteration to compute the approximate
> Adamic-Adar? It's not an iterative computation :)
>
> In fact, it should be as complex (in terms of operators) as the exact
> Adamic-Adar, only more efficient because of the different neighborhood
> representation. Are you having the same problem with the exact computation?
>
> Cheers,
> Vasia.
>
> On 20 July 2015 at 14:41, Maximilian Michels <m...@apache.org> wrote:
> Hi Shivani,
>
> The issue is that by the time the Hash Join is executed, the MutableHashTable
> cannot allocate enough memory segments. That means that your other operators
> are occupying them. It is expected that this also occurs on Travis because
> the workers there have limited memory as well.
>
> Till suggested changing the memory fraction through the
> ExecutionEnvironment. Can you try that?
>
> Cheers,
> Max
>
> On Mon, Jul 20, 2015 at 2:23 PM, Shivani Ghatge <shgha...@gmail.com> wrote:
> Hello Maximilian,
>
> Thanks for the suggestion. I will use it to check the program. But when I
> create a PR for the same implementation with a test, I get the same error
> even on the Travis build. What would be the solution for that?
>
> Here is my PR: https://github.com/apache/flink/pull/923
> And here is the Travis build status:
> https://travis-ci.org/apache/flink/builds/71695078
>
> Also, in the IDE it works fine in Collection execution mode.
>
> Thanks and Regards,
> Shivani
>
> On Mon, Jul 20, 2015 at 2:14 PM, Maximilian Michels <m...@apache.org> wrote:
> Hi Shivani,
>
> Flink doesn't have enough memory to perform a hash join. You need to provide
> Flink with more memory. You can either increase the "taskmanager.heap.mb"
> config variable or set "taskmanager.memory.fraction" to some value greater
> than 0.7 and smaller than 1.0. The first config variable allocates more
> overall memory for Flink; the latter changes the ratio between Flink managed
> memory (e.g. for hash join) and user memory (for your functions and Gelly's
> code).
>
> If you run this inside an IDE, the memory is configured automatically and you
> don't have control over that at the moment. You could, however, start a local
> cluster (./bin/start-local) after you have adjusted your flink-conf.yaml and
> run your programs against that configured cluster. You can do that either
> through your IDE using a RemoteEnvironment or by submitting the packaged JAR
> to the local cluster using the command-line tool (./bin/flink).
>
> Hope that helps.
>
> Cheers,
> Max
>
> On Mon, Jul 20, 2015 at 2:04 PM, Shivani Ghatge <shgha...@gmail.com> wrote:
> Hello,
>
> I am working on a problem which implements the Adamic-Adar algorithm using
> Gelly. I am running into this exception for all the joins (including the
> ones that are part of the reduceOnNeighbors function):
>
>     Too few memory segments provided. Hash Join needs at least 33 memory segments.
>
> The problem persists even when I comment out some of the joins.
>
> Even after using
>
>     edg = edg.join(graph.getEdges(),
>             JoinOperatorBase.JoinHint.BROADCAST_HASH_SECOND)
>         .where(0,1).equalTo(0,1).with(new JoinEdge());
>
> as suggested by @AndraLungu, the problem persists.
>
> The code is:
>
>     DataSet<Tuple2<Long, Long>> degrees = graph.getDegrees();
>
>     //get neighbors of each vertex in the HashSet for its value
>     computedNeighbors = graph.reduceOnNeighbors(new GatherNeighbors(),
>             EdgeDirection.ALL);
>
>     //get vertices with updated values for the final Graph which will be
>     //used to get Adamic Edges
>     Vertices = computedNeighbors.join(degrees,
>             JoinOperatorBase.JoinHint.BROADCAST_HASH_FIRST)
>         .where(0).equalTo(0).with(new JoinNeighborDegrees());
>
>     Graph<Long, Tuple3<Double, HashSet<Long>, List<Tuple3<Long, Long,
>             Double>>>, Double> updatedGraph =
>         Graph.fromDataSet(Vertices, edges, env);
>
>     //configure Vertex Centric Iteration
>     VertexCentricConfiguration parameters = new VertexCentricConfiguration();
>
>     parameters.setName("Find Adamic Adar Edge Weights");
>
>     parameters.setDirection(EdgeDirection.ALL);
>
>     //run Vertex Centric Iteration to get the Adamic Adar Edges into the
>     //vertex Value
>     updatedGraph = updatedGraph.runVertexCentricIteration(
>             new GetAdamicAdarEdges<Long>(), new NeighborsMessenger<Long>(),
>             1, parameters);
>
>     //Extract Vertices of the updated graph
>     DataSet<Vertex<Long, Tuple3<Double, HashSet<Long>, List<Tuple3<Long,
>             Long, Double>>>>> vertices = updatedGraph.getVertices();
>
>     //Extract the list of Edges from the vertex values
>     DataSet<Tuple3<Long, Long, Double>> edg = vertices.flatMap(
>             new GetAdamicList());
>
>     //Partial weights for the edges are added
>     edg = edg.groupBy(0,1).reduce(new AdamGroup());
>
>     //Graph is updated with the Adamic Adar Edges
>     edg = edg.join(graph.getEdges(),
>             JoinOperatorBase.JoinHint.BROADCAST_HASH_SECOND)
>         .where(0,1).equalTo(0,1).with(new JoinEdge());
>
> Any idea how I could tackle this exception?
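
To make Vasia's point about the computation not being iterative a bit more concrete, here is a minimal plain-Java sketch with no Flink or Gelly involved. It uses the textbook Adamic-Adar definition (for an edge (u, v), sum 1/log(degree(z)) over all common neighbors z) and needs only one pass over the neighborhoods. The class and method names are made up for illustration, and this is not the code from the PR; it also assumes a simple undirected graph without self-loops, so every common neighbor has degree of at least 2.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

public class AdamicAdarSketch {

    /** Builds an undirected adjacency map from an edge list. */
    static Map<Long, Set<Long>> neighbors(long[][] edges) {
        Map<Long, Set<Long>> adj = new TreeMap<>();
        for (long[] e : edges) {
            adj.computeIfAbsent(e[0], k -> new TreeSet<>()).add(e[1]);
            adj.computeIfAbsent(e[1], k -> new TreeSet<>()).add(e[0]);
        }
        return adj;
    }

    /** Adamic-Adar score of (u, v): sum of 1/log(deg(z)) over common neighbors z. */
    static double score(Map<Long, Set<Long>> adj, long u, long v) {
        Set<Long> nu = adj.getOrDefault(u, Collections.emptySet());
        Set<Long> nv = adj.getOrDefault(v, Collections.emptySet());
        double sum = 0.0;
        for (long z : nu) {
            if (nv.contains(z)) {
                // z neighbors both u and v, so its degree is at least 2
                sum += 1.0 / Math.log(adj.get(z).size());
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        long[][] edges = {{1, 2}, {1, 3}, {2, 3}, {3, 4}};
        Map<Long, Set<Long>> adj = neighbors(edges);
        // One pass over the edges; no supersteps or messages are needed.
        for (long[] e : edges) {
            System.out.println(e[0] + "-" + e[1] + ": " + score(adj, e[0], e[1]));
        }
    }
}
```

In a DataSet program the same shape would presumably be a neighborhood gather followed by joins and a grouped sum, without an iteration operator, which should also leave more memory segments for the hash joins.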