Thanks for the suggestion. I copied the application jar to lib. The error
doesn't come but I get another error related to Kafka ..

org.apache.kafka.common.KafkaException: Failed to construct kafka producer
at
org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:433)
at
org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:291)
at
org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:114)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initProducer(FlinkKafkaProducer011.java:949)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:681)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:93)
at
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:359)
at
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:350)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initializeState(FlinkKafkaProducer011.java:856)
at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:258)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.KafkaException:
org.apache.kafka.common.serialization.ByteArraySerializer is not an
instance of org.apache.kafka.common.serialization.Serializer
at
org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:248)
at
org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:342)
... 17 more

regards.

On Wed, Feb 28, 2018 at 2:31 PM, Chesnay Schepler <ches...@apache.org>
wrote:

> Hello,
>
> this is probably caused by a known issue in 1.4.1:
> https://issues.apache.org/jira/browse/FLINK-8741
>
> This bug is not present in 1.4.0, and it will fixed in 1.4.2 which should
> be released within the next days.
>
> As a temporary workaround you can copy your app-assembly-1.0.jar into the
> /lib directory.
>
>
> On 28.02.2018 08:45, Debasish Ghosh wrote:
>
> Hi -
>
> Facing a ClassNotFoundException while running Flink application that reads
> from Kafka. This is a modified version of the NYC Taxi App that reads from
> Kafka.
>
> I am using Flink 1.4.1 .. The application runs ok with Flink 1.3 ..
>
> Here's the exception ..
>
> java.lang.ClassNotFoundException: com.lightbend.fdp.sample.
>> flink.app.TaxiRideTSAssigner
>> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>> at java.lang.Class.forName0(Native Method)
>> at java.lang.Class.forName(Class.java:348)
>> at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.
>> resolveClass(InstantiationUtil.java:73)
>> at java.io.ObjectInputStream.readNonProxyDesc(
>> ObjectInputStream.java:1620)
>> at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521)
>> at java.io.ObjectInputStream.readOrdinaryObject(
>> ObjectInputStream.java:1781)
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
>> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
>> at org.apache.flink.util.InstantiationUtil.deserializeObject(
>> InstantiationUtil.java:393)
>> at org.apache.flink.util.InstantiationUtil.deserializeObject(
>> InstantiationUtil.java:380)
>> at org.apache.flink.util.InstantiationUtil.deserializeObject(
>> InstantiationUtil.java:368)
>> at org.apache.flink.util.SerializedValue.deserializeValue(
>> SerializedValue.java:58)
>> at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.
>> createPartitionStateHolders(AbstractFetcher.java:521)
>> at org.apache.flink.streaming.connectors.kafka.internals.
>> AbstractFetcher.<init>(AbstractFetcher.java:167)
>> at org.apache.flink.streaming.connectors.kafka.internal.
>> Kafka09Fetcher.<init>(Kafka09Fetcher.java:89)
>> at org.apache.flink.streaming.connectors.kafka.internal.
>> Kafka010Fetcher.<init>(Kafka010Fetcher.java:62)
>> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010.
>> createFetcher(FlinkKafkaConsumer010.java:203)
>> at org.apache.flink.streaming.connectors.kafka.
>> FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:564)
>> at org.apache.flink.streaming.api.operators.StreamSource.
>> run(StreamSource.java:86)
>> at org.apache.flink.streaming.api.operators.StreamSource.
>> run(StreamSource.java:55)
>> at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(
>> SourceStreamTask.java:94)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.
>> invoke(StreamTask.java:264)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>> at java.lang.Thread.run(Thread.java:745)
>
>
> I run the application as follows ..
>
> $ ./bin/flink run 
> /Users/debasishghosh/flink/source/core/app/target/scala-2.11/app-assembly-1.0.jar
>> --broker-list localhost:9092 --inTopic taxiin --outTopic taxiout
>
>
> I verified that the jar contains the class ..
>
> $ jar tvf app-assembly-1.0.jar | grep TaxiRideTSAssigner
>>   2090 Wed Feb 28 12:59:52 IST 2018 com/lightbend/fdp/sample/
>> flink/app/TaxiRideTSAssigner.class
>
>
>
> Here are the relevant dependencies in build ..
>
> val flinkScala            =      "org.apache.flink"             %%
>  "flink-scala"                    % "1.4.1" % "provided"
> val flinkStreamingScala   =      "org.apache.flink"             %%
>  "flink-streaming-scala"          % "1.4.1" % "provided"
> val flinkKafka            =      "org.apache.flink"             %%
>  "flink-connector-kafka-0.11"     % "1.4.1" exclude("org.slf4j",
> "slf4j-log4j12")
>
>
> any help ?
>
> regards.
>
>
> --
> Debasish Ghosh
> http://manning.com/ghosh2
> http://manning.com/ghosh
>
> Twttr: @debasishg
> Blog: http://debasishg.blogspot.com
> Code: http://github.com/debasishg
>
>
>


-- 
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg

Reply via email to