Hi again,

Glad that you solved your problem :)
Splitting code into smaller functions has its advantages, but more operators/tasks mean more overhead for the JobManager/TaskManager to manage them. Usually that's not a big issue, but as I said, you were running your cluster with extremely low memory settings.

Piotrek

> On 9 Oct 2018, at 18:09, jpreis...@free.fr wrote:
>
> Hi Piotrek,
>
> Thank you for your answer. Actually it was necessary to increase the memory of the JobManager (I had tested it, but I had not restarted Flink ...).
>
> I will also work on optimization. I thought it was good practice to create as many functions as possible based on their functional value (for example: create two FilterFunctions that have different functional meanings). So I will try to have fewer functions (for example: gather my two FilterFunctions into one).
>
> Thanks again, Piotrek!
>
> Julien.
>
> ----- Original Message -----
> From: "Piotr Nowojski" <pi...@data-artisans.com>
> To: jpreis...@free.fr
> Cc: user@flink.apache.org
> Sent: Tuesday, 9 October 2018 10:37:58
> Subject: Re: JobManager did not respond within 60000 ms
>
> Hi,
>
> You have quite a complicated job graph and very low memory settings for the job manager and task manager. It might be that long GC pauses are causing this problem.
>
> Secondly, there are quite a few Google search results for this error that point toward high-availability issues. Have you read those previously reported problems?
>
> Thanks,
> Piotrek
>
> > On 9 Oct 2018, at 09:57, jpreis...@free.fr wrote:
> >
> > I have a streaming job that runs in a standalone cluster. The Flink version is 1.4.1. Everything was working so far, but since I added new treatments, I cannot start my job anymore.
> > I have this exception:
> >
> > org.apache.flink.client.program.ProgramInvocationException: The program execution failed: JobManager did not respond within 60000 ms
> >     at org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:524)
> >     at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:103)
> >     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:456)
> >     at org.apache.flink.client.program.DetachedEnvironment.finalizeExecute(DetachedEnvironment.java:77)
> >     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:402)
> >     at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:802)
> >     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:282)
> >     at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1054)
> >     at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1101)
> >     at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1098)
> >     at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
> >     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1098)
> > Caused by: org.apache.flink.runtime.client.JobTimeoutException: JobManager did not respond within 60000 ms
> >     at org.apache.flink.runtime.client.JobClient.submitJobDetached(JobClient.java:437)
> >     at org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:516)
> >     ... 11 more
> > Caused by: java.util.concurrent.TimeoutException
> >     at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
> >     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
> >     at org.apache.flink.runtime.client.JobClient.submitJobDetached(JobClient.java:435)
> >     ... 12 more
> >
> > I see a very strange behavior when I comment out a function (any one, for example a FilterFunction that was present before or after my modification).
> > I tried to change the configuration (akka.client.timeout and akka.framesize) without success.
> >
> > This is my flink-conf.yaml:
> >
> > jobmanager.rpc.address: myhost
> > jobmanager.rpc.port: 6123
> > jobmanager.heap.mb: 128
> > taskmanager.heap.mb: 1024
> > taskmanager.numberOfTaskSlots: 100
> > taskmanager.memory.preallocate: false
> > taskmanager.data.port: 6121
> > parallelism.default: 1
> > taskmanager.tmp.dirs: /dohdev/flink/tmp/tskmgr
> > blob.storage.directory: /dohdev/flink/tmp/blob
> > jobmanager.web.port: -1
> > high-availability: zookeeper
> > high-availability.zookeeper.quorum: localhost:2181
> > high-availability.zookeeper.path.root: /dohdev/flink
> > high-availability.cluster-id: dev
> > high-availability.storageDir: file:////mnt/metaflink
> > high-availability.zookeeper.storageDir: /mnt/metaflink/inh/agregateur/recovery
> > restart-strategy: fixed-delay
> > restart-strategy.fixed-delay.attempts: 1000
> > restart-strategy.fixed-delay.delay: 5 s
> > zookeeper.sasl.disable: true
> > blob.service.cleanup.interval: 60
> >
> > And I launch the job with this command: bin/flink run -d myjar.jar
> >
> > I added as an attachment a graph of my job when it works (Graph.PNG).
> >
> > Do you have an idea of the problem?
> >
> > Thanks.
> > Julien
> >
> > <Graph.PNG>
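[Editor's note] The "gather my two FilterFunctions into one" idea from the thread is mostly a matter of and-ing the two predicates, so one filter operator does the work of two. A minimal sketch in plain Java, without the Flink dependency; `isValid` and `isRelevant` are hypothetical stand-ins for the two original filter conditions, not names from the thread:

```java
import java.util.function.Predicate;

public class CombinedFilter {
    // Hypothetical stand-ins for the two separate filter conditions
    // that previously lived in two FilterFunctions.
    static final Predicate<String> isValid = s -> s != null && !s.isEmpty();
    static final Predicate<String> isRelevant = s -> s.startsWith("event:");

    // One predicate combining both checks; in a Flink job this would
    // back a single FilterFunction instead of two chained ones.
    static final Predicate<String> combined = isValid.and(isRelevant);

    public static void main(String[] args) {
        System.out.println(combined.test("event:login")); // true
        System.out.println(combined.test(""));            // false
    }
}
```

Because `Predicate.and` short-circuits, the `isRelevant` check never runs on values that already failed `isValid`, so merging the filters also avoids one per-record function call.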
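[Editor's note] Since the resolution was to increase the JobManager memory (the config above shows only 128 MB), the corresponding flink-conf.yaml change is small. A sketch; the 512 MB figure and the 120 s timeout are assumptions for illustration, not values from the thread:

```yaml
# JobManager heap raised from the original 128 MB; the exact value
# (512 MB here) is an assumption -- size it to your job graph.
jobmanager.heap.mb: 512

# Optional: give job submission more headroom than the default 60 s
# (the timeout seen in the "did not respond within 60000 ms" error).
akka.client.timeout: 120 s
```

Note that changes to flink-conf.yaml only take effect after restarting the cluster, which is exactly the step Julien reported missing at first.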