In order to answer this question you would need to figure out where the
second parquet-avro dependency comes from. You can check your job via `mvn
dependency:tree` and then check whether you have another dependency which
pulls in parquet-avro. Another source where the additional dependency could
come from is the deployment. If you deploy your cluster on Yarn, then you
can get the Hadoop dependencies on your classpath. This is another thing
you might wanna check.

Cheers,
Till

On Thu, Feb 4, 2021 at 12:56 PM Jan Oelschlegel <
oelschle...@integration-factory.de> wrote:

> Okay, this is helpful. The problem arrives when adding parquet-avro to the
> dependencies. But the the question is, why do I need this dependency? I is
> not mentioned in the docs and I’m using standard setup for writing into
> hdfs with parquet format, nothing special.
>
>
>
>
>
> Best,
>
> Jan
>
>
>
> *Von:* Till Rohrmann <trohrm...@apache.org>
> *Gesendet:* Donnerstag, 4. Februar 2021 10:08
> *An:* Jan Oelschlegel <oelschle...@integration-factory.de>
> *Cc:* user <user@flink.apache.org>
> *Betreff:* Re: AbstractMethodError while writing to parquet
>
>
>
> I guess it depends from where the other dependency is coming. If you have
> multiple dependencies which conflict then you have to resolve it. One way
> to detect these things is to configure dependency convergence [1].
>
>
>
> [1]
> https://maven.apache.org/enforcer/enforcer-rules/dependencyConvergence.html
>
>
>
> Cheers,
>
> Till
>
>
>
> On Wed, Feb 3, 2021 at 6:34 PM Jan Oelschlegel <
> oelschle...@integration-factory.de> wrote:
>
> Hi Till,
>
>
>
> thanks for hint. I checked it and found a version conflict with
> flink-parquet.
>
>
>
> With this version it is running:
>
>
>
>
>
> <dependency>
>     <groupId>org.apache.parquet</groupId>
>     <artifactId>parquet-avro</artifactId>
>     <version>1.10.0</version>
> </dependency>
>
>
>
>
>
> But how can I avoid this in the future? I had to add parquet-avro, because
> without there were some errors. Do I have to lookup such conflicts manually
> and then choose the same version like at flink dependencies ?
>
>
>
>
>
> Best,
>
> Jan
>
>
>
> *Von:* Till Rohrmann <trohrm...@apache.org>
> *Gesendet:* Mittwoch, 3. Februar 2021 11:41
> *An:* Jan Oelschlegel <oelschle...@integration-factory.de>
> *Cc:* user <user@flink.apache.org>
> *Betreff:* Re: AbstractMethodError while writing to parquet
>
>
>
> Hi Jan,
>
>
>
> it looks to me that you might have different parquet-avro dependencies on
> your class path. Could you make sure that you don't have different versions
> of the library on your classpath?
>
>
>
> Cheers,
>
> Till
>
>
>
> On Tue, Feb 2, 2021 at 3:29 PM Jan Oelschlegel <
> oelschle...@integration-factory.de> wrote:
>
> Hi at all,
>
>
>
> i’m using Flink 1.11 with the datastream api. I would like to write my
> results in parquet format into HDFS.
>
>
>
> Therefore i generated an Avro SpecificRecord with the avro-maven-plugin:
>
> <plugin>
>                 <groupId>org.apache.avro</groupId>
>                 <artifactId>avro-maven-plugin</artifactId>
>                 <version>1.8.2</version>
>                 <executions>
>                     <execution>
>                         <phase>generate-sources</phase>
>                         <goals>
>                             <goal>schema</goal>
>                         </goals>
>                         <configuration>
>                             
> <sourceDirectory>src/main/resources/avro/</sourceDirectory>
>                             
> <outputDirectory>${project.basedir}/target/generated-sources/</outputDirectory>
>                             <stringType>String</stringType>
>                         </configuration>
>                     </execution>
>                 </executions>
>             </plugin>
>
>
>
>
>
> Then  I’m using the SpecificRecord in the StreamingFileSink:
>
> val sink: StreamingFileSink[SpecificRecord] = StreamingFileSink
>   .*forBulkFormat*(
>     new Path("hdfs://example.com:8020/data/"),
>     ParquetAvroWriters.*forSpecificRecord*(*classOf*[SpecificRecord])
>   )
>   .build()
>
>
>
>
>
> The job cancels with the following error:
>
>
>
>
>
> java.lang.AbstractMethodError: org.apache.parquet.hadoop.
> ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(Lorg
> /apache/parquet/bytes/BytesInput;IILorg/apache/parquet/column/statistics/
> Statistics;Lorg/apache/parquet/column/Encoding;Lorg/apache/parquet/column/
> Encoding;Lorg/apache/parquet/column/Encoding;)V
>
>     at org.apache.parquet.column.impl.ColumnWriterV1.writePage(
> ColumnWriterV1.java:53)
>
>     at org.apache.parquet.column.impl.ColumnWriterBase.writePage(
> ColumnWriterBase.java:315)
>
>     at org.apache.parquet.column.impl.ColumnWriteStoreBase.sizeCheck(
> ColumnWriteStoreBase.java:200)
>
>     at org.apache.parquet.column.impl.ColumnWriteStoreBase.endRecord(
> ColumnWriteStoreBase.java:187)
>
>     at org.apache.parquet.column.impl.ColumnWriteStoreV1.endRecord(
> ColumnWriteStoreV1.java:27)
>
>     at org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer
> .endMessage(MessageColumnIO.java:307)
>
>     at org.apache.parquet.avro.AvroWriteSupport.write(AvroWriteSupport
> .java:166)
>
>     at org.apache.parquet.hadoop.InternalParquetRecordWriter.write(
> InternalParquetRecordWriter.java:128)
>
>     at org.apache.parquet.hadoop.ParquetWriter.write(ParquetWriter.java:
> 299)
>
>     at org.apache.flink.formats.parquet.ParquetBulkWriter.addElement(
> ParquetBulkWriter.java:52)
>
>     at org.apache.flink.streaming.api.functions.sink.filesystem.
> BulkPartWriter.write(BulkPartWriter.java:48)
>
>     at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket
> .write(Bucket.java:202)
>
>     at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets
> .onElement(Buckets.java:282)
>
>     at org.apache.flink.streaming.api.functions.sink.filesystem.
> StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:104)
>
>     at org.apache.flink.streaming.api.functions.sink.filesystem.
> StreamingFileSink.invoke(StreamingFileSink.java:420)
>
>     at org.apache.flink.streaming.api.operators.StreamSink.processElement(
> StreamSink.java:56)
>
>     at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
>
>     at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
>
>     at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
>
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(
> CountingOutput.java:52)
>
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(
> CountingOutput.java:30)
>
>     at org.apache.flink.streaming.api.operators.StreamMap.processElement(
> StreamMap.java:41)
>
>     at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
>
>     at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
>
>     at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
>
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(
> CountingOutput.java:52)
>
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(
> CountingOutput.java:30)
>
>     at org.apache.flink.streaming.api.operators.TimestampedCollector.
> collect(TimestampedCollector.java:53)
>
>     at de.integration_factory.datastream.aggregations.CasesProcessFunction
> .process(CasesProcessFunction.scala:34)
>
>     at de.integration_factory.datastream.aggregations.CasesProcessFunction
> .process(CasesProcessFunction.scala:11)
>
>     at org.apache.flink.streaming.api.scala.function.util.
> ScalaProcessWindowFunctionWrapper.process(
> ScalaProcessWindowFunctionWrapper.scala:63)
>
>     at org.apache.flink.streaming.runtime.operators.windowing.functions.
> InternalIterableProcessWindowFunction.process(
> InternalIterableProcessWindowFunction.java:50)
>
>     at org.apache.flink.streaming.runtime.operators.windowing.functions.
> InternalIterableProcessWindowFunction.process(
> InternalIterableProcessWindowFunction.java:32)
>
>     at org.apache.flink.streaming.runtime.operators.windowing.
> WindowOperator.emitWindowContents(WindowOperator.java:549)
>
>     at org.apache.flink.streaming.runtime.operators.windowing.
> WindowOperator.onEventTime(WindowOperator.java:457)
>
>     at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl
> .advanceWatermark(InternalTimerServiceImpl.java:276)
>
>     at org.apache.flink.streaming.api.operators.InternalTimeServiceManager
> .advanceWatermark(InternalTimeServiceManager.java:154)
>
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator
> .processWatermark(AbstractStreamOperator.java:568)
>
>     at org.apache.flink.streaming.runtime.tasks.
> OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(
> OneInputStreamTask.java:167)
>
>     at org.apache.flink.streaming.runtime.streamstatus.
> StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(
> StatusWatermarkValve.java:179)
>
>     at org.apache.flink.streaming.runtime.streamstatus.
> StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:101)
>
>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
> .processElement(StreamTaskNetworkInput.java:180)
>
>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
> .emitNext(StreamTaskNetworkInput.java:153)
>
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
> .processInput(StreamOneInputProcessor.java:67)
>
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(
> StreamTask.java:351)
>
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
> .runMailboxStep(MailboxProcessor.java:191)
>
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
> .runMailboxLoop(MailboxProcessor.java:181)
>
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(
> StreamTask.java:566)
>
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
> StreamTask.java:536)
>
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>
>     at java.lang.Thread.run(Thread.java:748)
>
>
>
>
>
>
>
> What can I do to fix this?
>
>
>
>
>
> Best,
>
> Jan
>
>
>
> HINWEIS: Dies ist eine vertrauliche Nachricht und nur für den Adressaten
> bestimmt. Es ist nicht erlaubt, diese Nachricht zu kopieren oder Dritten
> zugänglich zu machen. Sollten Sie diese Nachricht irrtümlich erhalten
> haben, bitte ich um Ihre Mitteilung per E-Mail oder unter der oben
> angegebenen Telefonnummer.
>
> HINWEIS: Dies ist eine vertrauliche Nachricht und nur für den Adressaten
> bestimmt. Es ist nicht erlaubt, diese Nachricht zu kopieren oder Dritten
> zugänglich zu machen. Sollten Sie diese Nachricht irrtümlich erhalten
> haben, bitte ich um Ihre Mitteilung per E-Mail oder unter der oben
> angegebenen Telefonnummer.
>
> HINWEIS: Dies ist eine vertrauliche Nachricht und nur für den Adressaten
> bestimmt. Es ist nicht erlaubt, diese Nachricht zu kopieren oder Dritten
> zugänglich zu machen. Sollten Sie diese Nachricht irrtümlich erhalten
> haben, bitte ich um Ihre Mitteilung per E-Mail oder unter der oben
> angegebenen Telefonnummer.
>

Reply via email to