Have you checked whether the class (org/apache/parquet/avro/AvroParquetWriter) is in the jar that you are submitting?
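For example, one way to check is to list the contents of the submitted jar and grep for the class (the jar path below is just a placeholder for your actual job jar):

    # list the fat jar's contents and look for the Parquet Avro writer class
    jar tf target/your-job-1.0.jar | grep org/apache/parquet/avro/AvroParquetWriter

If nothing shows up, the parquet-avro classes are not being bundled into the jar; a minimal build sketch for bundling dependencies follows after the quoted thread below.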
On 15/01/2021 12:05, Jan Oelschlegel wrote:
> Hi Dawid,
>
> I used the official Maven archetype for a Flink project based on Scala from here [1]:
>
> mvn archetype:generate \
>   -DarchetypeGroupId=org.apache.flink \
>   -DarchetypeArtifactId=flink-quickstart-scala \
>   -DarchetypeVersion=1.11.2
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/project-configuration.html#create-project
>
> Best,
> Jan
>
> From: Dawid Wysakowicz <dwysakow...@apache.org>
> Sent: Thursday, 14 January 2021 12:42
> To: Jan Oelschlegel <oelschle...@integration-factory.de>; user@flink.apache.org
> Subject: Re: StreamingFileSink with ParquetAvroWriters
>
> Hi Jan,
>
> Could you make sure you are packaging that dependency with your job jar? There are instructions on how to configure your build setup [1]. Especially the part on how to build a jar with dependencies might come in handy [2].
>
> Best,
> Dawid
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/project-configuration.html
> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/project-configuration.html#appendix-template-for-building-a-jar-with-dependencies
>
> On 13/01/2021 17:49, Jan Oelschlegel wrote:
> > Hi,
> >
> > I'm using Flink (v1.11.2) and would like to use the StreamingFileSink for writing into HDFS in Parquet format.
> >
> > As it says in the documentation, I have added the dependency:
> >
> > <dependency>
> >   <groupId>org.apache.flink</groupId>
> >   <artifactId>flink-parquet_${scala.binary.version}</artifactId>
> >   <version>${flink.version}</version>
> > </dependency>
> >
> > And this is my file sink definition:
> >
> > val sink: StreamingFileSink[Event] = StreamingFileSink
> >   .forBulkFormat(
> >     new Path("hdfs://namenode.local:8020/user/datastream/"),
> >     ParquetAvroWriters.forReflectRecord(classOf[Event])
> >   )
> >   .build()
> >
> > If I execute this in the cluster, I get the following error:
> >
> > java.lang.NoClassDefFoundError: org/apache/parquet/avro/AvroParquetWriter
> >   at org.apache.flink.formats.parquet.avro.ParquetAvroWriters.createAvroParquetWriter(ParquetAvroWriters.java:84)
> >   at org.apache.flink.formats.parquet.avro.ParquetAvroWriters.lambda$forReflectRecord$3c375096$1(ParquetAvroWriters.java:73)
> >   at org.apache.flink.formats.parquet.ParquetWriterFactory.create(ParquetWriterFactory.java:57)
> >   at org.apache.flink.streaming.api.functions.sink.filesystem.BulkBucketWriter.openNew(BulkBucketWriter.java:69)
> >   at org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter$OutputStreamBasedBucketWriter.openNewInProgressFile(OutputStreamBasedPartFileWriter.java:83)
> >   at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.rollPartFile(Bucket.java:209)
> >   at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:200)
> >   at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:282)
> >   at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:104)
> >   at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:420)
> >   at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
> >   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
> >   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
> >   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
> >   at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
> >   at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
> >   at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
> >   at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
> >   at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
> >   at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185)
> >   at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
> >   at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
> >   at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
> >   at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
> >   at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)
> >
> > Looks like there are some dependencies missing. How can I fix this?
> >
> > Jan O.
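For reference, a minimal sketch of the jar-with-dependencies setup along the lines of the template linked as [2] in the quoted reply above; the mainClass value is a placeholder that you would replace with your own entry point:

    <build>
      <plugins>
        <!-- Build a fat jar that bundles the job's non-provided dependencies -->
        <plugin>
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-shade-plugin</artifactId>
          <version>3.1.1</version>
          <executions>
            <execution>
              <phase>package</phase>
              <goals>
                <goal>shade</goal>
              </goals>
              <configuration>
                <filters>
                  <filter>
                    <!-- Drop signature files so the shaded jar is not rejected as tampered -->
                    <artifact>*:*</artifact>
                    <excludes>
                      <exclude>META-INF/*.SF</exclude>
                      <exclude>META-INF/*.DSA</exclude>
                      <exclude>META-INF/*.RSA</exclude>
                    </excludes>
                  </filter>
                </filters>
                <transformers>
                  <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                    <!-- Placeholder: set this to your job's main class -->
                    <mainClass>my.package.StreamingJob</mainClass>
                  </transformer>
                </transformers>
              </configuration>
            </execution>
          </executions>
        </plugin>
      </plugins>
    </build>

Running mvn clean package then produces the fat jar under target/, which you can submit and inspect with the jar tf check above.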