Hi Arvid,

Thanks for the response. I have created a sample application with input
data and uploaded it to Google Drive. The sample data is included in the
archive, hence the large size (27 MB).

https://drive.google.com/file/d/1dxpnDF3hPUPNlPO5p2tBf-88oOVV0qck/view?usp=sharing

To run it:
flink run -Dexecution.runtime-mode=BATCH \
  -c com.billybobbain.AndroidTarReader \
  /path/to/flink-tar/build/libs/flink-tar-0.1.jar \
  --input_path /path/to/flink-tar/playstore-00.tar.gz \
  --output_path /path/to/output/

The main class:
public class AndroidTarReader {
    public static void main(String[] args) throws Exception {
        ParameterTool parameter = ParameterTool.fromArgs(args);
        String inputPath = parameter.get("input_path");
        String outputPath = parameter.get("output_path");
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<AndroidData> android = env.readFile(
                new TarInputFormat<>(new Path(inputPath), AndroidData.class), inputPath);
        final FileSink<AndroidData> sink = FileSink
                .forRowFormat(new Path(outputPath), new AndroidDataEncoder())
                .withOutputFileConfig(OutputFileConfig.builder()
                        .withPartPrefix("filtered")
                        .withPartSuffix(".json")
                        .build())
                .withRollingPolicy(DefaultRollingPolicy.builder()
                        .withMaxPartSize(1024 * 1024)
                        .build())
                .build();
        android.sinkTo(sink);
        env.execute("zMarket Android");
    }
}
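As an aside, the per-record read loop in my TarInputFormat (quoted below) can be sketched as a standalone method; the class and method names here (LineReader, readLine) are just illustrative, and this version also guards against read() returning -1 at an unexpected EOF, which the original loop does not:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

public class LineReader {
    // Reads one '\n'-delimited record from the stream. `consumed` is a
    // one-element running byte counter (mirrors currentPosition in the
    // input format); `limit` is the tar entry size. Returns null once
    // all bytes of the entry have been consumed.
    static byte[] readLine(InputStream in, long[] consumed, long limit) throws IOException {
        if (consumed[0] >= limit) {
            return null;
        }
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        while (consumed[0] < limit) {
            int c = in.read();
            if (c == -1) {
                // Unexpected EOF: stop instead of writing -1 into the buffer.
                consumed[0] = limit;
                break;
            }
            consumed[0]++;
            if (c == '\n') {
                break;
            }
            bos.write(c);
        }
        return bos.toByteArray();
    }
}
```

Each returned byte[] would then be handed to Jackson's objectMapper.readValue(), as in nextRecord() below.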

On Tue, Jan 5, 2021 at 5:59 AM Arvid Heise <ar...@ververica.com> wrote:

> Hi Billy,
>
> the exception is happening on the output side. Input side looks fine.
> Could you maybe post more information about the sink?
>
> On Mon, Dec 28, 2020 at 8:11 PM Billy Bain <billybobb...@gmail.com> wrote:
>
>> I am trying to implement a class that works similarly to AvroFileFormat.
>>
>> This tar archive has a very specific format. It has only one file inside
>> and that file is line delimited JSON.
>>
>> I get this exception, but all the data is written to the temporary files.
>> I have checked that my code isn't closing the stream, which was my prior
>> issue.
>>
>> Caused by: java.nio.channels.ClosedChannelException
>> at
>> java.base/sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:150)
>> at java.base/sun.nio.ch.FileChannelImpl.position(FileChannelImpl.java:325)
>> at
>> org.apache.flink.core.fs.local.LocalRecoverableFsDataOutputStream.getPos(LocalRecoverableFsDataOutputStream.java:101)
>> at
>> org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter.getSize(OutputStreamBasedPartFileWriter.java:70)
>> at
>> org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy.shouldRollOnCheckpoint(DefaultRollingPolicy.java:71)
>> at
>> org.apache.flink.connector.file.sink.writer.FileWriterBucket.prepareCommit(FileWriterBucket.java:195)
>> at
>> org.apache.flink.connector.file.sink.writer.FileWriter.prepareCommit(FileWriter.java:202)
>> at
>> org.apache.flink.streaming.runtime.operators.sink.AbstractSinkWriterOperator.endInput(AbstractSinkWriterOperator.java:97)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.endOperatorInput(StreamOperatorWrapper.java:91)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$close$0(StreamOperatorWrapper.java:127)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper$$Lambda$343/000000000000000000.run(Unknown
>> Source)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:127)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain.closeOperators(OperatorChain.java:412)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:585)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:547)
>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>> at java.base/java.lang.Thread.run(Thread.java:836)
>>
>> public class TarInputFormat<E> extends FileInputFormat<E> implements
>> ResultTypeQueryable<E> {
>>
>>     private static final Logger logger = LoggerFactory.getLogger(TarInputFormat.class);
>>     private transient TarArchiveInputStream tarArchiveInputStream;
>>     private TarArchiveEntry nextEntry;
>>     private final Class<E> valueType;
>>     private long currentPosition = 0L;
>>     private static final ObjectMapper objectMapper = new ObjectMapper();
>>
>>     public TarInputFormat(Path filePath, Class<E> valueType) {
>>         super(filePath);
>>         this.valueType = valueType;
>>         this.unsplittable = true;
>>         this.setNumSplits(1);
>>     }
>>
>>     @Override
>>     public TypeInformation<E> getProducedType() {
>>         return TypeExtractor.getForClass(this.valueType);
>>     }
>>
>>     @Override
>>     public void open(FileInputSplit split) throws IOException {
>>         super.open(split);
>>         tarArchiveInputStream = new TarArchiveInputStream(stream);
>>         nextEntry = tarArchiveInputStream.getNextTarEntry();
>>         logger.info("Entry Name={} size={}", nextEntry.getName(), nextEntry.getSize());
>>     }
>>
>>     @Override
>>     public void close() throws IOException {
>>         super.close();
>>         if (tarArchiveInputStream != null) {
>>             tarArchiveInputStream.close();
>>         }
>>     }
>>
>>     @Override
>>     public boolean reachedEnd() throws IOException {
>>         return nextEntry == null || currentPosition == nextEntry.getSize();
>>     }
>>
>>     @Override
>>     public E nextRecord(E reuse) throws IOException {
>>         if(reachedEnd()) {
>>             return null;
>>         }
>>         logger.info("currentPosition={}", currentPosition);
>>         int c;
>>         ByteArrayOutputStream bos = new ByteArrayOutputStream();
>>         while (currentPosition < nextEntry.getSize()) {
>>             c = tarArchiveInputStream.read();
>>             currentPosition++;
>>             if (c == '\n') {
>>                 break;
>>             } else {
>>                 bos.write(c);
>>             }
>>         }
>>         return objectMapper.readValue(bos.toByteArray(), valueType);
>>     }
>>
>> }
>>
>> Thanks.
>>
>> --
>> Wayne D. Young
>> aka Billy Bob Bain
>> billybobb...@gmail.com
>>
>
>
> --
>
> Arvid Heise | Senior Java Developer
>
> <https://www.ververica.com/>
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Toni) Cheng
>


-- 
Wayne D. Young
aka Billy Bob Bain
billybobb...@gmail.com
