[ https://issues.apache.org/jira/browse/FLINK-2503?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Robert Metzger updated FLINK-2503:
----------------------------------
    Component/s:     (was: Core)
                 API / DataSet

> Inconsistencies in FileInputFormat hierarchy
> --------------------------------------------
>
>                 Key: FLINK-2503
>                 URL: https://issues.apache.org/jira/browse/FLINK-2503
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataSet
>    Affects Versions: 0.10.0
>            Reporter: Flavio Pompermaier
>            Priority: Minor
>
> From a thread in the user mailing list (Invalid argument reading a file
> containing a Kryo object).
> I think that there are some inconsistencies in the hierarchy of InputFormats.
> The BinaryOutputFormat/TypeSerializerInputFormat should somehow inherit the
> behaviour of the FileInputFormat (so respect unsplittable and
> enumerateNestedFiles), but they don't take those flags into account.
> Moreover, in the TypeSerializerInputFormat there's a "// TODO: fix this shit"
> that maybe should be removed or fixed :)
> Also, keeping testForUnsplittable and decorateInputStream aligned is somewhat
> dangerous.
> And maybe the visibility of getBlockIndexForPosition should be changed to
> protected?
> My need was to implement a TypeSerializerInputFormat<RowBundle>, but to
> achieve that I had to write a lot of overrides. Am I doing something wrong, or
> do those input formats need improving? This is my IF code (remark: from
> the comment "Copied from FileInputFormat (overriding
> TypeSerializerInputFormat)" onward, the code is copied and pasted from
> FileInputFormat, so MY code ends there):
> {code:java}
> public class RowBundleInputFormat extends TypeSerializerInputFormat<RowBundle> {
>     private static final long serialVersionUID = 1L;
>     private static final Logger LOG = LoggerFactory.getLogger(RowBundleInputFormat.class);
>
>     /** The fraction that the last split may be larger than the others. */
>     private static final float MAX_SPLIT_SIZE_DISCREPANCY = 1.1f;
>
>     /** Set once the single object stored in the file has been read. */
>     private boolean objectRead;
>
>     public RowBundleInputFormat() {
>         super(new GenericTypeInfo<>(RowBundle.class));
>         unsplittable = true;
>     }
>
>     @Override
>     protected FSDataInputStream decorateInputStream(FSDataInputStream inputStream, FileInputSplit fileSplit) throws Throwable {
>         return inputStream;
>     }
>
>     @Override
>     protected boolean testForUnsplittable(FileStatus pathFile) {
>         return true;
>     }
>
>     @Override
>     public void open(FileInputSplit split) throws IOException {
>         super.open(split);
>         objectRead = false;
>     }
>
>     @Override
>     public boolean reachedEnd() throws IOException {
>         return this.objectRead;
>     }
>
>     @Override
>     public RowBundle nextRecord(RowBundle reuse) throws IOException {
>         RowBundle yourObject = super.nextRecord(reuse);
>         this.objectRead = true; // read only one object
>         return yourObject;
>     }
>
>     // -------------------------------------------------------------------
>     // Copied from FileInputFormat (overriding TypeSerializerInputFormat)
>     // -------------------------------------------------------------------
>     @Override
>     public FileInputSplit[] createInputSplits(int minNumSplits) throws IOException {...}
>
>     private long addNestedFiles(Path path, List<FileStatus> files, long length, boolean logExcludedFiles) throws IOException {...}
>
>     private int getBlockIndexForPosition(BlockLocation[] blocks, long offset, long halfSplitSize, int startIndex) { ... }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
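
The request that subclasses "inherit the behaviour of the FileInputFormat" could be met in several ways. The following is only a minimal sketch of one of them, in plain Java with no Flink dependency: the base class interprets the `unsplittable` flag in a single `final` method and exposes a small hook for format-specific alignment. All class and method names here (`SketchSplit`, `SketchFileFormat`, `SketchBlockFormat`, `createSplits`, `alignSplitSize`) are hypothetical and are not Flink's API; nested-file enumeration is left out.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a file split: path, start offset, length. */
final class SketchSplit {
    final String path;
    final long start;
    final long length;

    SketchSplit(String path, long start, long length) {
        this.path = path;
        this.start = start;
        this.length = length;
    }
}

/** Hypothetical base format: the only place where the unsplittable flag is interpreted. */
abstract class SketchFileFormat {
    protected boolean unsplittable = false;

    /** Final, so a subclass cannot bypass the flag by re-implementing split creation. */
    public final List<SketchSplit> createSplits(String path, long fileLength, int minNumSplits) {
        List<SketchSplit> splits = new ArrayList<>();
        if (unsplittable || minNumSplits <= 1 || fileLength <= 0) {
            // One split covering the whole file.
            splits.add(new SketchSplit(path, 0, fileLength));
            return splits;
        }
        long proposed = (fileLength + minNumSplits - 1) / minNumSplits; // ceiling division
        long splitSize = Math.max(1, alignSplitSize(proposed));
        for (long start = 0; start < fileLength; start += splitSize) {
            splits.add(new SketchSplit(path, start, Math.min(splitSize, fileLength - start)));
        }
        return splits;
    }

    /** Hook for format-specific needs; the default keeps the proposed size. */
    protected long alignSplitSize(long proposed) {
        return proposed;
    }
}

/** Hypothetical block-based format: customises alignment only, never the flag handling. */
class SketchBlockFormat extends SketchFileFormat {
    private final long blockSize;

    SketchBlockFormat(long blockSize, boolean unsplittable) {
        this.blockSize = blockSize;
        this.unsplittable = unsplittable;
    }

    @Override
    protected long alignSplitSize(long proposed) {
        // Round up to the next multiple of the block size.
        return ((proposed + blockSize - 1) / blockSize) * blockSize;
    }
}
```

With this shape, a format like the `RowBundleInputFormat` above would only need to set the flag in its constructor rather than copy the split-creation logic from its parent.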