[ https://issues.apache.org/jira/browse/FLINK-20617?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17250874#comment-17250874 ]
Yang Wang commented on FLINK-20617:
-----------------------------------

[~Georger] Thanks for your verification. I believe this is a limitation of the current application mode. Actually, we do not need to add the user jar to the system classpath in application mode, since it will always be added to the distributed cache and then pulled by the TaskManagers.

cc [~kkl0u], [~aljoscha], do you think it makes sense to not put user jars on the system classpath in application mode?

> Kafka Consumer Deserializer Exception on application mode
> ---------------------------------------------------------
>
>                 Key: FLINK-20617
>                 URL: https://issues.apache.org/jira/browse/FLINK-20617
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.11.2
>        Environment: application mode
>                     flink 1.11.2 with hadoop 2.6.0-cdh5.15.0
>            Reporter: Georger
>            Priority: Critical
>        Attachments: taskmanager.out
>
> The Kafka source may have an issue in application mode.
>
> When I run the job in application mode on Flink 1.11.2, it cannot start up.
> The detailed exception is:
>
> org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:789)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:643)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:623)
> 	at org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.initializeConnections(KafkaPartitionDiscoverer.java:58)
> 	at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94)
> 	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:550)
> 	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> 	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
> 	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
> 	at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.serialization.ByteArrayDeserializer is not an instance of org.apache.kafka.common.serialization.Deserializer
> 	at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:263)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:688)
> 	... 15 more
>
> The pom is:
>
> <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-connector-kafka_2.11</artifactId>
>     <version>${flink.version}</version>
>     <exclusions>
>         <exclusion>
>             <groupId>org.slf4j</groupId>
>             <artifactId>slf4j-api</artifactId>
>         </exclusion>
>         <exclusion>
>             <groupId>org.apache.kafka</groupId>
>             <artifactId>kafka-clients</artifactId>
>         </exclusion>
>     </exclusions>
> </dependency>
> <dependency>
>     <groupId>org.apache.kafka</groupId>
>     <artifactId>kafka-clients</artifactId>
>     <version>1.0.1</version>
> </dependency>

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
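Editor's note: "X is not an instance of X" errors like the one above are the classic symptom of the same class being visible to two different class loaders at once — here, kafka-clients bundled in the user jar and also reachable from the system classpath that application mode puts the user jar on. In the JVM, a class's identity is (name, defining class loader), so the "same" class loaded twice fails `instanceof` checks across loaders. A minimal standalone sketch of the mechanism (a hypothetical class `Foo` compiled at runtime; requires a JDK for `javac`, and is not code from this issue):

```java
import javax.tools.ToolProvider;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Files;
import java.nio.file.Path;

public class ClassLoaderDemo {

    // Compiles a trivial class into a temp dir, then loads the same .class
    // file with two independent URLClassLoaders. Returns true iff both
    // loaders yield the same Class object (they do not).
    public static boolean sameClassAcrossLoaders() throws Exception {
        Path dir = Files.createTempDirectory("cl-demo");
        Path src = dir.resolve("Foo.java");
        Files.write(src, "public class Foo {}".getBytes("UTF-8"));

        // Requires a JDK; run() returns non-zero on compilation failure.
        int rc = ToolProvider.getSystemJavaCompiler()
                .run(null, null, null, src.toString());
        if (rc != 0) throw new IllegalStateException("compilation failed");

        URL[] urls = { dir.toUri().toURL() };
        // Two sibling loaders with no shared parent for Foo, mirroring a
        // user-code class loader vs. the system classpath.
        try (URLClassLoader cl1 = new URLClassLoader(urls, null);
             URLClassLoader cl2 = new URLClassLoader(urls, null)) {
            Class<?> c1 = cl1.loadClass("Foo");
            Class<?> c2 = cl2.loadClass("Foo");
            Object foo = c1.getDeclaredConstructor().newInstance();
            // foo IS a Foo, but not cl2's Foo: same bytes, different loader.
            System.out.println("c1 == c2: " + (c1 == c2));
            System.out.println("c2.isInstance(foo): " + c2.isInstance(foo));
            return c1 == c2;
        }
    }

    public static void main(String[] args) throws Exception {
        sameClassAcrossLoaders();
    }
}
```

As interim mitigations (workarounds, not the fix discussed in this thread), users have either stopped bundling kafka-clients in the fat jar or forced parent-first loading for the conflicting packages via `classloader.parent-first-patterns.additional: org.apache.kafka` in flink-conf.yaml.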