Hi Arvid,

Thanks for the response. I have created a sample application with input data and uploaded it to Google Drive. The sample data is in the archive, hence the large size (27 MB).
https://drive.google.com/file/d/1dxpnDF3hPUPNlPO5p2tBf-88oOVV0qck/view?usp=sharing

To run it:

flink run -Dexecution.runtime-mode=BATCH -c com.billybobbain.AndroidTarReader \
    /path/to/flink-tar/build/libs/flink-tar-0.1.jar \
    --input_path /path/to/flink-tar/playstore-00.tar.gz \
    --output_path /path/to/output/

The main class:

public class AndroidTarReader {
    public static void main(String[] args) throws Exception {
        ParameterTool parameter = ParameterTool.fromArgs(args);
        String inputPath = parameter.get("input_path");
        String outputPath = parameter.get("output_path");
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<AndroidData> android = env.readFile(new TarInputFormat<>(new Path(inputPath), AndroidData.class), inputPath);
        final FileSink<AndroidData> sink = FileSink
                .forRowFormat(new Path(outputPath), new AndroidDataEncoder())
                .withOutputFileConfig(OutputFileConfig.builder().withPartPrefix("filtered").withPartSuffix(".json").build())
                .withRollingPolicy(DefaultRollingPolicy.builder()
                        .withMaxPartSize(1024 * 1024)
                        .build())
                .build();
        android.sinkTo(sink);
        env.execute("zMarket Android");
    }
}

On Tue, Jan 5, 2021 at 5:59 AM Arvid Heise <ar...@ververica.com> wrote:

> Hi Billy,
>
> the exception is happening on the output side. The input side looks fine.
> Could you maybe post more information about the sink?
>
> On Mon, Dec 28, 2020 at 8:11 PM Billy Bain <billybobb...@gmail.com> wrote:
>
>> I am trying to implement a class that will work similarly to
>> AvroFileFormat.
>>
>> This tar archive has a very specific format. It has only one file inside,
>> and that file is line-delimited JSON.
>>
>> I get this exception, but all the data is written to the temporary files.
>> I have checked that my code isn't closing the stream, which was my prior
>> issue.
>>
>> Caused by: java.nio.channels.ClosedChannelException
>>     at java.base/sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:150)
>>     at java.base/sun.nio.ch.FileChannelImpl.position(FileChannelImpl.java:325)
>>     at org.apache.flink.core.fs.local.LocalRecoverableFsDataOutputStream.getPos(LocalRecoverableFsDataOutputStream.java:101)
>>     at org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter.getSize(OutputStreamBasedPartFileWriter.java:70)
>>     at org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy.shouldRollOnCheckpoint(DefaultRollingPolicy.java:71)
>>     at org.apache.flink.connector.file.sink.writer.FileWriterBucket.prepareCommit(FileWriterBucket.java:195)
>>     at org.apache.flink.connector.file.sink.writer.FileWriter.prepareCommit(FileWriter.java:202)
>>     at org.apache.flink.streaming.runtime.operators.sink.AbstractSinkWriterOperator.endInput(AbstractSinkWriterOperator.java:97)
>>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.endOperatorInput(StreamOperatorWrapper.java:91)
>>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$close$0(StreamOperatorWrapper.java:127)
>>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper$$Lambda$343/000000000000000000.run(Unknown Source)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
>>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:127)
>>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.closeOperators(OperatorChain.java:412)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:585)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:547)
>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>>     at java.base/java.lang.Thread.run(Thread.java:836)
>>
>> public class TarInputFormat<E> extends FileInputFormat<E> implements ResultTypeQueryable<E> {
>>
>>     private static final Logger logger = LoggerFactory.getLogger(TarInputFormat.class);
>>     private transient TarArchiveInputStream tarArchiveInputStream;
>>     private TarArchiveEntry nextEntry;
>>     private final Class<E> valueType;
>>     private long currentPosition = 0L;
>>     private static final ObjectMapper objectMapper = new ObjectMapper();
>>
>>     public TarInputFormat(Path filePath, Class<E> valueType) {
>>         super(filePath);
>>         this.valueType = valueType;
>>         this.unsplittable = true;
>>         this.setNumSplits(1);
>>     }
>>
>>     @Override
>>     public TypeInformation<E> getProducedType() {
>>         return TypeExtractor.getForClass(this.valueType);
>>     }
>>
>>     @Override
>>     public void open(FileInputSplit split) throws IOException {
>>         super.open(split);
>>         tarArchiveInputStream = new TarArchiveInputStream(stream);
>>         nextEntry = tarArchiveInputStream.getNextTarEntry();
>>         logger.info("Entry Name={} size={}", nextEntry.getName(), nextEntry.getSize());
>>     }
>>
>>     @Override
>>     public void close() throws IOException {
>>         super.close();
>>         if (tarArchiveInputStream != null) {
>>             tarArchiveInputStream.close();
>>         }
>>     }
>>
>>     @Override
>>     public boolean reachedEnd() throws IOException {
>>         return nextEntry == null || currentPosition == nextEntry.getSize();
>>     }
>>
>>     @Override
>>     public E nextRecord(E reuse) throws IOException {
>>         if (reachedEnd()) {
>>             return null;
>>         }
>>         logger.info("currentPosition={}", currentPosition);
>>         int c;
>>         ByteArrayOutputStream bos = new ByteArrayOutputStream();
>>         while (currentPosition < nextEntry.getSize()) {
>>             c = tarArchiveInputStream.read();
>>             currentPosition++;
>>             if (c == '\n') {
>>                 break;
>>             } else {
>>                 bos.write(c);
>>             }
>>         }
>>         return objectMapper.readValue(bos.toByteArray(), valueType);
>>     }
>> }
>>
>> Thanks.
>>
>> --
>> Wayne D. Young
>> aka Billy Bob Bain
>> billybobb...@gmail.com
>
>
> --
>
> Arvid Heise | Senior Java Developer
>
> <https://www.ververica.com/>
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward <https://flink-forward.org/> - The Apache Flink Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng

--
Wayne D. Young
aka Billy Bob Bain
billybobb...@gmail.com
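P.S. One small thing I noticed while reviewing nextRecord() above: the read loop never checks whether tarArchiveInputStream.read() returned -1, so if an entry's size header ever overstated the actual payload, the loop would keep counting and write 0xFF bytes into the record buffer. Below is a minimal, self-contained sketch of the same per-line loop with an EOF guard added. It is illustrative only: a plain InputStream stands in for TarArchiveInputStream, and the class and method names (LineReaderSketch, readLine) are made up for this example.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

// Illustrative sketch of the per-line read loop from nextRecord(), with an
// EOF guard added. A plain InputStream stands in for TarArchiveInputStream.
public class LineReaderSketch {

    // Reads up to 'remaining' bytes, stopping at the first '\n' (consumed but
    // not included in the result) or at end of stream. Returns null when no
    // byte could be read at all.
    public static String readLine(InputStream in, long remaining) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        boolean readAny = false;
        while (remaining > 0) {
            int c = in.read();
            if (c == -1) {
                break; // EOF guard: the original loop has no check for -1
            }
            remaining--;
            readAny = true;
            if (c == '\n') {
                break;
            }
            bos.write(c);
        }
        return readAny ? bos.toString("UTF-8") : null;
    }

    public static void main(String[] args) throws IOException {
        // Two line-delimited JSON records, as in the single tar entry.
        byte[] entry = "{\"a\":1}\n{\"b\":2}\n".getBytes("UTF-8");
        InputStream in = new ByteArrayInputStream(entry);
        System.out.println(readLine(in, entry.length));     // first record
        System.out.println(readLine(in, entry.length - 8)); // second record
    }
}
```

The same guard could be dropped into nextRecord() by breaking out of the while loop (and leaving currentPosition untouched) whenever read() returns -1.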