Hi Miguel, I'm sorry for the late reply; this e-mail got stuck in my spam folder. I'm glad that you've found a solution :)
I've never used Flink with Docker, so I'm probably not the best person to advise you on this. However, if I understand correctly, you're changing the configuration before submitting the job but while the Flink cluster is already running. I don't know whether Docker is supposed to do anything differently, but once a Flink cluster has been started, nodes won't reload any changes you make to flink-conf.yaml. You'll either have to make your changes before starting the cluster, or restart it afterwards. Cheers, -Vasia. On 14 November 2016 at 18:33, Miguel Coimbra <miguel.e.coim...@gmail.com> wrote: > Hello, > > I believe I have figured this out. > > First, I tried Andrey Melentyev's suggestion of executing with Apache > Flink 1.1.3, both with the default conf/flink-conf.yaml parameters and > with some changes to provide additional memory. However, the same error > occurred. > > Note: I changed my project's pom.xml and generated the .jar again using > Maven. > I also copied the new .jar to both Docker instances. > > The test machine has 256 GB of RAM, and this is a scenario with two Docker > containers. > I am attaching the relevant parts of the JobManager and TaskManager logs. > Regarding memory in the TaskManager log, I looked at a couple of > executions and noticed something strange: > > 2016-11-14 15:48:45,256 INFO org.apache.flink.runtime.io.network.buffer.NetworkBufferPool - Allocated 64 MB for network buffer pool > (number of memory segments: 2048, bytes per segment: 32768). > 2016-11-14 15:48:45,413 INFO org.apache.flink.runtime.taskmanager.TaskManager - Limiting managed memory to 0.7 of the > currently free heap space (310 MB), memory will be allocated lazily. 
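The two quoted log lines hang together arithmetically; a quick sanity check (a sketch, with every number taken directly from the quoted log):

```python
# Sanity-check the TaskManager memory figures quoted in the log above.
segment_size = 32768          # bytes per memory segment (from the log)
network_segments = 2048       # network buffer pool segments (from the log)
network_pool_mb = network_segments * segment_size // (1024 * 1024)

free_heap_mb = 310            # free heap reported by the TaskManager
managed_fraction = 0.7        # managed-memory fraction from the log
managed_mb = round(managed_fraction * free_heap_mb)

print(network_pool_mb)  # 64  -> matches "Allocated 64 MB for network buffer pool"
print(managed_mb)       # 217 -> roughly what remains for managed memory
```

So with the heap this small, the operators end up with only around 217 MB of managed memory to share, which is consistent with the failures described later in the thread.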
> > After that, I looked at the start of the TaskManager log and found this: > > 2016-11-14 15:48:38,843 INFO org.apache.flink.runtime.taskmanager.TaskManager - Starting TaskManager (Version: 1.1.3, > Rev:8e8d454, Date:10.10.2016 @ 13:26:32 UTC) > 2016-11-14 15:48:38,843 INFO org.apache.flink.runtime.taskmanager.TaskManager - Current user: flink > 2016-11-14 15:48:38,844 INFO org.apache.flink.runtime.taskmanager.TaskManager - JVM: OpenJDK 64-Bit Server VM - Oracle > Corporation - 1.8/25.92-b14 > 2016-11-14 15:48:38,844 INFO org.apache.flink.runtime.taskmanager.TaskManager - Maximum heap size: 512 MiBytes > 2016-11-14 15:48:38,844 INFO org.apache.flink.runtime.taskmanager.TaskManager - JAVA_HOME: > /usr/lib/jvm/java-1.8-openjdk/jre > 2016-11-14 15:48:38,850 INFO org.apache.flink.runtime.taskmanager.TaskManager - Hadoop version: 2.7.2 > 2016-11-14 15:48:38,850 INFO org.apache.flink.runtime.taskmanager.TaskManager - JVM Options: > 2016-11-14 15:48:38,850 INFO org.apache.flink.runtime.taskmanager.TaskManager - -XX:+UseG1GC > 2016-11-14 15:48:38,850 INFO org.apache.flink.runtime.taskmanager.TaskManager - -Xms512M > 2016-11-14 15:48:38,850 INFO org.apache.flink.runtime.taskmanager.TaskManager - -Xmx512M > 2016-11-14 15:48:38,850 INFO org.apache.flink.runtime.taskmanager.TaskManager - -XX:MaxDirectMemorySize=8388607T > > It seems it was running with only 512 MB, which is the default, > in spite of my having edited the flink-conf.yaml file before > invoking the program on the cluster. > I looked at the JobManager log and the same thing had happened: it was > using the default 256 MB instead of my 1024 MB. > > - To recap, I built the Docker Flink image with (the Dockerfile is > attached): > > cd docker-flink-image-builder/ > ls > Dockerfile Dockerfile~ README.md README.md~ > bluemix-docker-compose.sh* build.sh* docker-compose-bluemix.yml > ./build.sh > > The only file I changed from those is the Dockerfile. 
> This set of files was obtained from the Flink repository. > I used docker-compose up to start the standalone cluster: > > screen > cd docker-flink-image-builder/ > ls > Dockerfile Dockerfile~ README.md README.md~ > bluemix-docker-compose.sh* build.sh* docker-compose-bluemix.yml > docker-compose.yml docker-entrypoint.sh* > docker-compose up > > Then I accessed each Docker instance: > > docker exec -it $(docker ps --filter name=jobmanager --format={{.ID}}) > /bin/sh > docker exec -it $(docker ps --filter > name=dockerflinkimagebuilder_taskmanager_1 > --format={{.ID}}) /bin/sh > > While inside each of those, I started a bash shell and changed the config > file like so: > > bash > cd /home/myuser/docker-image-build-context/flink-1.1.3/conf > vi flink-conf.yaml > > I edited (on both the JobManager and the TaskManager) the following > settings: > > # The heap size for the JobManager JVM > jobmanager.heap.mb: 1024 > > # The heap size for the TaskManager JVM > taskmanager.heap.mb: 4096 > > # The number of buffers for the network stack. > taskmanager.network.numberOfBuffers: 4096 > > It seems that changes I make to the flink-conf.yaml file *are only > reflected after I kill the cluster and call* docker-compose up again. > > docker exec -it $(docker ps --filter name=jobmanager --format={{.ID}}) > flink run -m 707a534982e6:6123 -c flink.graph.example.App > /home/myuser/flink-graph-example-0.0.1-SNAPSHOT.jar > /home/myuser/com-dblp.ungraph.txt > > The cluster now started with the correct memory values, but the result was > the same (it is in the logs). > I then doubled the memory again, so that I had: > > # The heap size for the JobManager JVM > jobmanager.heap.mb: 2048 > > # The heap size for the TaskManager JVM > taskmanager.heap.mb: 8192 > > After this, I killed the cluster (CTRL+C) in the screen session which had started > it (a graceful exit). > This time, after starting again with docker-compose up, I launched the > program again and it worked! 
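One way to avoid editing the file inside the running containers at all is to bind-mount a pre-edited flink-conf.yaml into each container, so the settings are already in place when the cluster starts. A minimal docker-compose sketch (the service names follow the thread; the mount target reuses the conf path quoted above, and the host-side path is a hypothetical example):

```yaml
# Hypothetical fragment: mount one host-side flink-conf.yaml into both
# containers before "docker-compose up", instead of editing in-place.
services:
  jobmanager:
    volumes:
      - ./conf/flink-conf.yaml:/home/myuser/docker-image-build-context/flink-1.1.3/conf/flink-conf.yaml
  taskmanager:
    volumes:
      - ./conf/flink-conf.yaml:/home/myuser/docker-image-build-context/flink-1.1.3/conf/flink-conf.yaml
```

With this layout, changing the heap settings becomes an edit on the host followed by a single restart of the compose stack, rather than a docker exec into each instance.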
> > However, there is something I don't understand, perhaps because I am new > to the Docker ecosystem. > When do the changes to the flink-conf.yaml file get activated? > > From my understanding, I have to do this: > > 1 - Launch cluster with docker-compose up > 2 - exec -it into each of the Docker instances and manually edit the > configuration file > 3 - CTRL+C to gracefully kill cluster > 4 - Relaunch cluster - it will now display correct heap values for the > JobManager and TaskManager. > > *This is cumbersome.* > I know I can make my own scripts to automate this, but is this really the > correct way to launch a Flink standalone cluster on Docker with *custom > memory options?* > > Should I instead change the Dockerfile to include a custom flink-conf.yaml > file when building the image? (so this would be taken right from the start) > What is the correct way to tackle this? > > Thank you very much! > > Output is below in case you are curious: > > myuser@myserver:~/docker-flink-image-builder$ docker exec -it $(docker ps > --filter name=jobmanager --format={{.ID}}) flink run -m 707a534982e6:6123 > -c flink.graph.example.App /home/myuser/flink-graph-example-0.0.1-SNAPSHOT.jar > /home/myuser/com-dblp.ungraph.txt > Cluster configuration: Standalone cluster with JobManager at / > 172.19.0.2:6123 > Using address 172.19.0.2:6123 to connect to JobManager. > JobManager web interface address http://172.19.0.2:8081 > Starting execution of program > Submitting job with JobID: 55544e0ebc1f5014df53b200974afdbf. Waiting for > job completion. > Connected to JobManager at Actor[akka.tcp://flink@172.19. > 0.2:6123/user/jobmanager#-1305686264] > 11/14/2016 17:13:33 Job execution switched to status RUNNING. 
> 11/14/2016 17:13:33 DataSource (at main(App.java:25) ( > org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to > SCHEDULED > 11/14/2016 17:13:33 DataSource (at main(App.java:25) ( > org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to > DEPLOYING > 11/14/2016 17:13:33 DataSource (at main(App.java:25) ( > org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to > RUNNING > 11/14/2016 17:13:34 DataSink (count())(1/1) switched to SCHEDULED > 11/14/2016 17:13:34 DataSink (count())(1/1) switched to DEPLOYING > 11/14/2016 17:13:34 DataSink (count())(1/1) switched to RUNNING > 11/14/2016 17:13:36 DataSink (count())(1/1) switched to FINISHED > 11/14/2016 17:13:36 DataSource (at main(App.java:25) ( > org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to > FINISHED > Tuple size: 1049866 > Submitting job with JobID: ab0931dc89e4a86de17549eeb518fde6. Waiting for > job completion. > Connected to JobManager at Actor[akka.tcp://flink@172.19. > 0.2:6123/user/jobmanager#-1305686264] > 11/14/2016 17:13:37 Job execution switched to status RUNNING. 
> 11/14/2016 17:13:37 CHAIN DataSource (at main(App.java:25) ( > org.apache.flink.api.java.io.TupleCsvInputFormat)) -> Map (Map at > fromTuple2DataSet(Graph.java:343))(1/1) switched to SCHEDULED > 11/14/2016 17:13:37 CHAIN DataSource (at main(App.java:25) ( > org.apache.flink.api.java.io.TupleCsvInputFormat)) -> Map (Map at > fromTuple2DataSet(Graph.java:343))(1/1) switched to DEPLOYING > 11/14/2016 17:13:37 CHAIN DataSource (at main(App.java:25) ( > org.apache.flink.api.java.io.TupleCsvInputFormat)) -> Map (Map at > fromTuple2DataSet(Graph.java:343))(1/1) switched to RUNNING > 11/14/2016 17:13:39 CHAIN DataSource (at main(App.java:25) ( > org.apache.flink.api.java.io.TupleCsvInputFormat)) -> Map (Map at > fromTuple2DataSet(Graph.java:343))(1/1) switched to FINISHED > 11/14/2016 17:13:39 CHAIN FlatMap (FlatMap at > fromDataSet(Graph.java:216)) -> Combine(Distinct at > fromDataSet(Graph.java:216))(1/1) switched to SCHEDULED > 11/14/2016 17:13:39 CHAIN FlatMap (FlatMap at > fromDataSet(Graph.java:216)) -> Combine(Distinct at > fromDataSet(Graph.java:216))(1/1) switched to DEPLOYING > 11/14/2016 17:13:39 CHAIN Map (Map at mapEdges(Graph.java:596)) -> > FlatMap (FlatMap at getUndirected(Graph.java:926))(1/1) switched to > SCHEDULED > 11/14/2016 17:13:39 CHAIN Map (Map at mapEdges(Graph.java:596)) -> > FlatMap (FlatMap at getUndirected(Graph.java:926))(1/1) switched to > DEPLOYING > 11/14/2016 17:13:39 CHAIN FlatMap (FlatMap at > fromDataSet(Graph.java:216)) -> Combine(Distinct at > fromDataSet(Graph.java:216))(1/1) switched to RUNNING > 11/14/2016 17:13:39 CHAIN Map (Map at mapEdges(Graph.java:596)) -> > FlatMap (FlatMap at getUndirected(Graph.java:926))(1/1) switched to > RUNNING > 11/14/2016 17:13:40 CoGroup (Messaging)(1/1) switched to SCHEDULED > 11/14/2016 17:13:40 CoGroup (Messaging)(1/1) switched to DEPLOYING > 11/14/2016 17:13:40 CoGroup (Messaging)(1/1) switched to RUNNING > 11/14/2016 17:13:44 CHAIN Reduce (Distinct at > fromDataSet(Graph.java:216)) -> Map 
(Map at fromDataSet(Graph.java:217))(1/1) > switched to SCHEDULED > 11/14/2016 17:13:44 CHAIN Reduce (Distinct at > fromDataSet(Graph.java:216)) -> Map (Map at fromDataSet(Graph.java:217))(1/1) > switched to DEPLOYING > 11/14/2016 17:13:44 CHAIN Reduce (Distinct at > fromDataSet(Graph.java:216)) -> Map (Map at fromDataSet(Graph.java:217))(1/1) > switched to RUNNING > 11/14/2016 17:13:49 CHAIN Map (Map at mapEdges(Graph.java:596)) -> > FlatMap (FlatMap at getUndirected(Graph.java:926))(1/1) switched to > FINISHED > 11/14/2016 17:13:50 CHAIN FlatMap (FlatMap at > fromDataSet(Graph.java:216)) -> Combine(Distinct at > fromDataSet(Graph.java:216))(1/1) switched to FINISHED > 11/14/2016 17:13:54 CHAIN Reduce (Distinct at > fromDataSet(Graph.java:216)) -> Map (Map at fromDataSet(Graph.java:217))(1/1) > switched to FINISHED > 11/14/2016 17:13:54 IterationHead(Scatter-gather iteration > (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@53f48368 | > org.apache.flink.graph.library.ConnectedComponents$CCMessenger@24d4d7c9))(1/1) > switched to SCHEDULED > 11/14/2016 17:13:54 IterationHead(Scatter-gather iteration > (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@53f48368 | > org.apache.flink.graph.library.ConnectedComponents$CCMessenger@24d4d7c9))(1/1) > switched to DEPLOYING > 11/14/2016 17:13:54 IterationHead(Scatter-gather iteration > (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@53f48368 | > org.apache.flink.graph.library.ConnectedComponents$CCMessenger@24d4d7c9))(1/1) > switched to RUNNING > 11/14/2016 17:13:55 CoGroup (Vertex State Updates)(1/1) switched to > SCHEDULED > 11/14/2016 17:13:55 CoGroup (Vertex State Updates)(1/1) switched to > DEPLOYING > 11/14/2016 17:13:55 CoGroup (Vertex State Updates)(1/1) switched to > RUNNING > 11/14/2016 17:14:06 Sync (Scatter-gather iteration > (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@53f48368 | > 
org.apache.flink.graph.library.ConnectedComponents$CCMessenger@24d4d7c9))(1/1) > switched to SCHEDULED > 11/14/2016 17:14:06 Sync (Scatter-gather iteration > (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@53f48368 | > org.apache.flink.graph.library.ConnectedComponents$CCMessenger@24d4d7c9))(1/1) > switched to DEPLOYING > 11/14/2016 17:14:06 Sync (Scatter-gather iteration > (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@53f48368 | > org.apache.flink.graph.library.ConnectedComponents$CCMessenger@24d4d7c9))(1/1) > switched to RUNNING > 11/14/2016 17:15:00 Sync (Scatter-gather iteration > (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@53f48368 | > org.apache.flink.graph.library.ConnectedComponents$CCMessenger@24d4d7c9))(1/1) > switched to FINISHED > 11/14/2016 17:15:00 DataSink (count())(1/1) switched to SCHEDULED > 11/14/2016 17:15:00 DataSink (count())(1/1) switched to DEPLOYING > 11/14/2016 17:15:00 CoGroup (Vertex State Updates)(1/1) switched to > FINISHED > 11/14/2016 17:15:00 DataSink (count())(1/1) switched to RUNNING > 11/14/2016 17:15:00 CoGroup (Messaging)(1/1) switched to FINISHED > 11/14/2016 17:15:00 DataSink (count())(1/1) switched to FINISHED > 11/14/2016 17:15:00 IterationHead(Scatter-gather iteration > (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@53f48368 | > org.apache.flink.graph.library.ConnectedComponents$CCMessenger@24d4d7c9))(1/1) > switched to FINISHED > 11/14/2016 17:15:00 Job execution switched to status FINISHED. > Component count: 317080 > Program execution finished > Job with JobID ab0931dc89e4a86de17549eeb518fde6 has finished. > Job Runtime: 83229 ms > Accumulator Results: > - e6c358969906b4ce1d682d6840281848 (java.lang.Long): 317080 > > Thank you for your attention. It seems solved. > > > Kind regards, > > Miguel E. Coimbra > Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt> > Skype: miguel.e.coimbra > > On 14 November 2016 at 09:26, Ufuk Celebi <u...@apache.org> wrote: > >> What do the TaskManager logs say with regard to the allocation of managed memory? >> >> Something like: >> >> Limiting managed memory to ... of the currently free heap space ..., >> memory will be allocated lazily. >> >> What else did you configure in flink-conf? >> >> Looping in Greg and Vasia, who maintain Gelly and are most familiar with >> the internals. >> >> – Ufuk >> >> >> On 8 November 2016 at 22:35:22, Miguel Coimbra ( >> miguel.e.coim...@gmail.com) wrote: >> > Dear community, >> > >> > I have a problem which I hope you'll be able to help with. >> > I apologize in advance for the verbosity of the post. >> > I am running the Flink standalone cluster (not even storing to the >> > filesystem) with 2 Docker containers. >> > >> > I set the image of the Dockerfile up for Flink 1.1.2, which is the same >> > version the main class in the .jar was built against. >> > The Docker image was configured to use Java 8, which is what the >> project's >> > pom.xml requires as well. >> > I have also edited the TaskManager conf/flink-conf.yaml to have the >> > following values: >> > >> > .... >> > taskmanager.heap.mb: 7512 >> > .... >> > taskmanager.network.numberOfBuffers: 16048 >> > .... 
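For scale, the buffer count in the config excerpt above implies a sizable network buffer pool. Assuming Flink's default 32 KiB memory segment size (an assumption; the segment size is not shown in this excerpt), the pool works out to roughly 500 MB:

```python
# Rough size of the network buffer pool implied by the setting above,
# assuming Flink's default 32 KiB (32768-byte) memory segment size.
number_of_buffers = 16048
segment_size = 32768  # bytes; assumed default, not shown in the config excerpt
pool_mb = number_of_buffers * segment_size / (1024 * 1024)
print(round(pool_mb, 1))  # 501.5
```

That memory has to come out of the same budget as the heap and managed memory, which is worth keeping in mind when raising numberOfBuffers far above the default.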
>> > >> > Properties of this host/docker setup: >> > - host machine has *256 GB* of RAM >> > - job manager container is running with the default flink config >> > - task manager has *7.5 GB* of memory available >> > - task manager number of buffers is *16048*, which is very generous >> compared >> > to the default value >> > >> > I am testing on the SNAP DBLP dataset: >> > https://snap.stanford.edu/data/com-DBLP.html >> > It has: >> > >> > 317080 nodes >> > 1049866 edges >> > >> > These are the relevant parts of the pom.xml of the project: >> > *(note: the project executes without error for local executions without >> the >> > cluster)* >> > >> > .... >> > <properties> >> > <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> >> > <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> >> > <maven.compiler.source>1.8</maven.compiler.source> >> > <maven.compiler.target>1.8</maven.compiler.target> >> > <flink.version>1.1.2</flink.version> >> > </properties> >> > ..... >> > <dependencies> >> > <dependency> >> > <groupId>org.apache.flink</groupId> >> > <artifactId>flink-java</artifactId> >> > <version>${flink.version}</version> >> > </dependency> >> > <dependency> >> > <groupId>org.apache.flink</groupId> >> > <artifactId>flink-core</artifactId> >> > <version>${flink.version}</version> >> > </dependency> >> > <dependency> >> > <groupId>org.apache.flink</groupId> >> > <artifactId>flink-streaming-java_2.10</artifactId> >> > <version>${flink.version}</version> >> > </dependency> >> > <dependency> >> > <groupId>org.apache.flink</groupId> >> > <artifactId>flink-clients_2.10</artifactId> >> > <version>${flink.version}</version> >> > </dependency> >> > <dependency> >> > <groupId>org.apache.flink</groupId> >> > <artifactId>flink-gelly_2.10</artifactId> >> > <version>${flink.version}</version> >> > </dependency> >> > <dependency> >> > <groupId>junit</groupId> >> > <artifactId>junit</artifactId> >> > <version>3.8.1</version> >> > <scope>test</scope> >> > </dependency> >> > </dependencies> >> > >> > I am running (what I believe to be) a simple Gelly application, >> performing >> > the ConnectedComponents algorithm with 30 iterations: >> > >> > public static void main(String[] args) { >> > final ExecutionEnvironment env = >> > ExecutionEnvironment.getExecutionEnvironment(); >> > >> > >> > final String dataPath = args[0]; >> > >> > final DataSet<Tuple2<Long, Long>> edgeTuples = >> > env.readCsvFile(dataPath) >> > .fieldDelimiter("\t") // node IDs are separated by tabs >> > .ignoreComments("#") // comments start with "#" >> > .types(Long.class, Long.class); >> > >> > try { >> > System.out.println("Tuple size: " + edgeTuples.count()); >> > } catch (Exception e1) { >> > e1.printStackTrace(); >> > } >> > >> > /* >> > * @param <K> the key type for edge and vertex identifiers >> > * @param <VV> the value type for vertices >> > 
* @param <EV> the value type for edges >> > * public class Graph<K, VV, EV> >> > */ >> > >> > final Graph<Long, Long, NullValue> graph = Graph.fromTuple2DataSet( >> > edgeTuples, >> > new MapFunction<Long, Long>() { >> > private static final long serialVersionUID = >> > 8713516577419451509L; >> > public Long map(Long value) { >> > return value; >> > } >> > }, >> > env >> > ); >> > >> > try { >> > /** >> > * @param <K> key type >> > * @param <VV> vertex value type >> > * @param <EV> edge value type >> > * @param <T> the return type >> > >> > class ConnectedComponents<K, EV> >> > implements GraphAlgorithm<K, Long, EV, DataSet<Vertex<K, Long>>> >> > */ >> > >> > DataSet<Vertex<Long, Long>> verticesWithComponents = >> > graph.run(new ConnectedComponents<Long, NullValue>(30)); >> > System.out.println("Component count: " + >> > verticesWithComponents.count()); >> > } catch (Exception e) { >> > e.printStackTrace(); >> > } >> > } >> > >> > However, the following is output on the host machine on execution: >> > >> > docker exec -it $(docker ps --filter name=jobmanager --format={{.ID}}) >> > flink run -m 3de7625b8e28:6123 -c flink.graph.example.App >> > /home/myuser/flink-graph-example-0.0.1-SNAPSHOT.jar >> > /home/myuser/com-dblp.ungraph.txt >> > >> > Cluster configuration: Standalone cluster with JobManager at / >> > 172.19.0.2:6123 >> > Using address 172.19.0.2:6123 to connect to JobManager. >> > JobManager web interface address http://172.19.0.2:8081 >> > Starting execution of program >> > Submitting job with JobID: fd6a12896b749e9ed439bbb196c6aaae. Waiting >> for >> > job completion. 
>> > Connected to JobManager at Actor[akka.tcp:// >> > flink@172.19.0.2:6123/user/jobmanager#-658812967] >> > >> > 11/08/2016 21:22:44 DataSource (at main(App.java:25) >> > (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to >> > SCHEDULED >> > 11/08/2016 21:22:44 DataSource (at main(App.java:25) >> > (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to >> > DEPLOYING >> > 11/08/2016 21:22:44 DataSource (at main(App.java:25) >> > (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to >> RUNNING >> > 11/08/2016 21:22:44 DataSink (count())(1/1) switched to SCHEDULED >> > 11/08/2016 21:22:44 DataSink (count())(1/1) switched to DEPLOYING >> > 11/08/2016 21:22:44 DataSink (count())(1/1) switched to RUNNING >> > 11/08/2016 21:22:44 DataSink (count())(1/1) switched to FINISHED >> > 11/08/2016 21:22:44 DataSource (at main(App.java:25) >> > (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to >> > FINISHED >> > 11/08/2016 21:22:44 Job execution switched to status FINISHED. >> > Tuple size: 1049866 >> > Submitting job with JobID: d68d6d775cc222d9fd0728d9666e83de. Waiting >> for >> > job completion. >> > Connected to JobManager at Actor[akka.tcp:// >> > flink@172.19.0.2:6123/user/jobmanager#-658812967] >> > 11/08/2016 21:22:45 Job execution switched to status RUNNING. 
>> > 11/08/2016 21:22:45 CHAIN DataSource (at main(App.java:25) >> > (org.apache.flink.api.java.io.TupleCsvInputFormat)) -> Map (Map at >> > fromTuple2DataSet(Graph.java:343))(1/1) switched to SCHEDULED >> > >> > 11/08/2016 21:22:45 CHAIN DataSource (at main(App.java:25) >> > (org.apache.flink.api.java.io.TupleCsvInputFormat)) -> Map (Map at >> > fromTuple2DataSet(Graph.java:343))(1/1) switched to DEPLOYING >> > >> > 11/08/2016 21:22:45 CHAIN DataSource (at main(App.java:25) >> > (org.apache.flink.api.java.io.TupleCsvInputFormat)) -> Map (Map at >> > fromTuple2DataSet(Graph.java:343))(1/1) switched to RUNNING >> > 11/08/2016 21:22:45 CHAIN DataSource (at main(App.java:25) >> > (org.apache.flink.api.java.io.TupleCsvInputFormat)) -> Map (Map at >> > fromTuple2DataSet(Graph.java:343))(1/1) switched to FINISHED >> > 11/08/2016 21:22:45 CHAIN FlatMap (FlatMap at >> > fromDataSet(Graph.java:216)) -> Combine(Distinct at >> > fromDataSet(Graph.java:216))(1/1) switched to SCHEDULED >> > 11/08/2016 21:22:45 CHAIN Map (Map at mapEdges(Graph.java:596)) -> >> > FlatMap (FlatMap at getUndirected(Graph.java:926))(1/1) switched to >> > SCHEDULED >> > 11/08/2016 21:22:45 CHAIN FlatMap (FlatMap at >> > fromDataSet(Graph.java:216)) -> Combine(Distinct at >> > fromDataSet(Graph.java:216))(1/1) switched to DEPLOYING >> > 11/08/2016 21:22:45 CHAIN Map (Map at mapEdges(Graph.java:596)) -> >> > FlatMap (FlatMap at getUndirected(Graph.java:926))(1/1) switched to >> > DEPLOYING >> > 11/08/2016 21:22:45 CHAIN Map (Map at mapEdges(Graph.java:596)) -> >> > FlatMap (FlatMap at getUndirected(Graph.java:926))(1/1) switched to >> RUNNING >> > 11/08/2016 21:22:45 CHAIN FlatMap (FlatMap at >> > fromDataSet(Graph.java:216)) -> Combine(Distinct at >> > fromDataSet(Graph.java:216))(1/1) switched to RUNNING >> > 11/08/2016 21:22:45 CoGroup (Messaging)(1/1) switched to SCHEDULED >> > 11/08/2016 21:22:45 CoGroup (Messaging)(1/1) switched to DEPLOYING >> > 11/08/2016 21:22:45 CoGroup (Messaging)(1/1) 
switched to RUNNING >> > 11/08/2016 21:22:45 CHAIN Reduce (Distinct at >> > fromDataSet(Graph.java:216)) -> Map (Map at >> > fromDataSet(Graph.java:217))(1/1) switched to SCHEDULED >> > 11/08/2016 21:22:45 CHAIN Reduce (Distinct at >> > fromDataSet(Graph.java:216)) -> Map (Map at >> > fromDataSet(Graph.java:217))(1/1) switched to DEPLOYING >> > 11/08/2016 21:22:45 CHAIN Reduce (Distinct at >> > fromDataSet(Graph.java:216)) -> Map (Map at >> > fromDataSet(Graph.java:217))(1/1) switched to RUNNING >> > 11/08/2016 21:22:47 CHAIN Map (Map at mapEdges(Graph.java:596)) -> >> > FlatMap (FlatMap at getUndirected(Graph.java:926))(1/1) switched to >> FINISHED >> > 11/08/2016 21:22:47 CHAIN FlatMap (FlatMap at >> > fromDataSet(Graph.java:216)) -> Combine(Distinct at >> > fromDataSet(Graph.java:216))(1/1) switched to FINISHED >> > 11/08/2016 21:22:48 CHAIN Reduce (Distinct at >> > fromDataSet(Graph.java:216)) -> Map (Map at >> > fromDataSet(Graph.java:217))(1/1) switched to FINISHED >> > 11/08/2016 21:22:48 IterationHead(Scatter-gather iteration >> > (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@650eab8 | >> > org.apache.flink.graph.library.ConnectedComponents$CCMessenger@30f5a68a >> ))(1/1) >> > switched to SCHEDULED >> > 11/08/2016 21:22:48 IterationHead(Scatter-gather iteration >> > (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@650eab8 | >> > org.apache.flink.graph.library.ConnectedComponents$CCMessenger@30f5a68a >> ))(1/1) >> > switched to DEPLOYING >> > 11/08/2016 21:22:48 IterationHead(Scatter-gather iteration >> > (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@650eab8 | >> > org.apache.flink.graph.library.ConnectedComponents$CCMessenger@30f5a68a >> ))(1/1) >> > switched to RUNNING >> > 11/08/2016 21:22:48 IterationHead(Scatter-gather iteration >> > (org.apache.flink.graph.library.ConnectedComponents$CCUpdater@650eab8 | >> > org.apache.flink.graph.library.ConnectedComponents$CCMessenger@30f5a68a >> ))(1/1) >> > switched to 
FAILED >> > java.lang.IllegalArgumentException: Too few memory segments provided. >> Hash >> > Table needs at least 33 memory segments. >> > at >> > org.apache.flink.runtime.operators.hash.CompactingHashTable.<init>(CompactingHashTable.java:206) >> > at >> > org.apache.flink.runtime.operators.hash.CompactingHashTable.<init>(CompactingHashTable.java:191) >> > at >> > org.apache.flink.runtime.iterative.task.IterationHeadTask.initCompactingHashTable(IterationHeadTask.java:175) >> > at >> > org.apache.flink.runtime.iterative.task.IterationHeadTask.run(IterationHeadTask.java:272) >> > at >> > org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351) >> > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584) >> > at java.lang.Thread.run(Thread.java:745) >> > >> > 11/08/2016 21:22:48 Job execution switched to status FAILING. >> > java.lang.IllegalArgumentException: Too few memory segments provided. >> Hash >> > Table needs at least 33 memory segments. >> > at >> > org.apache.flink.runtime.operators.hash.CompactingHashTable.<init>(CompactingHashTable.java:206) >> > at >> > org.apache.flink.runtime.operators.hash.CompactingHashTable.<init>(CompactingHashTable.java:191) >> > at >> > org.apache.flink.runtime.iterative.task.IterationHeadTask.initCompactingHashTable(IterationHeadTask.java:175) >> > at >> > org.apache.flink.runtime.iterative.task.IterationHeadTask.run(IterationHeadTask.java:272) >> > at >> > org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351) >> > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584) >> > at java.lang.Thread.run(Thread.java:745) >> > >> > The results I have found online so far were not enough, and I am not sure of >> > the best way to solve this. >> > >> > If anyone can help diagnose and correct this issue, I would be very >> > thankful. >> > >> > Best regards, >> > >> > Miguel E. Coimbra >> > Email: miguel.e.coim...@gmail.com >> > Skype: miguel.e.coimbra >> >> >
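A closing note on the failure above: assuming Flink's default 32 KiB memory segment size (the segment size is not printed in the trace), the 33-segment minimum that CompactingHashTable demands corresponds to only about 1 MiB, which shows just how little managed memory the IterationHead task was actually handed:

```python
# Minimum managed memory the hash table asks for, per the exception above,
# assuming Flink's default 32 KiB (32768-byte) memory segment size.
min_segments = 33
segment_size = 32768  # bytes; assumed default
min_mib = min_segments * segment_size / (1024 * 1024)
print(round(min_mib, 2))  # 1.03
```

In other words, the operator could not get even ~1 MiB of managed memory, which is consistent with the later finding that the TaskManager was silently running with the default 512 MB heap rather than the configured value.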