Hi Fabian,
thanks to your help I finally managed to generate a DataSet from my
folder, but I think there are some inconsistencies in the hierarchy of
InputFormats.
The *BinaryInputFormat*/*TypeSerializerInputFormat* should somehow inherit
the behaviour of FileInputFormat (i.e. respect the *unsplittable* and
*enumerateNestedFiles* flags), but they don't take those flags into account.
Moreover, in the *TypeSerializerInputFormat* there's a *"// TODO: fix this
shit"* comment that should maybe be fixed or removed :)

Also, having to keep testForUnsplittable and decorateInputStream aligned
is somewhat dangerous..
And maybe the visibility of getBlockIndexForPosition should be changed to
protected?

So basically, my need was to implement
a TypeSerializerInputFormat<RowBundle>, but to achieve that I had to
override a lot of methods. Am I doing something wrong, or could those
InputFormats be improved? This is my InputFormat code (*remark*: everything
from the comment *"Copied from FileInputFormat (override
TypeSerializerInputFormat)"* onwards is copied-and-pasted from
FileInputFormat, so MY code ends there):

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.flink.api.java.io.TypeSerializerInputFormat;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.core.fs.BlockLocation;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RowBundleInputFormat extends TypeSerializerInputFormat<RowBundle> {

    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(RowBundleInputFormat.class);

    /** The fraction that the last split may be larger than the others. */
    private static final float MAX_SPLIT_SIZE_DISCREPANCY = 1.1f;

    private boolean objectRead;

    public RowBundleInputFormat() {
        super(new GenericTypeInfo<>(RowBundle.class));
        unsplittable = true;
    }

    @Override
    protected FSDataInputStream decorateInputStream(FSDataInputStream inputStream,
            FileInputSplit fileSplit) throws Throwable {
        return inputStream;
    }

    @Override
    protected boolean testForUnsplittable(FileStatus pathFile) {
        return true;
    }

    @Override
    public void open(FileInputSplit split) throws IOException {
        super.open(split);
        objectRead = false;
    }

    @Override
    public boolean reachedEnd() throws IOException {
        return this.objectRead;
    }

    @Override
    public RowBundle nextRecord(RowBundle reuse) throws IOException {
        RowBundle record = super.nextRecord(reuse);
        this.objectRead = true; // read only one object per split
        return record;
    }

    // -------------------------------------------------------------------
    // Copied from FileInputFormat (override TypeSerializerInputFormat)
    // -------------------------------------------------------------------

    @Override
    public FileInputSplit[] createInputSplits(int minNumSplits) throws IOException {
        if (minNumSplits < 1) {
            throw new IllegalArgumentException("Number of input splits has to be at least 1.");
        }

        // take the desired number of splits into account
        minNumSplits = Math.max(minNumSplits, this.numSplits);

        final Path path = this.filePath;
        final List<FileInputSplit> inputSplits = new ArrayList<FileInputSplit>(minNumSplits);

        // get all the files that are involved in the splits
        List<FileStatus> files = new ArrayList<FileStatus>();
        long totalLength = 0;

        final FileSystem fs = path.getFileSystem();
        final FileStatus pathFile = fs.getFileStatus(path);

        if (pathFile.isDir()) {
            // input is a directory: list all contained files
            final FileStatus[] dir = fs.listStatus(path);
            for (int i = 0; i < dir.length; i++) {
                if (dir[i].isDir()) {
                    if (enumerateNestedFiles) {
                        if (acceptFile(dir[i])) {
                            totalLength += addNestedFiles(dir[i].getPath(), files, 0, true);
                        } else if (LOG.isDebugEnabled()) {
                            LOG.debug("Directory " + dir[i].getPath().toString()
                                    + " did not pass the file-filter and is excluded.");
                        }
                    }
                } else {
                    if (acceptFile(dir[i])) {
                        files.add(dir[i]);
                        totalLength += dir[i].getLen();
                        // as soon as there is one deflate file in a directory,
                        // we can not split it
                        testForUnsplittable(dir[i]);
                    } else if (LOG.isDebugEnabled()) {
                        LOG.debug("File " + dir[i].getPath().toString()
                                + " did not pass the file-filter and is excluded.");
                    }
                }
            }
        } else {
            testForUnsplittable(pathFile);
            files.add(pathFile);
            totalLength += pathFile.getLen();
        }

        // return one split per file if the input is unsplittable
        if (unsplittable) {
            int splitNum = 0;
            for (final FileStatus file : files) {
                final BlockLocation[] blocks = fs.getFileBlockLocations(file, 0, file.getLen());
                Set<String> hosts = new HashSet<String>();
                for (BlockLocation block : blocks) {
                    hosts.addAll(Arrays.asList(block.getHosts()));
                }
                long len = file.getLen();
                if (testForUnsplittable(file)) {
                    len = READ_WHOLE_SPLIT_FLAG;
                }
                FileInputSplit fis = new FileInputSplit(splitNum++, file.getPath(), 0, len,
                        hosts.toArray(new String[hosts.size()]));
                inputSplits.add(fis);
            }
            return inputSplits.toArray(new FileInputSplit[inputSplits.size()]);
        }

        final long maxSplitSize = (minNumSplits < 1) ? Long.MAX_VALUE
                : (totalLength / minNumSplits + (totalLength % minNumSplits == 0 ? 0 : 1));

        // now that we have the files, generate the splits
        int splitNum = 0;
        for (final FileStatus file : files) {

            final long len = file.getLen();
            final long blockSize = file.getBlockSize();

            final long minSplitSize;
            if (this.minSplitSize <= blockSize) {
                minSplitSize = this.minSplitSize;
            } else {
                if (LOG.isWarnEnabled()) {
                    LOG.warn("Minimal split size of " + this.minSplitSize
                            + " is larger than the block size of " + blockSize
                            + ". Decreasing minimal split size to block size.");
                }
                minSplitSize = blockSize;
            }

            final long splitSize = Math.max(minSplitSize, Math.min(maxSplitSize, blockSize));
            final long halfSplit = splitSize >>> 1;

            final long maxBytesForLastSplit = (long) (splitSize * MAX_SPLIT_SIZE_DISCREPANCY);

            if (len > 0) {
                // get the block locations and make sure they are in order
                // with respect to their offset
                final BlockLocation[] blocks = fs.getFileBlockLocations(file, 0, len);
                Arrays.sort(blocks);

                long bytesUnassigned = len;
                long position = 0;
                int blockIndex = 0;

                while (bytesUnassigned > maxBytesForLastSplit) {
                    // get the block containing the majority of the data
                    blockIndex = getBlockIndexForPosition(blocks, position, halfSplit, blockIndex);
                    // create a new split
                    FileInputSplit fis = new FileInputSplit(splitNum++, file.getPath(), position,
                            splitSize, blocks[blockIndex].getHosts());
                    inputSplits.add(fis);

                    // adjust the positions
                    position += splitSize;
                    bytesUnassigned -= splitSize;
                }

                // assign the last split
                if (bytesUnassigned > 0) {
                    blockIndex = getBlockIndexForPosition(blocks, position, halfSplit, blockIndex);
                    final FileInputSplit fis = new FileInputSplit(splitNum++, file.getPath(),
                            position, bytesUnassigned, blocks[blockIndex].getHosts());
                    inputSplits.add(fis);
                }
            } else {
                // special case of a file with zero bytes
                final BlockLocation[] blocks = fs.getFileBlockLocations(file, 0, 0);
                String[] hosts;
                if (blocks.length > 0) {
                    hosts = blocks[0].getHosts();
                } else {
                    hosts = new String[0];
                }
                final FileInputSplit fis = new FileInputSplit(splitNum++, file.getPath(), 0, 0, hosts);
                inputSplits.add(fis);
            }
        }

        return inputSplits.toArray(new FileInputSplit[inputSplits.size()]);
    }

    /**
     * Recursively traverses the input directory structure and enumerates all
     * accepted nested files.
     *
     * @return the total length of accepted files.
     */
    private long addNestedFiles(Path path, List<FileStatus> files, long length,
            boolean logExcludedFiles) throws IOException {
        final FileSystem fs = path.getFileSystem();

        for (FileStatus dir : fs.listStatus(path)) {
            if (dir.isDir()) {
                if (acceptFile(dir)) {
                    // accumulate the length of files found in the subdirectory
                    length += addNestedFiles(dir.getPath(), files, 0, logExcludedFiles);
                } else if (logExcludedFiles && LOG.isDebugEnabled()) {
                    LOG.debug("Directory " + dir.getPath().toString()
                            + " did not pass the file-filter and is excluded.");
                }
            } else {
                if (acceptFile(dir)) {
                    files.add(dir);
                    length += dir.getLen();
                    testForUnsplittable(dir);
                } else if (logExcludedFiles && LOG.isDebugEnabled()) {
                    LOG.debug("File " + dir.getPath().toString()
                            + " did not pass the file-filter and is excluded.");
                }
            }
        }
        return length;
    }

    /**
     * Retrieves the index of the <tt>BlockLocation</tt> that contains the part
     * of the file described by the given offset.
     *
     * @param blocks
     *            The different blocks of the file. Must be ordered by their
     *            offset.
     * @param offset
     *            The offset of the position in the file.
     * @param halfSplitSize
     *            Half of the split size; if less than this remains in the
     *            current block, the next block is chosen instead.
     * @param startIndex
     *            The earliest index to look at.
     * @return The index of the block containing the given position.
     */
    private int getBlockIndexForPosition(BlockLocation[] blocks, long offset,
            long halfSplitSize, int startIndex) {
        // go over all indexes after the startIndex
        for (int i = startIndex; i < blocks.length; i++) {
            long blockStart = blocks[i].getOffset();
            long blockEnd = blockStart + blocks[i].getLength();

            if (offset >= blockStart && offset < blockEnd) {
                // got the block where the split starts;
                // check if the next block contains more of the split than this one
                if (i < blocks.length - 1 && blockEnd - offset < halfSplitSize) {
                    return i + 1;
                } else {
                    return i;
                }
            }
        }
        throw new IllegalArgumentException("The given offset is not contained in any block.");
    }
}
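As an aside, the split-assignment loop in the copied createInputSplits can be illustrated without any Flink dependency. Only MAX_SPLIT_SIZE_DISCREPANCY is taken from the code above; all the other names are my own:

```java
// Standalone illustration of the split assignment arithmetic in
// createInputSplits: regular splits of splitSize bytes, with a last split
// that may be up to 10% larger to avoid a tiny trailing split.
public class SplitMathDemo {

    private static final float MAX_SPLIT_SIZE_DISCREPANCY = 1.1f;

    /** Number of splits produced for a non-empty file of the given length. */
    static int countSplits(long len, long splitSize) {
        long maxBytesForLastSplit = (long) (splitSize * MAX_SPLIT_SIZE_DISCREPANCY);
        int numSplits = 0;
        long bytesUnassigned = len;
        while (bytesUnassigned > maxBytesForLastSplit) {
            numSplits++;
            bytesUnassigned -= splitSize;
        }
        if (bytesUnassigned > 0) {
            numSplits++; // the (possibly oversized) last split
        }
        return numSplits;
    }

    public static void main(String[] args) {
        // 105 bytes with splitSize 100: within the 10% discrepancy -> one split
        System.out.println(countSplits(105, 100)); // prints 1
        // 100 bytes with splitSize 32: 32 + 32 + 32 + 4 -> four splits
        System.out.println(countSplits(100, 32)); // prints 4
    }
}
```

(The zero-length file case is handled separately in the real code, which emits a single empty split for it.)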




On Sun, Aug 9, 2015 at 2:00 PM, Fabian Hueske <fhue...@gmail.com> wrote:

> You need to do something like this:
>
> public class YourInputFormat extends FileInputFormat<Object> {
>
>    private boolean objectRead;
>
>    @Override
>    public FileInputSplit[] createInputSplits(int minNumSplits) {
>       // Create one FileInputSplit for each file you want to read.
>       // Check FileInputFormat for how to recursively enumerate files.
>       // Input splits must start at 0 and have a length equal to length of
> the file to read.
>       return null;
>    }
>
>    @Override
>    public void open(FileInputSplit split) throws IOException {
>       super.open(split);
>       objectRead = false;
>    }
>
>    @Override
>    public boolean reachedEnd() throws IOException {
>       return this.objectRead;
>    }
>
>    @Override
>    public Object nextRecord(Object reuse) throws IOException {
>       Object yourObject = this.stream.read(); // use Kryo here to read
> from this.stream()
>       this.objectRead = true; // read only one object
>       return yourObject;
>    }
> }
>
> 2015-08-07 14:40 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>
>> Sorry Fabian but I don't understand what I should do :(
>> Could you provide me a simple snippet of code to achieve this?
>>
>> On Fri, Aug 7, 2015 at 1:30 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Enumeration of nested files is a feature of the FileInputFormat.
>>> If you implement your own IF based on FileInputFormat as I suggested
>>> before, you can use that feature.
>>>
>>> 2015-08-07 12:29 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>>
>>>> I have a directory containing a list of files, each one containing a
>>>> kryo-serialized object.
>>>> With json serialized objects I don't have that problem (but there I use
>>>>  env.readTextFile(path.withParameters(parameters)
>>>> where parameters has the ENUMERATE_NESTED_FILES_FLAG set to true).
>>>>
>>>> On Fri, Aug 7, 2015 at 12:14 PM, Fabian Hueske <fhue...@gmail.com>
>>>> wrote:
>>>>
>>>>> I don't know your use case.
>>>>> The InputFormat interface is very flexible. Directories can be
>>>>> recursively read. A file can contain one or more objects. You can also 
>>>>> make
>>>>> a smarter IF and put multiple (small) files into one split...
>>>>>
>>>>> It is up to your use case what you need to implement.
>>>>>
>>>>>
>>>>> 2015-08-07 12:08 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>>>>
>>>>>> Should this be the case just reading recursively an entire directory
>>>>>> containing one object per file?
>>>>>>
>>>>>> On Fri, Aug 7, 2015 at 12:04 PM, Fabian Hueske <fhue...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> You could implement your own InputFormat based on FileInputFormat
>>>>>>> and overwrite the createInputSplits method to just create a single split
>>>>>>> per file.
>>>>>>>
>>>>>>> 2015-08-07 12:02 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>
>>>>>>> :
>>>>>>>
>>>>>>>> So what should I do?
>>>>>>>>
>>>>>>>> On Fri, Aug 7, 2015 at 12:01 PM, Fabian Hueske <fhue...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Ah, I checked the code.
>>>>>>>>>
>>>>>>>>> The BinaryInputFormat expects metadata which is written by the
>>>>>>>>> BinaryOutputFormat.
>>>>>>>>> So you cannot use the BinaryInputFormat to read a file which does
>>>>>>>>> not provide the metadata.
>>>>>>>>>
>>>>>>>>> 2015-08-07 11:53 GMT+02:00 Flavio Pompermaier <
>>>>>>>>> pomperma...@okkam.it>:
>>>>>>>>>
>>>>>>>>>> The file containing the serialized object is 7 bytes
>>>>>>>>>>
>>>>>>>>>> On Fri, Aug 7, 2015 at 11:49 AM, Fabian Hueske <fhue...@gmail.com
>>>>>>>>>> > wrote:
>>>>>>>>>>
>>>>>>>>>>> This might be an issue with the blockSize parameter of the
>>>>>>>>>>> BinaryInputFormat.
>>>>>>>>>>> How large is the file with the single object?
>>>>>>>>>>>
>>>>>>>>>>> 2015-08-07 11:37 GMT+02:00 Flavio Pompermaier <
>>>>>>>>>>> pomperma...@okkam.it>:
>>>>>>>>>>>
>>>>>>>>>>>> I also tried with
>>>>>>>>>>>>
>>>>>>>>>>>> DataSet<RowBundle> ds =
>>>>>>>>>>>> env.createInput(inputFormat).setParallelism(1);
>>>>>>>>>>>>
>>>>>>>>>>>> but I get the same error :(
>>>>>>>>>>>>
>>>>>>>>>>>> Moreover, in this example I put exactly one object per file so
>>>>>>>>>>>> it should be able to deserialize it, right?
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Aug 7, 2015 at 11:33 AM, Fabian Hueske <
>>>>>>>>>>>> fhue...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> If you create your file by just sequentially writing all
>>>>>>>>>>>>> objects to the file using Kryo, you can only read it with a 
>>>>>>>>>>>>> parallelism of
>>>>>>>>>>>>> 1.
>>>>>>>>>>>>> Writing binary files in a way that they can be read in
>>>>>>>>>>>>> parallel is a bit tricky (and not specific to Flink).
>>>>>>>>>>>>>
>>>>>>>>>>>>> 2015-08-07 11:28 GMT+02:00 Flavio Pompermaier <
>>>>>>>>>>>>> pomperma...@okkam.it>:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi to all,
>>>>>>>>>>>>>> I'm trying to read a file serialized with Kryo but I get this
>>>>>>>>>>>>>> exception (due to the fact that createInputSplits creates 8
>>>>>>>>>>>>>> input splits, of which just one is not empty..).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Caused by: java.io.IOException: Invalid argument
>>>>>>>>>>>>>> at sun.nio.ch.FileChannelImpl.position0(Native Method)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> sun.nio.ch.FileChannelImpl.position(FileChannelImpl.java:285)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> org.apache.flink.core.fs.local.LocalDataInputStream.seek(LocalDataInputStream.java:57)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> org.apache.flink.api.common.io.BinaryInputFormat.open(BinaryInputFormat.java:257)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> org.apache.flink.api.common.io.BinaryInputFormat.open(BinaryInputFormat.java:46)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:151)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>>>>>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> -----------------------------------------------
>>>>>>>>>>>>>> My program is basically the following:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> public static void main(String[] args) throws Exception {
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> ...
>>>>>>>>>>>>>> //try-with-resources used to autoclose resources
>>>>>>>>>>>>>> try (Output output = new Output(new
>>>>>>>>>>>>>> FileOutputStream("/tmp/KryoTest.ser"))) {
>>>>>>>>>>>>>> //serialise object
>>>>>>>>>>>>>> Kryo kryo=new Kryo();
>>>>>>>>>>>>>> kryo.writeClassAndObject(output, myObj);
>>>>>>>>>>>>>> } catch (FileNotFoundException ex) {
>>>>>>>>>>>>>> LOG.error(ex.getMessage(), ex);
>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> //deserialise object
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> myObj=null;
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> try (Input input = new Input( new
>>>>>>>>>>>>>> FileInputStream("/tmp/KryoTest.ser"))){
>>>>>>>>>>>>>>     Kryo kryo=new Kryo();
>>>>>>>>>>>>>>     myObj =(MyClass)kryo.readClassAndObject(input);
>>>>>>>>>>>>>> } catch (FileNotFoundException ex) {
>>>>>>>>>>>>>> LOG.error(ex.getMessage(), ex);
>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> final ExecutionEnvironment env =
>>>>>>>>>>>>>> ExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>>>>>>> env.registerTypeWithKryoSerializer(MyClass.class,
>>>>>>>>>>>>>> MyClassSerializer.class);
>>>>>>>>>>>>>> Configuration configuration = new Configuration();
>>>>>>>>>>>>>> configuration.setLong(BinaryInputFormat.BLOCK_SIZE_PARAMETER_KEY,
>>>>>>>>>>>>>> 64*1024*1024);
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> TypeInformation<MyClass> typeInfo = new
>>>>>>>>>>>>>> GenericTypeInfo<>(MyClass.class);
>>>>>>>>>>>>>> final BinaryInputFormat<MyClass> inputFormat = new
>>>>>>>>>>>>>> TypeSerializerInputFormat<>(typeInfo);
>>>>>>>>>>>>>> inputFormat.setFilePath("file:/tmp/KryoTest.ser");
>>>>>>>>>>>>>> inputFormat.configure(configuration);
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> DataSet<MyClass> ds = env.createInput(inputFormat);
>>>>>>>>>>>>>> ds.print();
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> private static final class MyClassSerializer extends
>>>>>>>>>>>>>> Serializer<MyClass> {
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> @Override
>>>>>>>>>>>>>> public void write(Kryo kryo, Output output, MyClass object) {
>>>>>>>>>>>>>> kryo.writeClassAndObject(output, object);
>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> @Override
>>>>>>>>>>>>>> public MyClass read(Kryo kryo, Input input, Class<MyClass>
>>>>>>>>>>>>>> type) {
>>>>>>>>>>>>>> return (MyClass) kryo.readClassAndObject(input);
>>>>>>>>>>>>>> }
>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Am I doing something wrong?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>> Flavio
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>


-- 

Flavio Pompermaier

*Development Department*_______________________________________________
*OKKAM**Srl **- www.okkam.it <http://www.okkam.it/>*

*Phone:* +(39) 0461 283 702
*Fax:* + (39) 0461 186 6433
*Email:* pomperma...@okkam.it
*Headquarters:* Trento (Italy), via G.B. Trener 8
*Registered office:* Trento (Italy), via Segantini 23

Confidentiality notice. This e-mail transmission may contain legally
privileged and/or confidential information. Please do not read it if you
are not the intended recipient(s). Any use, distribution, reproduction or
disclosure by any other person is strictly prohibited. If you have received
this e-mail in error, please notify the sender and destroy the original
transmission and its attachments without reading or saving it in any manner.
