Hi, I'm trying to read a Parquet file with the Flink 1.12.0 Scala API and write it back out as another Parquet file.
It currently works correctly with ParquetRowInputFormat:

    val inputPath: String = ...
    val messageType: MessageType = ...
    val parquetInputFormat = new ParquetRowInputFormat(new Path(inputPath), messageType)
    parquetInputFormat.setNestedFileEnumeration(true)

    env.readFile(parquetInputFormat, inputPath)
      .map(row => { ... }) // mapping the Row to MyPOJO
      .sinkTo(FileSink.forBulkFormat...)

But when I replace the input format with ParquetPojoInputFormat:

    val pojoTypeInfo = Types.POJO(classOf[MyPOJO]).asInstanceOf[PojoTypeInfo[MyPOJO]]
    val parquetInputFormat = new ParquetPojoInputFormat(new Path(inputPath), messageType, pojoTypeInfo)
    parquetInputFormat.setNestedFileEnumeration(true)

    env.createInput(parquetInputFormat)
      .sinkTo(FileSink.forBulkFormat...)

the job always fails with this exception:

    java.nio.channels.ClosedChannelException
        at java.base/sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:156)
        at java.base/sun.nio.ch.FileChannelImpl.position(FileChannelImpl.java:331)
        at org.apache.flink.core.fs.local.LocalRecoverableFsDataOutputStream.getPos(LocalRecoverableFsDataOutputStream.java:101)
        at org.apache.flink.formats.parquet.PositionOutputStreamAdapter.getPos(PositionOutputStreamAdapter.java:50)
        at org.apache.parquet.hadoop.ParquetFileWriter.startBlock(ParquetFileWriter.java:349)
        at org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:171)
        at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:114)
        at org.apache.parquet.hadoop.ParquetWriter.close(ParquetWriter.java:310)
        at org.apache.flink.formats.parquet.ParquetBulkWriter.finish(ParquetBulkWriter.java:62)
        at org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter.closeForCommit(BulkPartWriter.java:60)
        at org.apache.flink.connector.file.sink.writer.FileWriterBucket.closePartFile(FileWriterBucket.java:276)
        at org.apache.flink.connector.file.sink.writer.FileWriterBucket.prepareCommit(FileWriterBucket.java:202)
        at org.apache.flink.connector.file.sink.writer.FileWriter.prepareCommit(FileWriter.java:202)
        at org.apache.flink.streaming.runtime.operators.sink.AbstractSinkWriterOperator.endInput(AbstractSinkWriterOperator.java:97)
        at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.endOperatorInput(StreamOperatorWrapper.java:91)
        at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$close$0(StreamOperatorWrapper.java:127)
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
        at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:127)
        at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.closeOperators(OperatorChain.java:412)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:585)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:547)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)

The exception is always thrown right after this warning:

    WARN [ContinuousFileReaderOperator] not processing any records while closed

I would have suspected a problem in my file sink, but the same sink works fine with ParquetRowInputFormat. Did I miss something?

Thank you!
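P.S. For reference, here is a minimal self-contained sketch of the working Row-based pipeline. The MyPOJO class, the schema string, the paths, and the ParquetAvroWriters.forReflectRecord writer are placeholders I made up for this example (the real job uses our actual POJO, schema, and writer factory), so please treat it as a sketch rather than the exact code:

    import org.apache.flink.connector.file.sink.FileSink
    import org.apache.flink.core.fs.Path
    import org.apache.flink.formats.parquet.ParquetRowInputFormat
    import org.apache.flink.formats.parquet.avro.ParquetAvroWriters
    import org.apache.flink.streaming.api.scala._
    import org.apache.parquet.schema.MessageTypeParser

    import scala.beans.BeanProperty

    // Placeholder POJO: no-arg constructor plus bean-style accessors,
    // so Flink recognizes it as a POJO type.
    class MyPOJO(@BeanProperty var id: Long, @BeanProperty var name: String) {
      def this() = this(0L, "")
    }

    object ParquetCopyJob {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment

        val inputPath = "file:///tmp/parquet-in"   // placeholder paths
        val outputPath = "file:///tmp/parquet-out"

        // Placeholder Parquet schema matching MyPOJO's two fields.
        val messageType = MessageTypeParser.parseMessageType(
          "message MyPOJO { required int64 id; required binary name (UTF8); }")

        val parquetInputFormat = new ParquetRowInputFormat(new Path(inputPath), messageType)
        parquetInputFormat.setNestedFileEnumeration(true)

        // One possible bulk writer for the sink: Avro reflection over the POJO class.
        val sink = FileSink
          .forBulkFormat(new Path(outputPath), ParquetAvroWriters.forReflectRecord(classOf[MyPOJO]))
          .build()

        env.readFile(parquetInputFormat, inputPath)
          .map(row => new MyPOJO(
            row.getField(0).asInstanceOf[Long],
            row.getField(1).asInstanceOf[String]))
          .sinkTo(sink)

        env.execute("parquet-copy")
      }
    }

In the failing variant, the only change is replacing the readFile(...) + map(...) pair with createInput(parquetInputFormat), exactly as in the second snippet above.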