[ https://issues.apache.org/jira/browse/FLINK-20617?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17250821#comment-17250821 ]
Yang Wang commented on FLINK-20617:
-----------------------------------

[~Georger] Have you tested adding "-Dyarn.per-job-cluster.include-user-jar=DISABLED" to your submission command?

I am copying the analysis from the ML.

For application mode, job submission happens on the JobManager side. We use an embedded client to submit the job, so the user jar is added to the distributed cache. When a task is deployed to a TaskManager, the jar is downloaded again and run in the user classloader, even though it is already on the system classpath. I think this may be why these classes are loaded by different classloaders.

For per-job mode, we recover the job, and the user jars are not added to the distributed cache.

Please also refer to the discussion here[1].

[1]. https://lists.apache.org/list.html?u...@flink.apache.org:lte=1M:failed%20w%2F%20Application%20Mode%20but%20succeeded%20w%2F%20Per-Job%20Cluster%20Mode


> Kafka Consumer Deserializer Exception on application mode
> ---------------------------------------------------------
>
>                 Key: FLINK-20617
>                 URL: https://issues.apache.org/jira/browse/FLINK-20617
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.11.2
>         Environment: application mode
> flink 1.11.2 with hadoop 2.6.0-cdh5.15.0
>            Reporter: Georger
>            Priority: Critical
>         Attachments: taskmanager.out
>
>
> The Kafka source may have some issues in application mode.
>
> When I run it in application mode on Flink 1.11.2, it can't start up.
> The detailed exception is:
> org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
>  at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:789)
>  at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:643)
>  at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:623)
>  at org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.initializeConnections(KafkaPartitionDiscoverer.java:58)
>  at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94)
>  at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:550)
>  at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>  at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>  at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
>  at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
>  at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
>  at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
>  at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
>  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>  at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.serialization.ByteArrayDeserializer is not an instance of org.apache.kafka.common.serialization.Deserializer
>  at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:263)
>  at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:688)
>  ... 15 more
>
> The pom is:
> <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-connector-kafka_2.11</artifactId>
>     <version>${flink.version}</version>
>     <exclusions>
>         <exclusion>
>             <groupId>org.slf4j</groupId>
>             <artifactId>slf4j-api</artifactId>
>         </exclusion>
>         <exclusion>
>             <groupId>org.apache.kafka</groupId>
>             <artifactId>kafka-clients</artifactId>
>         </exclusion>
>     </exclusions>
> </dependency>
> <dependency>
>     <groupId>org.apache.kafka</groupId>
>     <artifactId>kafka-clients</artifactId>
>     <version>1.0.1</version>
> </dependency>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
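
The "is not an instance of" error discussed in the comment can be reproduced outside Flink. The following is only a minimal sketch, not Flink or Kafka code: `ClassLoaderMismatchDemo`, its nested `Deserializer` and `ByteArrayDeserializer` stand-ins, and `ChildFirstLoader` are hypothetical names made up for illustration. It assumes the compiled demo classes are readable as resources from the classpath, and it shows that a class defined a second time by a child-first loader is not assignable to the interface held by the parent loader, even though the names are identical.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

public class ClassLoaderMismatchDemo {

    /** Hypothetical stand-in for Kafka's Deserializer interface. */
    public interface Deserializer {}

    /** Hypothetical stand-in for Kafka's ByteArrayDeserializer. */
    public static class ByteArrayDeserializer implements Deserializer {}

    /** Child-first loader: defines this demo's nested classes itself instead of asking its parent. */
    static class ChildFirstLoader extends ClassLoader {
        private static final String PREFIX = ClassLoaderMismatchDemo.class.getName() + "$";

        ChildFirstLoader(ClassLoader parent) {
            super(parent);
        }

        @Override
        protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
            if (!name.startsWith(PREFIX)) {
                // Everything else (JDK classes, the outer demo class) comes from the parent.
                return super.loadClass(name, resolve);
            }
            synchronized (getClassLoadingLock(name)) {
                Class<?> loaded = findLoadedClass(name);
                if (loaded == null) {
                    byte[] bytes = readClassBytes(name);
                    loaded = defineClass(name, bytes, 0, bytes.length);
                }
                if (resolve) {
                    resolveClass(loaded);
                }
                return loaded;
            }
        }

        /** Reads the .class file bytes through the parent loader's resources. */
        private byte[] readClassBytes(String name) throws ClassNotFoundException {
            String resource = name.replace('.', '/') + ".class";
            try (InputStream in = getParent().getResourceAsStream(resource)) {
                if (in == null) {
                    throw new ClassNotFoundException(name);
                }
                ByteArrayOutputStream out = new ByteArrayOutputStream();
                byte[] buf = new byte[4096];
                int n;
                while ((n = in.read(buf)) != -1) {
                    out.write(buf, 0, n);
                }
                return out.toByteArray();
            } catch (IOException e) {
                throw new ClassNotFoundException(name, e);
            }
        }
    }

    /** Loads the implementation a second time through a child-first loader. */
    static Class<?> loadImplInChildLoader() throws ClassNotFoundException {
        ClassLoader parent = ClassLoaderMismatchDemo.class.getClassLoader();
        return new ChildFirstLoader(parent).loadClass(ByteArrayDeserializer.class.getName());
    }

    public static void main(String[] args) throws Exception {
        Class<?> childImpl = loadImplInChildLoader();
        // Same loader on both sides: prints true.
        System.out.println(Deserializer.class.isAssignableFrom(ByteArrayDeserializer.class));
        // Interface from the parent loader, implementation from the child loader: prints false.
        System.out.println(Deserializer.class.isAssignableFrom(childImpl));
    }
}
```

In the sketch, the child loader also re-defines the interface when it resolves the implementation, so the two `Deserializer` classes are distinct at runtime. That is analogous to, though much simpler than, a user jar being present both on the system classpath and in the user classloader.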