Hi Billy,

The exception is happening on the output side; the input side looks fine. Could
you post more information about the sink you are using?
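
As a side note, the byte loop in your nextRecord below treats an early end of
stream (read() returning -1) like an ordinary byte. A defensive version of just
that loop, sketched against a plain InputStream (class and method names here are
illustrative, not Flink or commons-compress API), could look like:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

public class LineReaderSketch {

    // Reads bytes up to the next '\n' (exclusive), mirroring the loop in
    // TarInputFormat#nextRecord, but stops cleanly when read() returns -1
    // instead of writing the -1 into the buffer.
    static byte[] readLine(InputStream in) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        int c;
        while ((c = in.read()) != -1) {
            if (c == '\n') {
                break;
            }
            bos.write(c);
        }
        return bos.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        // Two line-delimited JSON records, the second not newline-terminated.
        InputStream in =
                new ByteArrayInputStream("{\"a\":1}\n{\"b\":2}".getBytes());
        System.out.println(new String(readLine(in)));
        System.out.println(new String(readLine(in)));
    }
}
```

With the EOF check, a truncated final record ends the loop instead of
corrupting the buffer before it reaches the ObjectMapper.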

On Mon, Dec 28, 2020 at 8:11 PM Billy Bain <billybobb...@gmail.com> wrote:

> I am trying to implement a class that will work similarly to AvroFileFormat.
>
> This tar archive has a very specific format. It has only one file inside
> and that file is line delimited JSON.
>
> I get this exception, but all the data is written to the temporary files.
> I have checked that my code isn't closing the stream, which was my prior
> issue.
>
> Caused by: java.nio.channels.ClosedChannelException
> at java.base/sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:150)
> at java.base/sun.nio.ch.FileChannelImpl.position(FileChannelImpl.java:325)
> at org.apache.flink.core.fs.local.LocalRecoverableFsDataOutputStream.getPos(LocalRecoverableFsDataOutputStream.java:101)
> at org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter.getSize(OutputStreamBasedPartFileWriter.java:70)
> at org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy.shouldRollOnCheckpoint(DefaultRollingPolicy.java:71)
> at org.apache.flink.connector.file.sink.writer.FileWriterBucket.prepareCommit(FileWriterBucket.java:195)
> at org.apache.flink.connector.file.sink.writer.FileWriter.prepareCommit(FileWriter.java:202)
> at org.apache.flink.streaming.runtime.operators.sink.AbstractSinkWriterOperator.endInput(AbstractSinkWriterOperator.java:97)
> at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.endOperatorInput(StreamOperatorWrapper.java:91)
> at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$close$0(StreamOperatorWrapper.java:127)
> at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper$$Lambda$343/000000000000000000.run(Unknown Source)
> at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
> at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:127)
> at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain.closeOperators(OperatorChain.java:412)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:585)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:547)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
> at java.base/java.lang.Thread.run(Thread.java:836)
>
> public class TarInputFormat<E> extends FileInputFormat<E> implements ResultTypeQueryable<E> {
>
>     private static final Logger logger = LoggerFactory.getLogger(TarInputFormat.class);
>     private transient TarArchiveInputStream tarArchiveInputStream;
>     private TarArchiveEntry nextEntry;
>     private final Class<E> valueType;
>     private long currentPosition = 0L;
>     private static final ObjectMapper objectMapper = new ObjectMapper();
>
>     public TarInputFormat(Path filePath, Class<E> valueType) {
>         super(filePath);
>         this.valueType = valueType;
>         this.unsplittable = true;
>         this.setNumSplits(1);
>     }
>
>     @Override
>     public TypeInformation<E> getProducedType() {
>         return TypeExtractor.getForClass(this.valueType);
>     }
>
>     @Override
>     public void open(FileInputSplit split) throws IOException {
>         super.open(split);
>         tarArchiveInputStream = new TarArchiveInputStream(stream);
>         nextEntry = tarArchiveInputStream.getNextTarEntry();
>         logger.info("Entry Name={} size={}", nextEntry.getName(), nextEntry.getSize());
>     }
>
>     @Override
>     public void close() throws IOException {
>         super.close();
>         if (tarArchiveInputStream != null) {
>             tarArchiveInputStream.close();
>         }
>     }
>
>     @Override
>     public boolean reachedEnd() throws IOException {
>         return nextEntry == null || currentPosition == nextEntry.getSize();
>     }
>
>     @Override
>     public E nextRecord(E reuse) throws IOException {
>         if (reachedEnd()) {
>             return null;
>         }
>         logger.info("currentPosition={}", currentPosition);
>         int c;
>         ByteArrayOutputStream bos = new ByteArrayOutputStream();
>         while (currentPosition < nextEntry.getSize()) {
>             c = tarArchiveInputStream.read();
>             currentPosition++;
>             if (c == '\n') {
>                 break;
>             } else {
>                 bos.write(c);
>             }
>         }
>         return objectMapper.readValue(bos.toByteArray(), valueType);
>     }
>
> }
>
> Thanks.
>
> --
> Wayne D. Young
> aka Billy Bob Bain
> billybobb...@gmail.com
>


-- 

Arvid Heise | Senior Java Developer

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng
