Hi Vikash,
The error comes from Parquet in combination with Avro, which is used to
infer the schema of your Scala class. The inferred schema is:
{
  "fields": [
    {
      "name": "level",
      "type": "string"
    },
    {
      "name": "time_stamp",
      "type": {
        "fields": [],
        "name": "Option",
        "namespace": "scala",
        "type": "record"
      }
    }
  ],
  "name": "Log",
  "namespace": "org.apache.flink.formats.parquet.avro",
  "type": "record"
}
As you can see, Avro's reflection treats Option as an arbitrary class and
maps it to a record. Since that record has no fields, Parquet rejects it,
which is the "empty group" error you receive.
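If you want to check what Avro infers before Parquet ever sees it, you can print the reflected schema yourself. A minimal sketch, assuming Apache Avro is on the classpath and using the case class from your mail; `InspectLogSchema` is just a hypothetical name. I believe `forReflectRecord` derives its schema through the same `ReflectData.get()` call, so the output should match the schema above.

```scala
import org.apache.avro.Schema
import org.apache.avro.reflect.ReflectData

// Same shape as the case class from the question.
case class Log(level: String, time_stamp: Option[Long] = None)

object InspectLogSchema {
  // The schema Avro's reflection derives for Log.
  val schema: Schema = ReflectData.get().getSchema(classOf[Log])

  def main(args: Array[String]): Unit = {
    // Pretty-print it; time_stamp shows up as a record named "Option" with no fields.
    println(schema.toString(true))
  }
}
```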
I don't see an easy fix, but you could look for existing solutions that make
Avro's ReflectData handle scala.Option. As a workaround, you can avoid the
Option field and use a nullable field instead (and expose it as an Option
through a small getter).
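The nullable-field workaround could look roughly like this: a sketch, not tested against Flink. The field is a boxed `java.lang.Long` that may be null, and a getter turns it back into an `Option[Long]` for the rest of your Scala code. The names `timeStamp` and `Log.of` are hypothetical. One caveat: as far as I know, Avro's ReflectData maps a boxed `java.lang.Long` to a non-nullable `long`, so you will probably also need Avro's `@org.apache.avro.reflect.Nullable` annotation on the field (in Scala, targeted at the underlying field via `scala.annotation.meta.field`) before nulls can actually be written. That part is not shown here.

```scala
// Hypothetical replacement for the Log case class from the question:
// time_stamp is a boxed java.lang.Long that may be null instead of Option[Long].
// Assumption: for Avro reflection the field likely also needs to be marked
// nullable (e.g. with Avro's @Nullable annotation), which is not shown here.
case class Log(level: String, time_stamp: java.lang.Long = null) {
  // A `def` rather than a `val`/`lazy val`, so no extra field is generated
  // that Avro reflection could pick up as part of the record schema.
  def timeStamp: Option[Long] = Option(time_stamp).map(_.longValue)
}

object Log {
  // Hypothetical helper to build a Log from an Option, keeping call sites idiomatic.
  def of(level: String, timeStamp: Option[Long]): Log =
    Log(level, timeStamp.map(ts => java.lang.Long.valueOf(ts)).orNull)
}

object LogExample {
  def main(args: Array[String]): Unit = {
    val withTs = Log.of("info", Some(1596975950000L))
    val withoutTs = Log("debug")
    println(withTs.timeStamp)    // Some(1596975950000)
    println(withoutTs.timeStamp) // None
  }
}
```

Using a `def` for the getter matters with reflection-based schemas: a `val` or `lazy val` would add a field to the class, and Avro would try to serialize it too.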
In general, if you want more control over the schema, I'd suggest going
schema-first: define your Avro schema and use avrohugger to generate the
corresponding Scala class. That way, Option is properly supported.
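As a rough illustration of the schema-first approach, here is a hand-written Avro schema for the same record in which `time_stamp` is optional, expressed as a union of `null` and `long` with a `null` default. The namespace `com.example.logs` and the object name `LogSchema` are placeholders, and the snippet assumes Apache Avro is on the classpath. As far as I know, avrohugger turns such a `["null", "long"]` union into an `Option[Long]` field in the generated class; alternatively, the parsed `Schema` can be passed to `ParquetAvroWriters.forGenericRecord`, in which case you write `GenericRecord`s instead of case class instances.

```scala
import org.apache.avro.Schema

object LogSchema {
  // Hand-written Avro schema for the Log record (namespace is a placeholder).
  // time_stamp is a union of null and long with a null default, which is how
  // Avro expresses an optional field; "null" must be the first branch for a
  // null default to be valid.
  val Json: String =
    """{
      |  "type": "record",
      |  "name": "Log",
      |  "namespace": "com.example.logs",
      |  "fields": [
      |    {"name": "level", "type": "string"},
      |    {"name": "time_stamp", "type": ["null", "long"], "default": null}
      |  ]
      |}""".stripMargin

  // Parsing validates the schema; this is what a schema-first writer or a
  // code generator such as avrohugger would work from.
  val schema: Schema = new Schema.Parser().parse(Json)

  def main(args: Array[String]): Unit =
    println(schema.toString(true))
}
```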
Best,
Arvid
On Wed, Aug 12, 2020 at 2:43 AM Vikash Dat <[email protected]> wrote:
> I have defined a streaming file sink for Parquet to store my Scala case
> class:
>
> StreamingFileSink
>   .forBulkFormat(
>     new Path(appArgs.datalakeBucket),
>     ParquetAvroWriters.forReflectRecord(classOf[Log])
>   )
>   .withBucketAssigner(new TransactionLogHiveBucketAssigner())
>   .build()
>
>
> where my case class is
>
> case class Log(
>   level: String,
>   time_stamp: Option[Long] = None
> )
>
>
> When Flink tries to write a specific instance to Parquet, such as
>
> Log("info", Some(1596975950000L))
>
> it throws the following error:
>
>
> org.apache.parquet.schema.InvalidSchemaException: Cannot write a schema with an empty group: required group time_stamp {
> }
>   at org.apache.parquet.schema.TypeUtil$1.visit(TypeUtil.java:27)
>   at org.apache.parquet.schema.GroupType.accept(GroupType.java:255)
>   at org.apache.parquet.schema.TypeUtil$1.visit(TypeUtil.java:31)
>   at org.apache.parquet.schema.TypeUtil$1.visit(TypeUtil.java:37)
>   at org.apache.parquet.schema.MessageType.accept(MessageType.java:55)
>   at org.apache.parquet.schema.TypeUtil.checkValidWriteSchema(TypeUtil.java:23)
>   at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:280)
>   at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:283)
>   at org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:564)
>   at org.apache.flink.formats.parquet.avro.ParquetAvroWriters.createAvroParquetWriter(ParquetAvroWriters.java:87)
>   at org.apache.flink.formats.parquet.avro.ParquetAvroWriters.lambda$forReflectRecord$3c375096$1(ParquetAvroWriters.java:73)
>   at org.apache.flink.formats.parquet.ParquetWriterFactory.create(ParquetWriterFactory.java:57)
>   at org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter$Factory.openNew(BulkPartWriter.java:103)
>   at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.rollPartFile(Bucket.java:222)
>   at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:212)
>   at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:274)
>   at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:445)
>   at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
>   at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
>   at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
>   at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:711)
>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:664)
>   at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
>   at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
>   at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
>   at org.apache.flink.streaming.api.functions.windowing.PassThroughWindowFunction.apply(PassThroughWindowFunction.java:36)
>   at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction.process(InternalSingleValueWindowFunction.java:46)
>   at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:549)
>   at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:373)
>   at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)
>   at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
>   at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
>   at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
>   at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>   at java.lang.Thread.run(Thread.java:748)
>
> Can Flink's Parquet writer not handle fields of type Option?
>
> Thanks,
--
Arvid Heise | Senior Java Developer
<https://www.ververica.com/>