Have you checked whether the class
(org/apache/parquet/avro/AvroParquetWriter) is in the jar you are
submitting?
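One way to check is to list the jar's entries, e.g. `jar tf target/<your-job>.jar | grep AvroParquetWriter` (the jar path is a placeholder). If the class is missing, one likely cause, which I haven't verified against your exact setup, is that flink-parquet declares parquet-avro as an optional dependency. Maven does not pull optional dependencies in transitively, so the class never reaches your job jar. A minimal sketch of declaring it explicitly in the job's pom.xml, next to flink-parquet. The version shown is an assumption and should match the Parquet version your flink-parquet release was built against:

```xml
<!-- Assumption: parquet-avro is optional in flink-parquet, so it has to be
     declared explicitly to end up in the shaded job jar. -->
<dependency>
   <groupId>org.apache.parquet</groupId>
   <artifactId>parquet-avro</artifactId>
   <!-- Assumed version; align it with the Parquet version used by flink-parquet -->
   <version>1.11.1</version>
</dependency>
```

After rebuilding with `mvn clean package`, rerun the `jar tf` check. The maven-shade-plugin section generated by the quickstart archetype should then bundle the class, as long as the dependency keeps the default (compile) scope.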

On 15/01/2021 12:05, Jan Oelschlegel wrote:
>
> Hi Dawid,
>
> I used the official Maven archetype for a Scala-based Flink project,
> as described here [1]:
>
> mvn archetype:generate                               \
>   -DarchetypeGroupId=org.apache.flink              \
>   -DarchetypeArtifactId=flink-quickstart-scala      \
>   -DarchetypeVersion=1.11.2
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/project-configuration.html#create-project
>
> Best,
> Jan
>
> *From:* Dawid Wysakowicz <dwysakow...@apache.org>
> *Sent:* Thursday, 14 January 2021 12:42
> *To:* Jan Oelschlegel <oelschle...@integration-factory.de>;
> user@flink.apache.org
> *Subject:* Re: StreamingFileSink with ParquetAvroWriters
>
> Hi Jan,
>
> Could you make sure you are packaging that dependency with your job
> jar? There are instructions on how to configure your build setup [1].
> In particular, the section on building a jar with dependencies might
> come in handy [2].
>
> Best,
> Dawid
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/project-configuration.html
>
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/project-configuration.html#appendix-template-for-building-a-jar-with-dependencies
>
> On 13/01/2021 17:49, Jan Oelschlegel wrote:
>
>     Hi,
>
>     I'm using Flink 1.11.2 and would like to use the StreamingFileSink
>     to write Parquet files to HDFS.
>
>     As described in the documentation, I have added the dependency:
>
>     <dependency>
>        <groupId>org.apache.flink</groupId>
>        <artifactId>flink-parquet_${scala.binary.version}</artifactId>
>        <version>${flink.version}</version>
>     </dependency>
>
>     And this is my file sink definition:
>
>     import org.apache.flink.core.fs.Path
>     import org.apache.flink.formats.parquet.avro.ParquetAvroWriters
>     import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
>
>     val sink: StreamingFileSink[Event] = StreamingFileSink
>       .forBulkFormat(
>         new Path("hdfs://namenode.local:8020/user/datastream/"),
>         ParquetAvroWriters.forReflectRecord(classOf[Event])
>       )
>       .build()
>
>     When I run this on the cluster, I get the following error:
>
>     java.lang.NoClassDefFoundError: org/apache/parquet/avro/AvroParquetWriter
>         at org.apache.flink.formats.parquet.avro.ParquetAvroWriters.createAvroParquetWriter(ParquetAvroWriters.java:84)
>         at org.apache.flink.formats.parquet.avro.ParquetAvroWriters.lambda$forReflectRecord$3c375096$1(ParquetAvroWriters.java:73)
>         at org.apache.flink.formats.parquet.ParquetWriterFactory.create(ParquetWriterFactory.java:57)
>         at org.apache.flink.streaming.api.functions.sink.filesystem.BulkBucketWriter.openNew(BulkBucketWriter.java:69)
>         at org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter$OutputStreamBasedBucketWriter.openNewInProgressFile(OutputStreamBasedPartFileWriter.java:83)
>         at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.rollPartFile(Bucket.java:209)
>         at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:200)
>         at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:282)
>         at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:104)
>         at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:420)
>         at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
>         at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
>         at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
>         at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
>         at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
>         at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
>         at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185)
>         at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
>         at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
>         at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>         at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>         at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)
>
>     It looks like some dependencies are missing. How can I fix this?
>
>     Jan O.
>
>     NOTICE: This is a confidential message intended only for the
>     addressee. It may not be copied or made accessible to third
>     parties. If you have received this message in error, please let
>     me know by e-mail or at the telephone number given above.
>
