Flavio Pompermaier created FLINK-2503:
-----------------------------------------
Summary: Inconsistencies in FileInputFormat hierarchy
Key: FLINK-2503
URL: https://issues.apache.org/jira/browse/FLINK-2503
Project: Flink
Issue Type: Bug
Components: Core
Affects Versions: master
Reporter: Flavio Pompermaier
Priority: Minor

From a thread in the user mailing list ("Invalid argument reading a file containing a Kryo object").

I think there are some inconsistencies in the hierarchy of InputFormats. BinaryInputFormat/TypeSerializerInputFormat should inherit the behaviour of FileInputFormat (that is, respect unsplittable and enumerateNestedFiles), but they don't take those flags into account.

Moreover, TypeSerializerInputFormat contains a "// TODO: fix this shit" comment that should be either fixed or removed :)

Also, having to keep testForUnsplittable and decorateInputStream aligned is somewhat dangerous. And maybe the visibility of getBlockIndexForPosition should be changed to protected?

My need was to implement a TypeSerializerInputFormat<RowBundle>, but to achieve that I had to write a lot of overrides. Am I doing something wrong, or do those input formats need to be improved?

This is my input format code (note: everything after the comment "Copied from FileInputFormat (overriding TypeSerializerInputFormat)" is copied and pasted from FileInputFormat, so MY code ends there):

    public class RowBundleInputFormat extends TypeSerializerInputFormat<RowBundle> {

        private static final long serialVersionUID = 1L;
        private static final Logger LOG = LoggerFactory.getLogger(RowBundleInputFormat.class);

        /** The fraction that the last split may be larger than the others. */
        private static final float MAX_SPLIT_SIZE_DISCREPANCY = 1.1f;

        /** Set once the single object stored in the current file has been read. */
        private boolean objectRead;

        public RowBundleInputFormat() {
            super(new GenericTypeInfo<>(RowBundle.class));
            unsplittable = true;
        }

        // Return the stream unchanged.
        @Override
        protected FSDataInputStream decorateInputStream(FSDataInputStream inputStream,
                FileInputSplit fileSplit) throws Throwable {
            return inputStream;
        }

        // Always treat files as unsplittable.
        @Override
        protected boolean testForUnsplittable(FileStatus pathFile) {
            return true;
        }

        @Override
        public void open(FileInputSplit split) throws IOException {
            super.open(split);
            objectRead = false;
        }

        @Override
        public boolean reachedEnd() throws IOException {
            return this.objectRead;
        }

        @Override
        public RowBundle nextRecord(RowBundle reuse) throws IOException {
            RowBundle yourObject = super.nextRecord(reuse);
            this.objectRead = true; // read only one object
            return yourObject;
        }

        // -------------------------------------------------------------------
        // Copied from FileInputFormat (overriding TypeSerializerInputFormat)
        // -------------------------------------------------------------------

        @Override
        public FileInputSplit[] createInputSplits(int minNumSplits) throws IOException {...}

        private long addNestedFiles(Path path, List<FileStatus> files, long length,
                boolean logExcludedFiles) throws IOException {...}

        private int getBlockIndexForPosition(BlockLocation[] blocks, long offset,
                long halfSplitSize, int startIndex) { ... }
    }
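
To make the request concrete, here is a minimal, self-contained sketch of the behaviour the report asks for: the base format is the only place that decides how a file is split, and subclasses either flip the flag or override one hook instead of copying createInputSplits. This is a toy model, not Flink's API. Every name in it (SplitSketch, BaseFileFormat, BlockFormat, SingleObjectFormat, isUnsplittable, createSplits) is hypothetical and only loosely mirrors FileInputFormat, BinaryInputFormat and a one-object-per-file format.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model only: none of these classes are part of Flink. */
class SplitSketch {

    /** A byte range of one file. */
    static final class Split {
        final String file;
        final long start;
        final long length;

        Split(String file, long start, long length) {
            this.file = file;
            this.start = start;
            this.length = length;
        }
    }

    /** Hypothetical base format: the single place where splitting is decided. */
    static class BaseFileFormat {
        protected boolean unsplittable = false;

        /** Single hook that subclasses may override; by default it reads the flag. */
        protected boolean isUnsplittable(String file) {
            return unsplittable;
        }

        List<Split> createSplits(String file, long fileLength, long blockSize) {
            if (blockSize <= 0) {
                throw new IllegalArgumentException("blockSize must be positive");
            }
            List<Split> splits = new ArrayList<>();
            // Unsplittable (or small) files become exactly one split.
            if (isUnsplittable(file) || fileLength <= blockSize) {
                splits.add(new Split(file, 0, fileLength));
                return splits;
            }
            // Otherwise cut the file into block-sized ranges.
            for (long pos = 0; pos < fileLength; pos += blockSize) {
                splits.add(new Split(file, pos, Math.min(blockSize, fileLength - pos)));
            }
            return splits;
        }
    }

    /** Hypothetical block-oriented format: inherits split creation unchanged. */
    static class BlockFormat extends BaseFileFormat { }

    /** Hypothetical one-object-per-file format: only the flag changes. */
    static class SingleObjectFormat extends BlockFormat {
        SingleObjectFormat() {
            unsplittable = true;
        }
    }

    public static void main(String[] args) {
        // A 250-byte file with 100-byte blocks: 3 splits when splittable, 1 when not.
        System.out.println(new BlockFormat().createSplits("a.bin", 250, 100).size());
        System.out.println(new SingleObjectFormat().createSplits("a.bin", 250, 100).size());
    }
}
```

With a hierarchy shaped like this, a format such as RowBundleInputFormat would only need to set the flag in its constructor, and no copy of the split-creation logic would be required.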