Flavio Pompermaier created FLINK-2503:
-----------------------------------------

             Summary: Inconsistencies in FileInputFormat hierarchy
                 Key: FLINK-2503
                 URL: https://issues.apache.org/jira/browse/FLINK-2503
             Project: Flink
          Issue Type: Bug
          Components: Core
    Affects Versions: master
            Reporter: Flavio Pompermaier
            Priority: Minor


From a thread on the user mailing list ("Invalid argument reading a file
containing a Kryo object").

I think there are some inconsistencies in the InputFormat hierarchy.
BinaryInputFormat/TypeSerializerInputFormat should inherit the behaviour of
FileInputFormat (i.e. respect the unsplittable and enumerateNestedFiles flags),
but they don't take those flags into account.
Moreover, TypeSerializerInputFormat contains a "// TODO: fix this shit" comment
that should probably be removed or addressed :)
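To illustrate the expected behaviour, here is a minimal, self-contained sketch. None of these classes exist in Flink: SimpleFile, SimpleSplit, BaseFileFormat and OneRecordPerFileFormat are hypothetical stand-ins. Split creation honours both flags in a single base-class method, so a subclass only sets a flag instead of copying createInputSplits.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a file-system entry (not a Flink class).
final class SimpleFile {
    final String path;
    final long length;
    final boolean dir;
    final List<SimpleFile> children;

    private SimpleFile(String path, long length, boolean dir, List<SimpleFile> children) {
        this.path = path;
        this.length = length;
        this.dir = dir;
        this.children = children;
    }

    static SimpleFile file(String path, long length) {
        return new SimpleFile(path, length, false, new ArrayList<SimpleFile>());
    }

    static SimpleFile dir(String path, SimpleFile... children) {
        return new SimpleFile(path, 0, true, Arrays.asList(children));
    }
}

// Hypothetical split descriptor: a byte range of one file.
final class SimpleSplit {
    final String path;
    final long start;
    final long length;

    SimpleSplit(String path, long start, long length) {
        this.path = path;
        this.start = start;
        this.length = length;
    }
}

// Hypothetical base class: both flags are honoured in one place, so subclasses
// only set them instead of re-implementing split creation.
abstract class BaseFileFormat {
    protected boolean unsplittable = false;
    protected boolean enumerateNestedFiles = false;

    final List<SimpleSplit> createInputSplits(SimpleFile root, long splitSize) {
        List<SimpleFile> files = new ArrayList<>();
        collectFiles(root, files, true);
        List<SimpleSplit> splits = new ArrayList<>();
        for (SimpleFile f : files) {
            if (unsplittable || f.length <= splitSize) {
                // One split covering the whole file.
                splits.add(new SimpleSplit(f.path, 0, f.length));
            } else {
                // Fixed-size chunks; the last one may be shorter.
                for (long start = 0; start < f.length; start += splitSize) {
                    splits.add(new SimpleSplit(f.path, start, Math.min(splitSize, f.length - start)));
                }
            }
        }
        return splits;
    }

    private void collectFiles(SimpleFile f, List<SimpleFile> out, boolean isRoot) {
        if (!f.dir) {
            out.add(f);
        } else if (isRoot || enumerateNestedFiles) {
            // The top-level directory is always listed; sub-directories only on request.
            for (SimpleFile child : f.children) {
                collectFiles(child, out, false);
            }
        }
    }
}

// A subclass in the spirit of RowBundleInputFormat: one split per file, no copied code.
final class OneRecordPerFileFormat extends BaseFileFormat {
    OneRecordPerFileFormat() {
        unsplittable = true;
    }
}
```

With this shape, setting the flag in the constructor would presumably be enough, and the copied createInputSplits/addNestedFiles could be dropped.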

Also, having to keep testForUnsplittable and decorateInputStream aligned is
somewhat dangerous.
And should the visibility of getBlockIndexForPosition perhaps be changed to
protected?
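Regarding keeping testForUnsplittable and decorateInputStream aligned: one way to remove the coupling is to derive both decisions from a single method. The sketch below assumes both hinge on whether a file is compressed. CompressionAwareFormat, WholeFileFormat and codecFor are hypothetical names, not Flink API, and the ".deflate" extension check is only an illustrative assumption.

```java
import java.io.InputStream;
import java.util.zip.InflaterInputStream;

// Hypothetical sketch (not Flink's API): splittability and stream decoration are
// both derived from codecFor(), so they cannot drift apart when a subclass
// customises behaviour.
class CompressionAwareFormat {
    // Set by subclasses that must read each file as a whole for their own reasons.
    protected boolean unsplittable = false;

    // Single source of truth: returns a codec name, or null for uncompressed files.
    protected String codecFor(String path) {
        return path.endsWith(".deflate") ? "deflate" : null;
    }

    // A compressed stream cannot be entered at an arbitrary byte offset,
    // so compressed files are never split.
    final boolean isUnsplittable(String path) {
        return unsplittable || codecFor(path) != null;
    }

    // Wraps the raw stream according to the same codec decision.
    final InputStream decorate(InputStream raw, String path) {
        String codec = codecFor(path);
        if (codec == null) {
            return raw;
        }
        if (codec.equals("deflate")) {
            return new InflaterInputStream(raw);
        }
        throw new IllegalArgumentException("Unsupported codec: " + codec);
    }
}

// A whole-file format only flips the flag; compressed input still gets decoded.
class WholeFileFormat extends CompressionAwareFormat {
    WholeFileFormat() {
        unsplittable = true;
    }
}
```

Here a subclass that needs whole-file reads never touches decoration, unlike overriding both methods by hand.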

I needed to implement a TypeSerializerInputFormat<RowBundle>, but to achieve
that I had to override a lot of methods. Am I doing something wrong, or could
these input formats be improved? This is my input format code (note: everything
after the comment
"Copied from FileInputFormat (overriding TypeSerializerInputFormat)"
is copied and pasted from FileInputFormat, so MY code ends there):

public class RowBundleInputFormat extends TypeSerializerInputFormat<RowBundle> {

        private static final long serialVersionUID = 1L;
        private static final Logger LOG =
                        LoggerFactory.getLogger(RowBundleInputFormat.class);

        /** The fraction that the last split may be larger than the others. */
        private static final float MAX_SPLIT_SIZE_DISCREPANCY = 1.1f;
        private boolean objectRead;

        public RowBundleInputFormat() {
                super(new GenericTypeInfo<>(RowBundle.class));
                unsplittable = true;
        }

        @Override
        protected FSDataInputStream decorateInputStream(FSDataInputStream inputStream,
                        FileInputSplit fileSplit) throws Throwable {
                return inputStream;
        }

        @Override
        protected boolean testForUnsplittable(FileStatus pathFile) {
                return true;
        }

        @Override
        public void open(FileInputSplit split) throws IOException {
                super.open(split);
                objectRead = false;
        }

        @Override
        public boolean reachedEnd() throws IOException {
                return this.objectRead;
        }

        @Override
        public RowBundle nextRecord(RowBundle reuse) throws IOException {
                RowBundle yourObject = super.nextRecord(reuse);
                this.objectRead = true; // read only one object
                return yourObject;
        }

        // -------------------------------------------------------------------
        // Copied from FileInputFormat (overriding TypeSerializerInputFormat)
        // -------------------------------------------------------------------
        @Override
        public FileInputSplit[] createInputSplits(int minNumSplits)
                        throws IOException {...}

        private long addNestedFiles(Path path, List<FileStatus> files, long length,
                        boolean logExcludedFiles) throws IOException {...}

        private int getBlockIndexForPosition(BlockLocation[] blocks, long offset,
                        long halfSplitSize, int startIndex) {...}

}
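Because getBlockIndexForPosition is private in FileInputFormat, a subclass that re-implements createInputSplits has to carry its own copy. The standalone sketch below shows the kind of lookup such a helper performs: it picks the block that should host a split starting at a given offset, for data locality. BlockInfo and BlockLookup are hypothetical stand-ins, not Flink types. The "prefer the next block when less than half a split remains" rule is an assumption inferred from the halfSplitSize parameter.

```java
// Hypothetical stand-in for a block location: byte offset and length within a file.
final class BlockInfo {
    final long offset;
    final long length;

    BlockInfo(long offset, long length) {
        this.offset = offset;
        this.length = length;
    }
}

final class BlockLookup {
    /**
     * Returns the index of the block that should host a split starting at offset,
     * scanning from startIndex. If less than halfSplitSize bytes remain in the block
     * containing offset, the next block is preferred, since it holds most of the split.
     */
    static int blockIndexForPosition(BlockInfo[] blocks, long offset, long halfSplitSize, int startIndex) {
        for (int i = startIndex; i < blocks.length; i++) {
            long blockStart = blocks[i].offset;
            long blockEnd = blockStart + blocks[i].length;
            if (offset >= blockStart && offset < blockEnd) {
                boolean hasNext = i < blocks.length - 1;
                return (hasNext && blockEnd - offset < halfSplitSize) ? i + 1 : i;
            }
        }
        throw new IllegalArgumentException("Offset " + offset + " is not contained in any block");
    }
}
```

If the original helper were protected, RowBundleInputFormat could call it directly instead of duplicating logic of this kind.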



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
