Hi,

We regularly see the following two exceptions in a number of jobs shortly after they are resumed during our Flink cluster startup:
org.apache.kafka.common.KafkaException: Error registering mbean kafka.consumer:type=consumer-node-metrics,client-id=consumer-1,node-id=node--1
	at org.apache.kafka.common.metrics.JmxReporter.reregister(JmxReporter.java:159)
	at org.apache.kafka.common.metrics.JmxReporter.metricChange(JmxReporter.java:77)
	at org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:436)
	at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:249)
	at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:234)
	at org.apache.kafka.common.network.Selector$SelectorMetrics.maybeRegisterConnectionMetrics(Selector.java:749)
	at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:327)
	at org.apache.kafka.common.network.Selector.poll(Selector.java:303)
	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:188)
	at org.apache.kafka.clients.consumer.internals.Fetcher.getTopicMetadata(Fetcher.java:283)
	at org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(KafkaConsumer.java:1344)
	at org.apache.flink.streaming.connectors.kafka.internal.Kafka09PartitionDiscoverer.getAllPartitionsForTopics(Kafka09PartitionDiscoverer.java:77)
	at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.discoverPartitions(AbstractPartitionDiscoverer.java:131)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:473)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
	at java.lang.Thread.run(Thread.java:748)
Caused by: javax.management.InstanceAlreadyExistsException: kafka.consumer:type=consumer-node-metrics,client-id=consumer-1,node-id=node--1
	at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
	at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
	at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
	at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
	at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
	at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
	at org.apache.kafka.common.metrics.JmxReporter.reregister(JmxReporter.java:157)
	... 21 more

java.lang.Exception: Failed to send ExecutionStateChange notification to JobManager
	at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$org$apache$flink$runtime$taskmanager$TaskManager$$handleTaskMessage$3$$anonfun$apply$2.apply(TaskManager.scala:439)
	at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$org$apache$flink$runtime$taskmanager$TaskManager$$handleTaskMessage$3$$anonfun$apply$2.apply(TaskManager.scala:423)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
	at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
	at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
	at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
	at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
	at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
	at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka.tcp://fl...@ip-10-150-24-22.eu-west-1.compute.internal:41775/user/jobmanager#163569829]] after [30000 ms]. Sender[null] sent message of type "org.apache.flink.runtime.messages.JobManagerMessages$LeaderSessionMessage".
	at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)
	at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
	at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
	at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
	at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
	at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
	at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
	at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
	at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
	at java.lang.Thread.run(Thread.java:748)

Neither exception seems related to our job code. The first looks like a metrics (JMX MBean registration) problem in the Flink Kafka connector, and the second like something internal to Flink. Each appears once at startup and does not recur once the cluster reaches a steady state. The jobs also appear to run correctly, even though these exceptions show up in their "Exception" tab in the JobManager.

Is there something we need to fix in our setup? And are there any implications, such as missing metrics?

Best regards,
Mark Harris