Hi again,

Glad that you solved your problem :)

Splitting code into smaller functions has its advantages, but more 
operators/tasks means more overhead for JobManager/TaskManager to manage them. 
Usually that’s not a big issue, but as I said, you were running your cluster on 
extremely low memory settings.
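
For illustration only, here is a minimal sketch of merging two filters into 
one while keeping each rule readable on its own. It uses plain 
java.util.function.Predicate as a stand-in so that it runs without Flink on 
the classpath; the rule names and sample records are made up. In a real job 
the combined check would presumably live in the filter() method of a single 
FilterFunction instead.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Stand-in sketch: plain Java predicates, not Flink's FilterFunction.
final class MergedFilterSketch {

    // Hypothetical rule 1: drop null or empty records.
    static final Predicate<String> NOT_EMPTY = s -> s != null && !s.isEmpty();

    // Hypothetical rule 2: drop records marked as comments.
    static final Predicate<String> NOT_COMMENT = s -> !s.startsWith("#");

    // Both rules evaluated in a single filter step. and() short-circuits,
    // so NOT_COMMENT never sees a null or empty record.
    static final Predicate<String> KEEP = NOT_EMPTY.and(NOT_COMMENT);

    public static void main(String[] args) {
        List<String> records = Arrays.asList("event-1", "", "# note", "event-2");
        // Apply the merged predicate once instead of chaining two filters.
        List<String> kept = records.stream().filter(KEEP).collect(Collectors.toList());
        System.out.println(kept);
    }
}
```

The two rules stay separate in the source code, but the job graph should end 
up with one filter operator instead of two.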

Piotrek

> On 9 Oct 2018, at 18:09, jpreis...@free.fr wrote:
> 
> Hi Piotrek,
> 
> Thank you for your answer. Actually it was necessary to increase the memory 
> of the JobManager (I had tested it but I had not restarted Flink ...).
> 
> I will also work on optimization. I thought it was good practice to create as 
> many functions as possible based on their functional value (for example: 
> create two FilterFunctions that have a different functional meaning). So I 
> will try to have fewer functions (for example: merge my two FilterFunctions 
> into one).
> 
> Thanks again, Piotrek!
> 
> Julien.
> 
> ----- Original message -----
> From: "Piotr Nowojski" <pi...@data-artisans.com>
> To: jpreis...@free.fr
> Cc: user@flink.apache.org
> Sent: Tuesday, 9 October 2018 10:37:58
> Subject: Re: JobManager did not respond within 60000 ms
> 
> Hi, 
> 
> You have quite a complicated job graph and very low memory settings for the 
> job manager and task manager. It might be that long GC pauses are causing 
> this problem. 
> 
> Secondly, a Google search for this error returns quite a few results that 
> point toward high-availability issues. Have you read those previously 
> reported problems? 
> 
> Thanks, Piotrek 
> 
> On 9 Oct 2018, at 09:57, jpreis...@free.fr wrote: 
> 
> 
> I have a streaming job that runs on a standalone cluster. The Flink version 
> is 1.4.1. Everything worked until now, but since I added new processing 
> steps, I cannot start my job anymore. I get this exception: 
> 
> org.apache.flink.client.program.ProgramInvocationException: The program execution failed: JobManager did not respond within 60000 ms
>     at org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:524)
>     at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:103)
>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:456)
>     at org.apache.flink.client.program.DetachedEnvironment.finalizeExecute(DetachedEnvironment.java:77)
>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:402)
>     at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:802)
>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:282)
>     at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1054)
>     at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1101)
>     at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1098)
>     at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1098)
> Caused by: org.apache.flink.runtime.client.JobTimeoutException: JobManager did not respond within 60000 ms
>     at org.apache.flink.runtime.client.JobClient.submitJobDetached(JobClient.java:437)
>     at org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:516)
>     ... 11 more
> Caused by: java.util.concurrent.TimeoutException
>     at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
>     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>     at org.apache.flink.runtime.client.JobClient.submitJobDetached(JobClient.java:435)
>     ... 12 more
> 
> I see very strange behavior when I comment out a function (any one, for 
> example a FilterFunction, whether it was there before my modification or 
> added by it). I also tried changing the configuration (akka.client.timeout 
> and akka.framesize) without success. 
> 
> This is my flink-conf.yaml: 
> jobmanager.rpc.address: myhost 
> jobmanager.rpc.port: 6123 
> jobmanager.heap.mb: 128 
> taskmanager.heap.mb: 1024 
> taskmanager.numberOfTaskSlots: 100 
> taskmanager.memory.preallocate: false 
> taskmanager.data.port: 6121 
> parallelism.default: 1 
> taskmanager.tmp.dirs: /dohdev/flink/tmp/tskmgr 
> blob.storage.directory: /dohdev/flink/tmp/blob 
> jobmanager.web.port: -1 
> high-availability: zookeeper 
> high-availability.zookeeper.quorum: localhost:2181 
> high-availability.zookeeper.path.root: /dohdev/flink 
> high-availability.cluster-id: dev 
> high-availability.storageDir: file:////mnt/metaflink 
> high-availability.zookeeper.storageDir: /mnt/metaflink/inh/agregateur/recovery 
> restart-strategy: fixed-delay 
> restart-strategy.fixed-delay.attempts: 1000 
> restart-strategy.fixed-delay.delay: 5 s 
> zookeeper.sasl.disable: true 
> blob.service.cleanup.interval: 60 
> 
> I launch the job with this command: bin/flink run -d myjar.jar 
> 
> I have attached a graph of my job when it works (Graph.PNG). 
> 
> Do you have any idea what the problem is? 
> 
> Thanks. 
> Julien 
> 
> 
> <Graph.PNG> 
