[ 
https://issues.apache.org/jira/browse/FLINK-20617?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17250821#comment-17250821
 ] 

Yang Wang commented on FLINK-20617:
-----------------------------------

[~Georger] Do you have a test via adding 
"-Dyarn.per-job-cluster.include-user-jar=DISABLED" in you submission command?

 

I am copying the analysis on the ML.

For application mode, the job submission happens in the JobManager side. We are 
using an embedded client to submit the job. So the user jar will be added to 
distributed cache. When deploying a task to TaskManager, it will be downloaded 
again and run in user classloader even though we already have it in the system 
classpath. I think it might be the reason why these classes are loaded by 
different classloaders.

 

For per-job mode, we are recovering the job and the user jars will not be added 
to distributed cache.

 

Please also refer the discussion here[1]. 

[1]. 
https://lists.apache.org/list.html?u...@flink.apache.org:lte=1M:failed%20w%2F%20Application%20Mode%20but%20succeeded%20w%2F%20Per-Job%20Cluster%20Mode

> Kafka Consumer Deserializer Exception on application mode
> ---------------------------------------------------------
>
>                 Key: FLINK-20617
>                 URL: https://issues.apache.org/jira/browse/FLINK-20617
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.11.2
>         Environment: application mode
> flink 1.11.2 with  hadoop 2.6.0-cdh5.15.0
>            Reporter: Georger
>            Priority: Critical
>         Attachments: taskmanager.out
>
>
> Kafka source may has some issues on application mode
>  
> when i run it with application mode on  flink 1.11.2 it can't startup
> the detail Excetion is:
> org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
>     at 
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:789)
>     at 
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:643)
>     at 
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:623)
>     at 
> org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.initializeConnections(KafkaPartitionDiscoverer.java:58)
>     at 
> org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94)
>     at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:550)
>     at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>     at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>     at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.kafka.common.KafkaException: 
> org.apache.kafka.common.serialization.ByteArrayDeserializer is not an 
> instance of org.apache.kafka.common.serialization.Deserializer
>     at 
> org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:263)
>     at 
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:688)
>     ... 15 more
> The pom is:
> <dependency>
>  <groupId>org.apache.flink</groupId>
>  <artifactId>flink-connector-kafka_2.11</artifactId>
>  <version>${flink.version}</version>
>  <exclusions>
>  <exclusion>
>  <groupId>org.slf4j</groupId>
>  <artifactId>slf4j-api</artifactId>
>  </exclusion>
>  <exclusion>
>  <groupId>org.apache.kafka</groupId>
>  <artifactId>kafka-clients</artifactId>
>  </exclusion>
>  </exclusions>
> </dependency>
> <dependency>
>  <groupId>org.apache.kafka</groupId>
>  <artifactId>kafka-clients</artifactId>
>  <version>1.0.1</version>
> </dependency>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to