Thanks, I will try that.

On Tue, Sep 26, 2017 at 8:24 AM Aljoscha Krettek <aljos...@apache.org> wrote:
> I'm not sure whether the JM is reading it or not. But you can manually set
> the values on the Configuration using the setter methods.
>
>
> On 26. Sep 2017, at 16:58, Hao Sun <ha...@zendesk.com> wrote:
>
> Thanks Aljoscha, I still have questions.
> Do I have to parse the YAML file into a Configuration? If the JM is not
> reading the config, who is reading it? The thread is [main] in the logs.
> Why does the JM not read the config file by default?
>
> def createLocalEnvironment(parallelism: Int = JavaEnv.getDefaultLocalParallelism):
>     StreamExecutionEnvironment = {
>   new StreamExecutionEnvironment(JavaEnv.createLocalEnvironment(parallelism))
> }
>
> @PublicEvolving
> def createLocalEnvironmentWithWebUI(config: Configuration = null):
>     StreamExecutionEnvironment = {
>   val conf: Configuration = if (config == null) new Configuration() else config
>   new StreamExecutionEnvironment(JavaEnv.createLocalEnvironmentWithWebUI(conf))
> }
>
>
> On Tue, Sep 26, 2017 at 6:25 AM Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
>> Hi,
>>
>> I think the GlobalConfiguration is not necessarily read by the (local)
>> JobManager. You could try using
>> StreamExecutionEnvironment.createLocalEnvironment(int, Configuration) to
>> manually specify a configuration.
>>
>> Best,
>> Aljoscha
>>
>> On 26. Sep 2017, at 05:49, Hao Sun <ha...@zendesk.com> wrote:
>>
>> Hi, I am running Flink in dev mode through IntelliJ. I have
>> flink-conf.yaml correctly configured, and from the log you can see the
>> job manager is reading it.
>>
>> 2017-09-25 20:41:52.255 [main] INFO org.apache.flink.configuration.GlobalConfiguration - *Loading configuration property: state.backend, rocksdb*
>> 2017-09-25 20:41:52.256 [main] INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: state.backend.fs.checkpointdir, /tmp/flink/checkpoints/
>> 2017-09-25 20:41:52.256 [main] INFO org.apache.flink.configuration.GlobalConfiguration - *Loading configuration property: state.checkpoints.dir, /tmp/flink/checkpoints-meta/*
>> 2017-09-25 20:41:52.256 [main] INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: state.savepoints.dir, /tmp/flink/savepoints/
>>
>> *But I still somehow get this error*
>> java.lang.IllegalStateException: CheckpointConfig says to persist periodic checkpoints, but no checkpoint directory has been configured. You can configure configure one via key 'state.checkpoints.dir'.
>>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:209)
>>     at org.apache.flink.runtime.executiongraph.ExecutionGraph.enableCheckpointing(ExecutionGraph.java:451)
>>     at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:278)
>>     at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1315)
>>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:495)
>>     at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>>     at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
>>     at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>>     at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>>     at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>>     at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>>     at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
>>     at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:125)
>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>>     at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>>     at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>
>>
>> *My program only has this related to checkpointing*
>>
>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>> env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
>> env.enableCheckpointing(2 * 60 * 1000)
>>
>>
>> Need some help to dig through this.
>> Thanks
>>
>> =================== Full log =================
>>
>> 2017-09-25 20:41:51.466 [ForkJoinPool-1-worker-13] INFO com.zendesk.consul.Consul - Collecting kafka nodes from Consul(consul.docker:8500) for tags=List(dev)
>> 2017-09-25 20:41:51.946 [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - class com.zendesk.fraud_prevention.data_types.MaxwellFilterEvents does not contain a setter for field events
>> 2017-09-25 20:41:51.946 [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - class com.zendesk.fraud_prevention.data_types.MaxwellFilterEvents is not a valid POJO type because not all fields are valid POJO fields.
>> 2017-09-25 20:41:51.985 [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - class com.zendesk.fraud_prevention.data_types.MaxwellFPSEvent does not contain a setter for field accountId
>> 2017-09-25 20:41:51.985 [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - class com.zendesk.fraud_prevention.data_types.MaxwellFPSEvent is not a valid POJO type because not all fields are valid POJO fields.
>> 2017-09-25 20:41:52.017 [ForkJoinPool-1-worker-13] INFO com.zendesk.consul.Consul - Collecting kafka nodes from Consul(consul.docker:8500) for tags=List(dev)
>> 2017-09-25 20:41:52.198 [main] INFO o.a.flink.streaming.api.environment.LocalStreamEnvironment - Running job on local embedded Flink mini cluster
>> 2017-09-25 20:41:52.253 [main] INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.address, localhost
>> 2017-09-25 20:41:52.255 [main] INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.port, 6123
>> 2017-09-25 20:41:52.255 [main] INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.web.port, 8081
>> 2017-09-25 20:41:52.255 [main] INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.heap.mb, 1024
>> 2017-09-25 20:41:52.255 [main] INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.heap.mb, 1024
>> 2017-09-25 20:41:52.255 [main] INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.memory.preallocate, false
>> 2017-09-25 20:41:52.255 [main] INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.numberOfTaskSlots, 1
>> 2017-09-25 20:41:52.255 [main] INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: parallelism.default, 1
>> 2017-09-25 20:41:52.255 [main] INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: fs.hdfs.hadoopconf, flink/conf
>> 2017-09-25 20:41:52.255 [main] INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: state.backend, rocksdb
>> 2017-09-25 20:41:52.256 [main] INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: state.backend.fs.checkpointdir, /tmp/flink/checkpoints/
>> 2017-09-25 20:41:52.256 [main] INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: state.checkpoints.dir, /tmp/flink/checkpoints-meta/
>> 2017-09-25 20:41:52.256 [main] INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: state.savepoints.dir, /tmp/flink/savepoints/
>> 2017-09-25 20:41:52.256 [main] INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.log.path, /tmp/flink_logs/flink_console.log
>> 2017-09-25 20:41:52.256 [main] INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.web.log.path, /tmp/flink_logs/flink_console.log
>> 2017-09-25 20:41:52.256 [main] INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: high-availability, zookeeper
>> 2017-09-25 20:41:52.256 [main] INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: high-availability.zookeeper.quorum, 172.18.0.7:2181
>> 2017-09-25 20:41:52.256 [main] INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: high-availability.zookeeper.path.root, /flink
>> 2017-09-25 20:41:52.256 [main] INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: high-availability.zookeeper.path.cluster-id, /flink_default_ns
>> 2017-09-25 20:41:52.256 [main] INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: high-availability.zookeeper.storageDir, /tmp/flink/ha-recovery
>> 2017-09-25 20:41:52.257 [main] INFO org.apache.flink.runtime.minicluster.FlinkMiniCluster - Disabled queryable state server
>> 2017-09-25 20:41:52.271 [main] INFO org.apache.flink.runtime.minicluster.FlinkMiniCluster - Starting FlinkMiniCluster.
>> 2017-09-25 20:41:52.442 [flink-akka.actor.default-dispatcher-4] INFO akka.event.slf4j.Slf4jLogger - Slf4jLogger started
>> 2017-09-25 20:41:52.472 [main] INFO org.apache.flink.runtime.blob.BlobServer - Created BLOB server storage directory /var/folders/37/6tz6x9x97d19_ltcm_1tl_f00000gp/T/blobStore-af52ff47-49d3-4307-bb4e-016c5e5d8a39
>> 2017-09-25 20:41:52.477 [main] INFO org.apache.flink.runtime.blob.BlobServer - Started BLOB server at 0.0.0.0:56706 - max concurrent requests: 50 - max backlog: 1000
>> 2017-09-25 20:41:52.487 [main] INFO org.apache.flink.runtime.metrics.MetricRegistry - No metrics reporter configured, no metrics will be exposed/reported.
>> 2017-09-25 20:41:52.496 [flink-akka.actor.default-dispatcher-4] INFO org.apache.flink.runtime.jobmanager.MemoryArchivist - Started memory archivist akka://flink/user/archive_1
>> 2017-09-25 20:41:52.501 [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.jobmanager.JobManager - Starting JobManager at akka://flink/user/jobmanager_1.
>> 2017-09-25 20:41:52.501 [flink-akka.actor.default-dispatcher-3] INFO o.a.f.r.h.nonha.embedded.EmbeddedLeaderService - Proposing leadership to contender org.apache.flink.runtime.jobmanager.JobManager@5b4d1f99 @ akka://flink/user/jobmanager_1
>> 2017-09-25 20:41:52.508 [main] INFO o.apache.flink.runtime.taskexecutor.TaskManagerConfiguration - Messages have a max timeout of 10000 ms
>> 2017-09-25 20:41:52.514 [main] INFO org.apache.flink.runtime.taskexecutor.TaskManagerServices - Temporary file directory '/var/folders/37/6tz6x9x97d19_ltcm_1tl_f00000gp/T': total 464 GB, usable 61 GB (13.15% usable)
>> 2017-09-25 20:41:52.590 [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.jobmanager.JobManager - JobManager akka://flink/user/jobmanager_1 was granted leadership with leader session ID Some(d2f1c68f-2982-474f-8b7f-271d3f4e4192).
>> 2017-09-25 20:41:52.592 [flink-akka.actor.default-dispatcher-2] INFO o.a.f.r.h.nonha.embedded.EmbeddedLeaderService - Received confirmation of leadership for leader akka://flink/user/jobmanager_1 , session=d2f1c68f-2982-474f-8b7f-271d3f4e4192
>> 2017-09-25 20:41:52.602 [flink-akka.actor.default-dispatcher-2] INFO o.a.f.r.c.standalone.StandaloneResourceManager - Trying to associate with JobManager leader akka://flink/user/jobmanager_1
>> 2017-09-25 20:41:52.610 [flink-akka.actor.default-dispatcher-3] INFO o.a.f.r.c.standalone.StandaloneResourceManager - Resource Manager associating with leading JobManager Actor[akka://flink/user/jobmanager_1#-1948249729] - leader session d2f1c68f-2982-474f-8b7f-271d3f4e4192
>> 2017-09-25 20:41:52.899 [main] INFO org.apache.flink.runtime.io.network.buffer.NetworkBufferPool - Allocated 363 MB for network buffer pool (number of memory segments: 11620, bytes per segment: 32768).
>> 2017-09-25 20:41:52.915 [main] INFO org.apache.flink.runtime.io.network.NetworkEnvironment - Starting the network environment and its components.
>> 2017-09-25 20:41:52.917 [main] INFO org.apache.flink.runtime.taskexecutor.TaskManagerServices - Limiting managed memory to 1145 MB, memory will be allocated lazily.
>> 2017-09-25 20:41:52.922 [main] INFO org.apache.flink.runtime.io.disk.iomanager.IOManager - I/O manager uses directory /var/folders/37/6tz6x9x97d19_ltcm_1tl_f00000gp/T/flink-io-698c06ae-3d56-4765-b9b8-8a5d8b70ffd6 for spill files.
>> 2017-09-25 20:41:52.923 [main] INFO org.apache.flink.runtime.metrics.MetricRegistry - No metrics reporter configured, no metrics will be exposed/reported.
>> 2017-09-25 20:41:52.963 [main] INFO org.apache.flink.runtime.filecache.FileCache - User file cache uses directory /var/folders/37/6tz6x9x97d19_ltcm_1tl_f00000gp/T/flink-dist-cache-a99429d5-981d-4cd0-9eeb-5d6678c650f0
>> 2017-09-25 20:41:52.973 [flink-akka.actor.default-dispatcher-2] INFO org.apache.flink.runtime.filecache.FileCache - User file cache uses directory /var/folders/37/6tz6x9x97d19_ltcm_1tl_f00000gp/T/flink-dist-cache-dd359949-3837-461c-accc-41ebc67a1d8f
>> 2017-09-25 20:41:52.974 [flink-akka.actor.default-dispatcher-2] INFO org.apache.flink.runtime.taskmanager.TaskManager - Starting TaskManager actor at akka://flink/user/taskmanager_1#1248014944.
>> 2017-09-25 20:41:52.974 [flink-akka.actor.default-dispatcher-2] INFO org.apache.flink.runtime.taskmanager.TaskManager - TaskManager data connection information: d7308d8350e736f55357e74e04f5c106 @ localhost (dataPort=-1)
>> 2017-09-25 20:41:52.975 [flink-akka.actor.default-dispatcher-2] INFO org.apache.flink.runtime.taskmanager.TaskManager - TaskManager has 8 task slot(s).
>> 2017-09-25 20:41:52.976 [flink-akka.actor.default-dispatcher-2] INFO org.apache.flink.runtime.taskmanager.TaskManager - Memory usage stats: [HEAP: 385/684/3641 MB, NON HEAP: 35/36/-1 MB (used/committed/max)]
>> 2017-09-25 20:41:52.980 [flink-akka.actor.default-dispatcher-2] INFO org.apache.flink.runtime.taskmanager.TaskManager - Trying to register at JobManager akka://flink/user/jobmanager_1 (attempt 1, timeout: 500 milliseconds)
>> 2017-09-25 20:41:52.982 [flink-akka.actor.default-dispatcher-2] INFO o.a.f.r.c.standalone.StandaloneResourceManager - TaskManager d7308d8350e736f55357e74e04f5c106 has started.
>> 2017-09-25 20:41:52.984 [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.instance.InstanceManager - Registered TaskManager at localhost (akka://flink/user/taskmanager_1) as 8ad7f698f504d459202cdb8a9d6a9b34. Current number of registered hosts is 1. Current number of alive task slots is 8.
>> 2017-09-25 20:41:52.987 [flink-akka.actor.default-dispatcher-2] INFO org.apache.flink.runtime.taskmanager.TaskManager - Successful registration at JobManager (akka://flink/user/jobmanager_1), starting network stack and library cache.
>> 2017-09-25 20:41:52.989 [flink-akka.actor.default-dispatcher-2] INFO org.apache.flink.runtime.taskmanager.TaskManager - Determined BLOB server address to be localhost/127.0.0.1:56706. Starting BLOB cache.
>> 2017-09-25 20:41:52.990 [flink-akka.actor.default-dispatcher-2] INFO org.apache.flink.runtime.blob.BlobCache - Created BLOB cache storage directory /var/folders/37/6tz6x9x97d19_ltcm_1tl_f00000gp/T/blobStore-196b0de7-b951-4074-8865-a138283e66c1
>> 2017-09-25 20:41:52.999 [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.client.JobSubmissionClientActor - Received SubmitJobAndWait(JobGraph(jobId: 0f0d880310bc9098027c2e4877f999fb)) but there is no connection to a JobManager yet.
>> 2017-09-25 20:41:52.999 [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.client.JobSubmissionClientActor - Received job Kafka 0.10 Example development (0f0d880310bc9098027c2e4877f999fb).
>> 2017-09-25 20:41:53.000 [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.client.JobSubmissionClientActor - Disconnect from JobManager null.
>> 2017-09-25 20:41:53.003 [flink-akka.actor.default-dispatcher-2] INFO org.apache.flink.runtime.client.JobSubmissionClientActor - Connect to JobManager Actor[akka://flink/user/jobmanager_1#-1948249729].
>> 2017-09-25 20:41:53.003 [flink-akka.actor.default-dispatcher-2] INFO org.apache.flink.runtime.client.JobSubmissionClientActor - Connected to JobManager at Actor[akka://flink/user/jobmanager_1#-1948249729] with leader session id d2f1c68f-2982-474f-8b7f-271d3f4e4192.
>> 2017-09-25 20:41:53.004 [flink-akka.actor.default-dispatcher-2] INFO org.apache.flink.runtime.client.JobSubmissionClientActor - Sending message to JobManager akka://flink/user/jobmanager_1 to submit job Kafka 0.10 Example development (0f0d880310bc9098027c2e4877f999fb) and wait for progress
>> 2017-09-25 20:41:53.005 [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.client.JobSubmissionClientActor - Upload jar files to job manager akka://flink/user/jobmanager_1.
>> 2017-09-25 20:41:53.007 [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.client.JobSubmissionClientActor - Submit job to the job manager akka://flink/user/jobmanager_1.
>> 2017-09-25 20:41:53.009 [flink-akka.actor.default-dispatcher-2] INFO org.apache.flink.runtime.jobmanager.JobManager - Submitting job 0f0d880310bc9098027c2e4877f999fb (Kafka 0.10 Example development).
>> 2017-09-25 20:41:53.014 [flink-akka.actor.default-dispatcher-2] INFO org.apache.flink.runtime.jobmanager.JobManager - Using restart strategy FixedDelayRestartStrategy(maxNumberRestartAttempts=1, delayBetweenRestartAttempts=10000) for 0f0d880310bc9098027c2e4877f999fb.
>> 2017-09-25 20:41:53.025 [flink-akka.actor.default-dispatcher-2] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job recovers via failover strategy: full graph restart
>> 2017-09-25 20:41:53.055 [flink-akka.actor.default-dispatcher-2] INFO org.apache.flink.runtime.jobmanager.JobManager - Running initialization on master for job Kafka 0.10 Example development (0f0d880310bc9098027c2e4877f999fb).
>> 2017-09-25 20:41:53.055 [flink-akka.actor.default-dispatcher-2] INFO org.apache.flink.runtime.jobmanager.JobManager - Successfully ran initialization on master in 0 ms.
>> 2017-09-25 20:41:53.071 [flink-akka.actor.default-dispatcher-2] INFO org.apache.flink.runtime.jobmanager.JobManager - No state backend has been configured, using default state backend (Memory / JobManager)
>> 2017-09-25 20:41:53.078 [flink-akka.actor.default-dispatcher-2] ERROR org.apache.flink.runtime.jobmanager.JobManager - Failed to submit job 0f0d880310bc9098027c2e4877f999fb (Kafka 0.10 Example development)
>> java.lang.IllegalStateException: CheckpointConfig says to persist periodic checkpoints, but no checkpoint directory has been configured. You can configure configure one via key 'state.checkpoints.dir'.
>>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:209)
>>     at org.apache.flink.runtime.executiongraph.ExecutionGraph.enableCheckpointing(ExecutionGraph.java:451)
>>     at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:278)
>>     at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1315)
>>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:495)
>>     at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>>     at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
>>     at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>>     at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>>     at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>>     at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>>     at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
>>     at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:125)
>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>>     at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>>     at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> 2017-09-25 20:41:53.085 [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.client.JobSubmissionClientActor - Terminate JobClientActor.
>> 2017-09-25 20:41:53.085 [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.client.JobSubmissionClientActor - Disconnect from JobManager Actor[akka://flink/user/jobmanager_1#-1948249729].
>> 2017-09-25 20:41:53.086 [main] INFO org.apache.flink.runtime.client.JobClient - Job execution failed
>> 2017-09-25 20:41:53.086 [main] INFO org.apache.flink.runtime.minicluster.FlinkMiniCluster - Stopping FlinkMiniCluster.
>> 2017-09-25 20:41:53.090 [flink-akka.actor.default-dispatcher-2] INFO org.apache.flink.runtime.taskmanager.TaskManager - Stopping TaskManager akka://flink/user/taskmanager_1#1248014944.
>> 2017-09-25 20:41:53.090 [flink-akka.actor.default-dispatcher-4] INFO org.apache.flink.runtime.jobmanager.JobManager - Stopping JobManager akka://flink/user/jobmanager_1.
>> 2017-09-25 20:41:53.091 [flink-akka.actor.default-dispatcher-2] INFO org.apache.flink.runtime.taskmanager.TaskManager - Disassociating from JobManager
>> 2017-09-25 20:41:53.093 [flink-akka.actor.default-dispatcher-2] INFO org.apache.flink.runtime.blob.BlobCache - Shutting down BlobCache
>> 2017-09-25 20:41:53.103 [flink-akka.actor.default-dispatcher-4] INFO org.apache.flink.runtime.blob.BlobServer - Stopped BLOB server at 0.0.0.0:56706
>> 2017-09-25 20:41:53.104 [flink-akka.actor.default-dispatcher-2] INFO org.apache.flink.runtime.io.disk.iomanager.IOManager - I/O manager removed spill file directory /var/folders/37/6tz6x9x97d19_ltcm_1tl_f00000gp/T/flink-io-698c06ae-3d56-4765-b9b8-8a5d8b70ffd6
>> 2017-09-25 20:41:53.104 [flink-akka.actor.default-dispatcher-2] INFO org.apache.flink.runtime.io.network.NetworkEnvironment - Shutting down the network environment and its components.
>> 2017-09-25 20:41:53.122 [flink-akka.actor.default-dispatcher-2] INFO org.apache.flink.runtime.taskmanager.TaskManager - Task manager akka://flink/user/taskmanager_1 is completely shut down.
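
For reference, the suggestion made in this thread (set the values on a Configuration by hand and pass it to createLocalEnvironment(int, Configuration)) could look roughly like the sketch below. It is only a sketch under some assumptions: it targets the Flink 1.3-era Scala API quoted above, where the Scala createLocalEnvironment takes only a parallelism, so the Java environment is wrapped by hand the same way the quoted Scala source does it. The object name, the parallelism of 1, the job name, and the toy fromElements pipeline are placeholders, and the checkpoint directory is the one from the flink-conf.yaml in the log, written here as a file:// URI.

```scala
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaEnv}
import org.apache.flink.streaming.api.scala._

// Hypothetical object name, used only for this illustration.
object LocalCheckpointConfigSketch {
  def main(args: Array[String]): Unit = {
    // Set the key named in the error message explicitly instead of relying
    // on flink-conf.yaml being picked up by the local JobManager.
    val conf = new Configuration()
    conf.setString("state.checkpoints.dir", "file:///tmp/flink/checkpoints-meta/")

    // Wrap the Java local environment, which accepts (parallelism, Configuration).
    // The parallelism of 1 is an arbitrary choice for local runs.
    val env = new StreamExecutionEnvironment(JavaEnv.createLocalEnvironment(1, conf))

    // Same checkpoint settings as in the original program.
    env.getCheckpointConfig.enableExternalizedCheckpoints(
      ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    env.enableCheckpointing(2 * 60 * 1000)

    // Placeholder pipeline so the job graph is not empty.
    env.fromElements(1, 2, 3).print()
    env.execute("local checkpoint config sketch")
  }
}
```

Other keys that the log shows being loaded but not applied (for example state.backend, given the "No state backend has been configured" message) would presumably need to be set on the same Configuration in the same way.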