Okay, this is helpful. The problem arrives when adding parquet-avro to the 
dependencies. But the the question is, why do I need this dependency? I is not 
mentioned in the docs and I’m using standard setup for writing into hdfs with 
parquet format, nothing special.


Best,
Jan

Von: Till Rohrmann <trohrm...@apache.org>
Gesendet: Donnerstag, 4. Februar 2021 10:08
An: Jan Oelschlegel <oelschle...@integration-factory.de>
Cc: user <user@flink.apache.org>
Betreff: Re: AbstractMethodError while writing to parquet

I guess it depends from where the other dependency is coming. If you have 
multiple dependencies which conflict then you have to resolve it. One way to 
detect these things is to configure dependency convergence [1].

[1] https://maven.apache.org/enforcer/enforcer-rules/dependencyConvergence.html

Cheers,
Till

On Wed, Feb 3, 2021 at 6:34 PM Jan Oelschlegel 
<oelschle...@integration-factory.de<mailto:oelschle...@integration-factory.de>> 
wrote:
Hi Till,

thanks for hint. I checked it and found a version conflict with flink-parquet.

With this version it is running:


<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-avro</artifactId>
    <version>1.10.0</version>
</dependency>


But how can I avoid this in the future? I had to add parquet-avro, because 
without there were some errors. Do I have to lookup such conflicts manually and 
then choose the same version like at flink dependencies ?


Best,
Jan

Von: Till Rohrmann <trohrm...@apache.org<mailto:trohrm...@apache.org>>
Gesendet: Mittwoch, 3. Februar 2021 11:41
An: Jan Oelschlegel 
<oelschle...@integration-factory.de<mailto:oelschle...@integration-factory.de>>
Cc: user <user@flink.apache.org<mailto:user@flink.apache.org>>
Betreff: Re: AbstractMethodError while writing to parquet

Hi Jan,

it looks to me that you might have different parquet-avro dependencies on your 
class path. Could you make sure that you don't have different versions of the 
library on your classpath?

Cheers,
Till

On Tue, Feb 2, 2021 at 3:29 PM Jan Oelschlegel 
<oelschle...@integration-factory.de<mailto:oelschle...@integration-factory.de>> 
wrote:
Hi at all,

i’m using Flink 1.11 with the datastream api. I would like to write my results 
in parquet format into HDFS.

Therefore i generated an Avro SpecificRecord with the avro-maven-plugin:

<plugin>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-maven-plugin</artifactId>
                <version>1.8.2</version>
                <executions>
                    <execution>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>schema</goal>
                        </goals>
                        <configuration>
                            
<sourceDirectory>src/main/resources/avro/</sourceDirectory>
                            
<outputDirectory>${project.basedir}/target/generated-sources/</outputDirectory>
                            <stringType>String</stringType>
                        </configuration>
                    </execution>
                </executions>
            </plugin>


Then  I’m using the SpecificRecord in the StreamingFileSink:

val sink: StreamingFileSink[SpecificRecord] = StreamingFileSink
  .forBulkFormat(
    new Path("hdfs://example.com:8020/data/<http://example.com:8020/data/>"),
    ParquetAvroWriters.forSpecificRecord(classOf[SpecificRecord])
  )
  .build()


The job cancels with the following error:


java.lang.AbstractMethodError: 
org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(Lorg/apache/parquet/bytes/BytesInput;IILorg/apache/parquet/column/statistics/Statistics;Lorg/apache/parquet/column/Encoding;Lorg/apache/parquet/column/Encoding;Lorg/apache/parquet/column/Encoding;)V
    at 
org.apache.parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:53)
    at 
org.apache.parquet.column.impl.ColumnWriterBase.writePage(ColumnWriterBase.java:315)
    at 
org.apache.parquet.column.impl.ColumnWriteStoreBase.sizeCheck(ColumnWriteStoreBase.java:200)
    at 
org.apache.parquet.column.impl.ColumnWriteStoreBase.endRecord(ColumnWriteStoreBase.java:187)
    at 
org.apache.parquet.column.impl.ColumnWriteStoreV1.endRecord(ColumnWriteStoreV1.java:27)
    at 
org.apache.parquet.io<http://org.apache.parquet.io>.MessageColumnIO$MessageColumnIORecordConsumer.endMessage(MessageColumnIO.java:307)
    at org.apache.parquet.avro.AvroWriteSupport.write(AvroWriteSupport.java:166)
    at 
org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:128)
    at org.apache.parquet.hadoop.ParquetWriter.write(ParquetWriter.java:299)
    at 
org.apache.flink.formats.parquet.ParquetBulkWriter.addElement(ParquetBulkWriter.java:52)
    at 
org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter.write(BulkPartWriter.java:48)
    at 
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:202)
    at 
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:282)
    at 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:104)
    at 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:420)
    at 
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
    at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
    at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
    at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
    at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
    at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
    at 
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
    at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
    at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
    at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
    at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
    at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
    at 
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
    at 
de.integration_factory.datastream.aggregations.CasesProcessFunction.process(CasesProcessFunction.scala:34)
    at 
de.integration_factory.datastream.aggregations.CasesProcessFunction.process(CasesProcessFunction.scala:11)
    at 
org.apache.flink.streaming.api.scala.function.util.ScalaProcessWindowFunctionWrapper.process(ScalaProcessWindowFunctionWrapper.scala:63)
    at 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:50)
    at 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32)
    at 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:549)
    at 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:457)
    at 
org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:276)
    at 
org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:154)
    at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:568)
    at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(OneInputStreamTask.java:167)
    at 
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:179)
    at 
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:101)
    at 
org.apache.flink.streaming.runtime.io<http://org.apache.flink.streaming.runtime.io>.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:180)
    at 
org.apache.flink.streaming.runtime.io<http://org.apache.flink.streaming.runtime.io>.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
    at 
org.apache.flink.streaming.runtime.io<http://org.apache.flink.streaming.runtime.io>.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
    at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
    at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.lang.Thread.run(Thread.java:748)



What can I do to fix this?


Best,
Jan

HINWEIS: Dies ist eine vertrauliche Nachricht und nur für den Adressaten 
bestimmt. Es ist nicht erlaubt, diese Nachricht zu kopieren oder Dritten 
zugänglich zu machen. Sollten Sie diese Nachricht irrtümlich erhalten haben, 
bitte ich um Ihre Mitteilung per E-Mail oder unter der oben angegebenen 
Telefonnummer.
HINWEIS: Dies ist eine vertrauliche Nachricht und nur für den Adressaten 
bestimmt. Es ist nicht erlaubt, diese Nachricht zu kopieren oder Dritten 
zugänglich zu machen. Sollten Sie diese Nachricht irrtümlich erhalten haben, 
bitte ich um Ihre Mitteilung per E-Mail oder unter der oben angegebenen 
Telefonnummer.
HINWEIS: Dies ist eine vertrauliche Nachricht und nur für den Adressaten 
bestimmt. Es ist nicht erlaubt, diese Nachricht zu kopieren oder Dritten 
zugänglich zu machen. Sollten Sie diese Nachricht irrtümlich erhalten haben, 
bitte ich um Ihre Mitteilung per E-Mail oder unter der oben angegebenen 
Telefonnummer.

Reply via email to