I have defined a streaming file sink for Parquet to store my Scala case
class:
StreamingFileSink
  .forBulkFormat(
    new Path(appArgs.datalakeBucket),
    ParquetAvroWriters.forReflectRecord(classOf[Log])
  )
  .withBucketAssigner(new TransactionLogHiveBucketAssigner())
  .build()
where my case class is
case class Log(
  level: String,
  time_stamp: Option[Long] = None
)
When Flink tries to write a specific instance to parquet
Log("info", Some(1596975950000L))
it throws the following error:
org.apache.parquet.schema.InvalidSchemaException: Cannot write a schema with an empty group: required group time_stamp {
}
at org.apache.parquet.schema.TypeUtil$1.visit(TypeUtil.java:27)
at org.apache.parquet.schema.GroupType.accept(GroupType.java:255)
at org.apache.parquet.schema.TypeUtil$1.visit(TypeUtil.java:31)
at org.apache.parquet.schema.TypeUtil$1.visit(TypeUtil.java:37)
at org.apache.parquet.schema.MessageType.accept(MessageType.java:55)
at org.apache.parquet.schema.TypeUtil.checkValidWriteSchema(TypeUtil.java:23)
at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:280)
at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:283)
at org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:564)
at org.apache.flink.formats.parquet.avro.ParquetAvroWriters.createAvroParquetWriter(ParquetAvroWriters.java:87)
at org.apache.flink.formats.parquet.avro.ParquetAvroWriters.lambda$forReflectRecord$3c375096$1(ParquetAvroWriters.java:73)
at org.apache.flink.formats.parquet.ParquetWriterFactory.create(ParquetWriterFactory.java:57)
at org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter$Factory.openNew(BulkPartWriter.java:103)
at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.rollPartFile(Bucket.java:222)
at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:212)
at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:274)
at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:445)
at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:711)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:664)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
at org.apache.flink.streaming.api.functions.windowing.PassThroughWindowFunction.apply(PassThroughWindowFunction.java:36)
at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction.process(InternalSingleValueWindowFunction.java:46)
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:549)
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:373)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
at java.lang.Thread.run(Thread.java:748)
Can Flink's Parquet writer not handle fields of type Option?
Thanks,