Hi Dawid,

I used the official Maven archetype for a Scala-based Flink project from here [1]:

    mvn archetype:generate \
      -DarchetypeGroupId=org.apache.flink \
      -DarchetypeArtifactId=flink-quickstart-scala \
      -DarchetypeVersion=1.11.2

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/project-configuration.html#create-project

Best,
Jan

From: Dawid Wysakowicz <dwysakow...@apache.org>
Sent: Thursday, 14 January 2021 12:42
To: Jan Oelschlegel <oelschle...@integration-factory.de>; user@flink.apache.org
Subject: Re: StreamingFileSink with ParquetAvroWriters

Hi Jan,

Could you make sure you are packaging that dependency with your job jar? There are instructions on how to configure your build setup [1]; especially the part about building a jar with dependencies might come in handy [2].

Best,
Dawid

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/project-configuration.html
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/project-configuration.html#appendix-template-for-building-a-jar-with-dependencies

On 13/01/2021 17:49, Jan Oelschlegel wrote:

Hi,

I'm using Flink (v1.11.2) and would like to use the StreamingFileSink for writing into HDFS in Parquet format.
As it says in the documentation, I have added the dependency:

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-parquet_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>

And this is my file sink definition:

    val sink: StreamingFileSink[Event] = StreamingFileSink
      .forBulkFormat(
        new Path("hdfs://namenode.local:8020/user/datastream/"),
        ParquetAvroWriters.forReflectRecord(classOf[Event])
      )
      .build()

If I execute this on the cluster, I get the following error:

    java.lang.NoClassDefFoundError: org/apache/parquet/avro/AvroParquetWriter
        at org.apache.flink.formats.parquet.avro.ParquetAvroWriters.createAvroParquetWriter(ParquetAvroWriters.java:84)
        at org.apache.flink.formats.parquet.avro.ParquetAvroWriters.lambda$forReflectRecord$3c375096$1(ParquetAvroWriters.java:73)
        at org.apache.flink.formats.parquet.ParquetWriterFactory.create(ParquetWriterFactory.java:57)
        at org.apache.flink.streaming.api.functions.sink.filesystem.BulkBucketWriter.openNew(BulkBucketWriter.java:69)
        at org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter$OutputStreamBasedBucketWriter.openNewInProgressFile(OutputStreamBasedPartFileWriter.java:83)
        at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.rollPartFile(Bucket.java:209)
        at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:200)
        at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:282)
        at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:104)
        at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:420)
        at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
        at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
        at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
        at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
        at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
        at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
        at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185)
        at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)

Looks like there are some dependencies missing. How can I fix this?

Best,
Jan O.

NOTE: This is a confidential message intended only for the addressee. It is not permitted to copy this message or make it accessible to third parties. If you have received this message in error, please notify me by e-mail or at the telephone number given above.
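For reference, the "jar with dependencies" appendix Dawid points to [2] boils down to a maven-shade-plugin configuration roughly like the following. This is a sketch reconstructed from the docs, not the exact template: the plugin version and the `my.package.StreamingJob` main class are placeholders to adapt to your project.

```xml
<build>
  <plugins>
    <!-- Builds a fat jar containing the job's non-provided dependencies,
         e.g. flink-parquet and its transitive parquet-avro classes. -->
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <version>3.1.1</version>
      <executions>
        <execution>
          <phase>package</phase>
          <goals>
            <goal>shade</goal>
          </goals>
          <configuration>
            <filters>
              <filter>
                <!-- Strip signature files, which would otherwise make the
                     shaded jar fail signature verification. -->
                <artifact>*:*</artifact>
                <excludes>
                  <exclude>META-INF/*.SF</exclude>
                  <exclude>META-INF/*.DSA</exclude>
                  <exclude>META-INF/*.RSA</exclude>
                </excludes>
              </filter>
            </filters>
            <transformers>
              <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                <!-- Placeholder: your job's entry point -->
                <mainClass>my.package.StreamingJob</mainClass>
              </transformer>
            </transformers>
          </configuration>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>
```

Note that this only helps if flink-parquet is in `compile` scope; a dependency marked `provided` is excluded from the shaded jar by design.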
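A NoClassDefFoundError at the sink means `org.apache.parquet.avro.AvroParquetWriter` was not on the task manager's classpath at runtime. As a quick diagnostic (hypothetical, not part of the thread), the following probe checks whether a given class is resolvable, which is exactly what fails before Flink can open the Parquet writer:

```java
// Hypothetical diagnostic: check whether a class is resolvable on the
// current classpath, mirroring what the JVM does before it throws
// NoClassDefFoundError.
public class ClasspathCheck {

    // Returns true if the named class can be resolved by this classloader.
    static boolean isLoadable(String className) {
        try {
            // initialize=false: we only care about resolution, not static init
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // The class the stack trace reports as missing; prints false unless
        // parquet-avro is actually on the classpath.
        System.out.println("AvroParquetWriter loadable: "
                + isLoadable("org.apache.parquet.avro.AvroParquetWriter"));
    }
}
```

Running this against the built job jar (e.g. `java -cp target/your-job.jar ClasspathCheck`, jar name hypothetical) shows immediately whether the shaded jar actually packages the Parquet classes; alternatively, `jar tf` on the jar piped through grep gives the same answer without any code.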