Thanks for the suggestion. I copied the application jar to lib. The ClassNotFoundException no longer occurs, but I now get a different error, this one related to Kafka:

org.apache.kafka.common.KafkaException: Failed to construct kafka producer
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:433)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:291)
    at org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:114)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initProducer(FlinkKafkaProducer011.java:949)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:681)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:93)
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:359)
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:350)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initializeState(FlinkKafkaProducer011.java:856)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:258)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:248)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:342)
    ... 17 more

regards.

On Wed, Feb 28, 2018 at 2:31 PM, Chesnay Schepler <ches...@apache.org> wrote:

> Hello,
>
> this is probably caused by a known issue in 1.4.1:
> https://issues.apache.org/jira/browse/FLINK-8741
>
> This bug is not present in 1.4.0, and it will be fixed in 1.4.2, which
> should be released within the next few days.
>
> As a temporary workaround you can copy your app-assembly-1.0.jar into the
> /lib directory.
>
>
> On 28.02.2018 08:45, Debasish Ghosh wrote:
>
> Hi -
>
> Facing a ClassNotFoundException while running a Flink application that
> reads from Kafka. This is a modified version of the NYC Taxi App that
> reads from Kafka.
>
> I am using Flink 1.4.1. The application runs OK with Flink 1.3.
>
> Here's the exception:
>
>> java.lang.ClassNotFoundException: com.lightbend.fdp.sample.flink.app.TaxiRideTSAssigner
>>     at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>     at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>     at java.lang.Class.forName0(Native Method)
>>     at java.lang.Class.forName(Class.java:348)
>>     at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:73)
>>     at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1620)
>>     at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521)
>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1781)
>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
>>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
>>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:393)
>>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:380)
>>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:368)
>>     at org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:58)
>>     at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.createPartitionStateHolders(AbstractFetcher.java:521)
>>     at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.<init>(AbstractFetcher.java:167)
>>     at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.<init>(Kafka09Fetcher.java:89)
>>     at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.<init>(Kafka010Fetcher.java:62)
>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010.createFetcher(FlinkKafkaConsumer010.java:203)
>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:564)
>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:86)
>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
>>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:94)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>>     at java.lang.Thread.run(Thread.java:745)
>
> I run the application as follows:
>
>> $ ./bin/flink run /Users/debasishghosh/flink/source/core/app/target/scala-2.11/app-assembly-1.0.jar --broker-list localhost:9092 --inTopic taxiin --outTopic taxiout
>
> I verified that the jar contains the class:
>
>> $ jar tvf app-assembly-1.0.jar | grep TaxiRideTSAssigner
>>   2090 Wed Feb 28 12:59:52 IST 2018 com/lightbend/fdp/sample/flink/app/TaxiRideTSAssigner.class
>
> Here are the relevant dependencies in the build:
>
> val flinkScala = "org.apache.flink" %% "flink-scala" % "1.4.1" % "provided"
> val flinkStreamingScala = "org.apache.flink" %% "flink-streaming-scala" % "1.4.1" % "provided"
> val flinkKafka = "org.apache.flink" %% "flink-connector-kafka-0.11" % "1.4.1" exclude("org.slf4j", "slf4j-log4j12")
>
> Any help?
>
> regards.
>
> --
> Debasish Ghosh
> http://manning.com/ghosh2
> http://manning.com/ghosh
>
> Twttr: @debasishg
> Blog: http://debasishg.blogspot.com
> Code: http://github.com/debasishg

--
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg