Hi Billy, the exception is happening on the output side; the input side looks fine. Could you post more information about the sink you are using?
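For reference, here is a minimal JDK-only sketch (no Flink involved) of what the stack trace boils down to: LocalRecoverableFsDataOutputStream.getPos() delegates to FileChannel.position(), and position() throws ClosedChannelException as soon as the channel has been closed. So something is closing the part file's stream before prepareCommit() asks it for its size. The class and method names below are made up for the demo, not Flink APIs:

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class ClosedChannelDemo {

    // Returns true if position() on an already-closed channel throws
    // ClosedChannelException -- the same check FileChannelImpl.ensureOpen()
    // performs in the first frame of the stack trace.
    static boolean throwsOnClosedPosition() throws IOException {
        Path tmp = Files.createTempFile("part-file", ".tmp");
        try (FileChannel channel = FileChannel.open(tmp, StandardOpenOption.WRITE)) {
            channel.write(ByteBuffer.wrap("some data".getBytes()));
            channel.close();    // simulate the part-file stream being closed early
            channel.position(); // what getPos() does internally
            return false;
        } catch (ClosedChannelException e) {
            return true;
        } finally {
            Files.deleteIfExists(tmp);
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(throwsOnClosedPosition()
                ? "ClosedChannelException, as in the stack trace"
                : "no exception");
    }
}
```

Since the data does reach the temporary files, the write path itself works; the question is what closes the stream between the last write and the final checkpoint/endInput, which is why the sink setup matters.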
On Mon, Dec 28, 2020 at 8:11 PM Billy Bain <billybobb...@gmail.com> wrote:

> I am trying to implement a class that will work similar to AvroFileFormat.
>
> This tar archive has a very specific format. It has only one file inside,
> and that file is line-delimited JSON.
>
> I get this exception, but all the data is written to the temporary files.
> I have checked that my code isn't closing the stream, which was my prior
> issue.
>
> Caused by: java.nio.channels.ClosedChannelException
>     at java.base/sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:150)
>     at java.base/sun.nio.ch.FileChannelImpl.position(FileChannelImpl.java:325)
>     at org.apache.flink.core.fs.local.LocalRecoverableFsDataOutputStream.getPos(LocalRecoverableFsDataOutputStream.java:101)
>     at org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter.getSize(OutputStreamBasedPartFileWriter.java:70)
>     at org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy.shouldRollOnCheckpoint(DefaultRollingPolicy.java:71)
>     at org.apache.flink.connector.file.sink.writer.FileWriterBucket.prepareCommit(FileWriterBucket.java:195)
>     at org.apache.flink.connector.file.sink.writer.FileWriter.prepareCommit(FileWriter.java:202)
>     at org.apache.flink.streaming.runtime.operators.sink.AbstractSinkWriterOperator.endInput(AbstractSinkWriterOperator.java:97)
>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.endOperatorInput(StreamOperatorWrapper.java:91)
>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$close$0(StreamOperatorWrapper.java:127)
>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper$$Lambda$343/000000000000000000.run(Unknown Source)
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:127)
>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.closeOperators(OperatorChain.java:412)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:585)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:547)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>     at java.base/java.lang.Thread.run(Thread.java:836)
>
> public class TarInputFormat<E> extends FileInputFormat<E> implements ResultTypeQueryable<E> {
>
>     private static final Logger logger = LoggerFactory.getLogger(TarInputFormat.class);
>     private transient TarArchiveInputStream tarArchiveInputStream;
>     private TarArchiveEntry nextEntry;
>     private final Class<E> valueType;
>     private long currentPosition = 0L;
>     private static final ObjectMapper objectMapper = new ObjectMapper();
>
>     public TarInputFormat(Path filePath, Class<E> valueType) {
>         super(filePath);
>         this.valueType = valueType;
>         this.unsplittable = true;
>         this.setNumSplits(1);
>     }
>
>     @Override
>     public TypeInformation<E> getProducedType() {
>         return TypeExtractor.getForClass(this.valueType);
>     }
>
>     @Override
>     public void open(FileInputSplit split) throws IOException {
>         super.open(split);
>         tarArchiveInputStream = new TarArchiveInputStream(stream);
>         nextEntry = tarArchiveInputStream.getNextTarEntry();
>         logger.info("Entry Name={} size={}", nextEntry.getName(), nextEntry.getSize());
>     }
>
>     @Override
>     public void close() throws IOException {
>         super.close();
>         if (tarArchiveInputStream != null) {
>             tarArchiveInputStream.close();
>         }
>     }
>
>     @Override
>     public boolean reachedEnd() throws IOException {
>         return nextEntry == null || currentPosition == nextEntry.getSize();
>     }
>
>     @Override
>     public E nextRecord(E reuse) throws IOException {
>         if (reachedEnd()) {
>             return null;
>         }
>         logger.info("currentPosition={}", currentPosition);
>         int c;
>         ByteArrayOutputStream bos = new ByteArrayOutputStream();
>         while (currentPosition < nextEntry.getSize()) {
>             c = tarArchiveInputStream.read();
>             currentPosition++;
>             if (c == '\n') {
>                 break;
>             } else {
>                 bos.write(c);
>             }
>         }
>         return objectMapper.readValue(bos.toByteArray(), valueType);
>     }
> }
>
> Thanks.
>
> --
> Wayne D. Young
> aka Billy Bob Bain
> billybobb...@gmail.com

--
Arvid Heise | Senior Java Developer
<https://www.ververica.com/>

Follow us @VervericaData

--
Join Flink Forward <https://flink-forward.org/> - The Apache Flink Conference
Stream Processing | Event Driven | Real Time

--
Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng