Hi,

I'm trying to read a Parquet file with the Flink 1.12.0 Scala API and write it
back out as another Parquet file.

This works correctly with ParquetRowInputFormat:
val inputPath: String = ...
val messageType: MessageType = ...

val parquetInputFormat = new ParquetRowInputFormat(new Path(inputPath), messageType)
parquetInputFormat.setNestedFileEnumeration(true)

env.readFile(parquetInputFormat, inputPath)
  .map(row => { ... }) // map Row to MyPOJO
  .sinkTo(FileSink.forBulkFormat...)
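For reference, the sink in both pipelines is built roughly like this — the
output path and the writer factory (ParquetAvroWriters.forReflectRecord here)
are stand-ins for my actual ones, just to show the shape of it:

```scala
import org.apache.flink.core.fs.Path
import org.apache.flink.connector.file.sink.FileSink
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters

// Placeholder sketch: outputPath and the writer factory are illustrative,
// not necessarily the exact ones used in my job.
val outputPath: String = ...
val sink: FileSink[MyPOJO] = FileSink
  .forBulkFormat(new Path(outputPath),
    ParquetAvroWriters.forReflectRecord(classOf[MyPOJO]))
  .build()
```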
But when I replace the input format with ParquetPojoInputFormat:
val pojoTypeInfo =
  Types.POJO(classOf[MyPOJO]).asInstanceOf[PojoTypeInfo[MyPOJO]]

val parquetInputFormat = new ParquetPojoInputFormat(new Path(inputPath), messageType, pojoTypeInfo)
parquetInputFormat.setNestedFileEnumeration(true)

env.createInput(parquetInputFormat)
  .sinkTo(FileSink.forBulkFormat...)
the job always fails with this exception:
java.nio.channels.ClosedChannelException
    at java.base/sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:156)
    at java.base/sun.nio.ch.FileChannelImpl.position(FileChannelImpl.java:331)
    at org.apache.flink.core.fs.local.LocalRecoverableFsDataOutputStream.getPos(LocalRecoverableFsDataOutputStream.java:101)
    at org.apache.flink.formats.parquet.PositionOutputStreamAdapter.getPos(PositionOutputStreamAdapter.java:50)
    at org.apache.parquet.hadoop.ParquetFileWriter.startBlock(ParquetFileWriter.java:349)
    at org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:171)
    at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:114)
    at org.apache.parquet.hadoop.ParquetWriter.close(ParquetWriter.java:310)
    at org.apache.flink.formats.parquet.ParquetBulkWriter.finish(ParquetBulkWriter.java:62)
    at org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter.closeForCommit(BulkPartWriter.java:60)
    at org.apache.flink.connector.file.sink.writer.FileWriterBucket.closePartFile(FileWriterBucket.java:276)
    at org.apache.flink.connector.file.sink.writer.FileWriterBucket.prepareCommit(FileWriterBucket.java:202)
    at org.apache.flink.connector.file.sink.writer.FileWriter.prepareCommit(FileWriter.java:202)
    at org.apache.flink.streaming.runtime.operators.sink.AbstractSinkWriterOperator.endInput(AbstractSinkWriterOperator.java:97)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.endOperatorInput(StreamOperatorWrapper.java:91)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$close$0(StreamOperatorWrapper.java:127)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:127)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.closeOperators(OperatorChain.java:412)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:585)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:547)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
The exception is always thrown right after this warning:
warn [ContinuousFileReaderOperator] not processing any records while closed
I would have suspected a problem in my file sink, but the same sink works fine
with ParquetRowInputFormat. Am I missing something?
Thank you!
--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/