Thanks a lot!

2015-08-10 12:20 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:

> Done through https://issues.apache.org/jira/browse/FLINK-2503
>
> Thanks again,
> Flavio
>
> On Mon, Aug 10, 2015 at 12:11 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Congrats that you got your InputFormat working!
>> It is true, there can be a few inconsistencies in the Formats derived
>> from FileInputFormat.
>>
>> It would be great if you could open JIRAs for these issues. Otherwise,
>> they might get lost on the mailing list.
>>
>> Thanks, Fabian
>>
>> 2015-08-10 12:02 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>
>>> Hi Fabian,
>>> thanks to your help I finally managed to successfully generate a DataSet
>>> from my folder but I think that there are some inconsistencies in the
>>> hierarchy of InputFormats.
>>> The *BinaryOutputFormat*/*TypeSerializerInputFormat* should somehow
>>> inherit the behaviour of the FileInputFormat (so respect *unsplittable*
>>> and *enumerateNestedFiles*), whereas they don't take those flags into
>>> account.
>>> Moreover in the *TypeSerializerInputFormat* there's a *"// TODO: fix
>>> this shit"* that maybe should be removed or fixed :)
>>>
>>> Also, keeping testForUnsplittable and decorateInputStream aligned is
>>> somewhat dangerous.
>>> And maybe visibility for getBlockIndexForPosition should be changed to
>>> protected?
>>>
>>> So basically, my need was to implement
>>> a TypeSerializerInputFormat<RowBundle>, but to achieve that I had to make a
>>> lot of overrides. Am I doing something wrong, or do those input formats
>>> need to be improved? This is my IF code (*remark*: from the comment
>>> *"Copied from FileInputFormat (override TypeSerializerInputFormat)"*
>>> onwards, the code is copied-and-pasted from FileInputFormat, so MY code
>>> ends there):
>>>
>>> public class RowBundleInputFormat extends
>>> TypeSerializerInputFormat<RowBundle> {
>>>
>>> private static final long serialVersionUID = 1L;
>>> private static final Logger LOG =
>>> LoggerFactory.getLogger(RowBundleInputFormat.class);
>>>
>>> /** The fraction that the last split may be larger than the others. */
>>> private static final float MAX_SPLIT_SIZE_DISCREPANCY = 1.1f;
>>> private boolean objectRead;
>>>
>>> public RowBundleInputFormat() {
>>> super(new GenericTypeInfo<>(RowBundle.class));
>>> unsplittable = true;
>>> }
>>>
>>> @Override
>>> protected FSDataInputStream decorateInputStream(FSDataInputStream
>>> inputStream, FileInputSplit fileSplit) throws Throwable {
>>> return inputStream;
>>> }
>>>
>>> @Override
>>> protected boolean testForUnsplittable(FileStatus pathFile) {
>>> return true;
>>> }
>>>
>>> @Override
>>> public void open(FileInputSplit split) throws IOException {
>>> super.open(split);
>>> objectRead = false;
>>> }
>>>
>>> @Override
>>> public boolean reachedEnd() throws IOException {
>>> return this.objectRead;
>>> }
>>>
>>> @Override
>>> public RowBundle nextRecord(RowBundle reuse) throws IOException {
>>> RowBundle yourObject = super.nextRecord(reuse);
>>> this.objectRead = true; // read only one object
>>> return yourObject;
>>> }
>>>
>>> // -------------------------------------------------------------------
>>> // Copied from FileInputFormat (override TypeSerializerInputFormat)
>>> // -------------------------------------------------------------------
>>> @Override
>>> public FileInputSplit[] createInputSplits(int minNumSplits)
>>> throws IOException {
>>> if (minNumSplits < 1) {
>>> throw new IllegalArgumentException(
>>> "Number of input splits has to be at least 1.");
>>> }
>>>
>>> // take the desired number of splits into account
>>> minNumSplits = Math.max(minNumSplits, this.numSplits);
>>>
>>> final Path path = this.filePath;
>>> final List<FileInputSplit> inputSplits = new ArrayList<FileInputSplit>(
>>> minNumSplits);
>>>
>>> // get all the files that are involved in the splits
>>> List<FileStatus> files = new ArrayList<FileStatus>();
>>> long totalLength = 0;
>>>
>>> final FileSystem fs = path.getFileSystem();
>>> final FileStatus pathFile = fs.getFileStatus(path);
>>>
>>> if (pathFile.isDir()) {
>>> // input is directory. list all contained files
>>> final FileStatus[] dir = fs.listStatus(path);
>>> for (int i = 0; i < dir.length; i++) {
>>> if (dir[i].isDir()) {
>>> if (enumerateNestedFiles) {
>>> if (acceptFile(dir[i])) {
>>> totalLength += addNestedFiles(dir[i].getPath(),
>>> files, 0, true);
>>> } else {
>>> if (LOG.isDebugEnabled()) {
>>> LOG.debug("Directory "
>>> + dir[i].getPath().toString()
>>> + " did not pass the file-filter and is excluded.");
>>> }
>>> }
>>> }
>>> } else {
>>> if (acceptFile(dir[i])) {
>>> files.add(dir[i]);
>>> totalLength += dir[i].getLen();
>>> // as soon as there is one deflate file in a directory,
>>> // we can not split it
>>> testForUnsplittable(dir[i]);
>>> } else {
>>> if (LOG.isDebugEnabled()) {
>>> LOG.debug("File "
>>> + dir[i].getPath().toString()
>>> + " did not pass the file-filter and is excluded.");
>>> }
>>> }
>>> }
>>> }
>>> } else {
>>> testForUnsplittable(pathFile);
>>>
>>> files.add(pathFile);
>>> totalLength += pathFile.getLen();
>>> }
>>> // returns if unsplittable
>>> if (unsplittable) {
>>> int splitNum = 0;
>>> for (final FileStatus file : files) {
>>> final BlockLocation[] blocks = fs.getFileBlockLocations(file,
>>> 0, file.getLen());
>>> Set<String> hosts = new HashSet<String>();
>>> for (BlockLocation block : blocks) {
>>> hosts.addAll(Arrays.asList(block.getHosts()));
>>> }
>>> long len = file.getLen();
>>> if (testForUnsplittable(file)) {
>>> len = READ_WHOLE_SPLIT_FLAG;
>>> }
>>> FileInputSplit fis = new FileInputSplit(splitNum++,
>>> file.getPath(), 0, len, hosts.toArray(new String[hosts
>>> .size()]));
>>> inputSplits.add(fis);
>>> }
>>> return inputSplits.toArray(new FileInputSplit[inputSplits.size()]);
>>> }
>>>
>>> final long maxSplitSize = (minNumSplits < 1) ? Long.MAX_VALUE
>>> : (totalLength / minNumSplits + (totalLength % minNumSplits == 0 ? 0
>>> : 1));
>>>
>>> // now that we have the files, generate the splits
>>> int splitNum = 0;
>>> for (final FileStatus file : files) {
>>>
>>> final long len = file.getLen();
>>> final long blockSize = file.getBlockSize();
>>>
>>> final long minSplitSize;
>>> if (this.minSplitSize <= blockSize) {
>>> minSplitSize = this.minSplitSize;
>>> } else {
>>> if (LOG.isWarnEnabled()) {
>>> LOG.warn("Minimal split size of " + this.minSplitSize
>>> + " is larger than the block size of " + blockSize
>>> + ". Decreasing minimal split size to block size.");
>>> }
>>> minSplitSize = blockSize;
>>> }
>>>
>>> final long splitSize = Math.max(minSplitSize,
>>> Math.min(maxSplitSize, blockSize));
>>> final long halfSplit = splitSize >>> 1;
>>>
>>> final long maxBytesForLastSplit = (long) (splitSize *
>>> MAX_SPLIT_SIZE_DISCREPANCY);
>>>
>>> if (len > 0) {
>>>
>>> // get the block locations and make sure they are in order with
>>> // respect to their offset
>>> final BlockLocation[] blocks = fs.getFileBlockLocations(file,
>>> 0, len);
>>> Arrays.sort(blocks);
>>>
>>> long bytesUnassigned = len;
>>> long position = 0;
>>>
>>> int blockIndex = 0;
>>>
>>> while (bytesUnassigned > maxBytesForLastSplit) {
>>> // get the block containing the majority of the data
>>> blockIndex = getBlockIndexForPosition(blocks, position,
>>> halfSplit, blockIndex);
>>> // create a new split
>>> FileInputSplit fis = new FileInputSplit(splitNum++,
>>> file.getPath(), position, splitSize,
>>> blocks[blockIndex].getHosts());
>>> inputSplits.add(fis);
>>>
>>> // adjust the positions
>>> position += splitSize;
>>> bytesUnassigned -= splitSize;
>>> }
>>>
>>> // assign the last split
>>> if (bytesUnassigned > 0) {
>>> blockIndex = getBlockIndexForPosition(blocks, position,
>>> halfSplit, blockIndex);
>>> final FileInputSplit fis = new FileInputSplit(splitNum++,
>>> file.getPath(), position, bytesUnassigned,
>>> blocks[blockIndex].getHosts());
>>> inputSplits.add(fis);
>>> }
>>> } else {
>>> // special case with a file of zero bytes size
>>> final BlockLocation[] blocks = fs.getFileBlockLocations(file,
>>> 0, 0);
>>> String[] hosts;
>>> if (blocks.length > 0) {
>>> hosts = blocks[0].getHosts();
>>> } else {
>>> hosts = new String[0];
>>> }
>>> final FileInputSplit fis = new FileInputSplit(splitNum++,
>>> file.getPath(), 0, 0, hosts);
>>> inputSplits.add(fis);
>>> }
>>> }
>>>
>>> return inputSplits.toArray(new FileInputSplit[inputSplits.size()]);
>>> }
>>>
>>> /**
>>> * Recursively traverse the input directory structure and enumerate all
>>> * accepted nested files.
>>> *
>>> * @return the total length of accepted files.
>>> */
>>> private long addNestedFiles(Path path, List<FileStatus> files, long
>>> length,
>>> boolean logExcludedFiles) throws IOException {
>>> final FileSystem fs = path.getFileSystem();
>>>
>>> for (FileStatus dir : fs.listStatus(path)) {
>>> if (dir.isDir()) {
>>> if (acceptFile(dir)) {
>>> addNestedFiles(dir.getPath(), files, length,
>>> logExcludedFiles);
>>> } else {
>>> if (logExcludedFiles && LOG.isDebugEnabled()) {
>>> LOG.debug("Directory "
>>> + dir.getPath().toString()
>>> + " did not pass the file-filter and is excluded.");
>>> }
>>> }
>>> } else {
>>> if (acceptFile(dir)) {
>>> files.add(dir);
>>> length += dir.getLen();
>>> testForUnsplittable(dir);
>>> } else {
>>> if (logExcludedFiles && LOG.isDebugEnabled()) {
>>> LOG.debug("Directory "
>>> + dir.getPath().toString()
>>> + " did not pass the file-filter and is excluded.");
>>> }
>>> }
>>> }
>>> }
>>> return length;
>>> }
>>>
>>> /**
>>> * Retrieves the index of the <tt>BlockLocation</tt> that contains the
>>> part
>>> * of the file described by the given offset.
>>> *
>>> * @param blocks
>>> *            The different blocks of the file. Must be ordered by their
>>> *            offset.
>>> * @param offset
>>> *            The offset of the position in the file.
>>> * @param startIndex
>>> *            The earliest index to look at.
>>> * @return The index of the block containing the given position.
>>> */
>>> private int getBlockIndexForPosition(BlockLocation[] blocks, long offset,
>>> long halfSplitSize, int startIndex) {
>>> // go over all indexes after the startIndex
>>> for (int i = startIndex; i < blocks.length; i++) {
>>> long blockStart = blocks[i].getOffset();
>>> long blockEnd = blockStart + blocks[i].getLength();
>>>
>>> if (offset >= blockStart && offset < blockEnd) {
>>> // got the block where the split starts
>>> // check if the next block contains more than this one does
>>> if (i < blocks.length - 1 && blockEnd - offset < halfSplitSize) {
>>> return i + 1;
>>> } else {
>>> return i;
>>> }
>>> }
>>> }
>>> throw new IllegalArgumentException("The given offset is not contained in
>>> the any block.");
>>> }
>>>
>>> }
>>>
>>>
>>>
>>>
>>> On Sun, Aug 9, 2015 at 2:00 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>
>>>> You need to do something like this:
>>>>
>>>> public class YourInputFormat extends FileInputFormat<Object> {
>>>>
>>>>    private boolean objectRead;
>>>>
>>>>    @Override
>>>>    public FileInputSplit[] createInputSplits(int minNumSplits) {
>>>>       // Create one FileInputSplit for each file you want to read.
>>>>       // Check FileInputFormat for how to recursively enumerate files.
>>>>       // Input splits must start at 0 and have a length equal to length
>>>> of the file to read.
>>>>       return null;
>>>>    }
>>>>
>>>>    @Override
>>>>    public void open(FileInputSplit split) throws IOException {
>>>>       super.open(split);
>>>>       objectRead = false;
>>>>    }
>>>>
>>>>    @Override
>>>>    public boolean reachedEnd() throws IOException {
>>>>       return this.objectRead;
>>>>    }
>>>>
>>>>    @Override
>>>>    public Object nextRecord(Object reuse) throws IOException {
>>>>       Object yourObject = this.stream.read(); // use Kryo here to read
>>>> from this.stream()
>>>>       this.objectRead = true; // read only one object
>>>>       return yourObject;
>>>>    }
>>>> }
>>>>
>>>> 2015-08-07 14:40 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>>>
>>>>> Sorry Fabian but I don't understand what I should do :(
>>>>> Could you provide me a simple snippet of code to achieve this?
>>>>>
>>>>> On Fri, Aug 7, 2015 at 1:30 PM, Fabian Hueske <fhue...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Enumeration of nested files is a feature of the FileInputFormat.
>>>>>> If you implement your own IF based on FileInputFormat as I suggested
>>>>>> before, you can use that feature.
>>>>>>
>>>>>> 2015-08-07 12:29 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>>>>>
>>>>>>> I have a directory containing a list of files, each one containing a
>>>>>>> kryo-serialized object.
>>>>>>> With JSON-serialized objects I don't have that problem (but there I
>>>>>>> use env.readTextFile(path).withParameters(parameters),
>>>>>>> where parameters has the ENUMERATE_NESTED_FILES_FLAG set to true).
>>>>>>>
>>>>>>> On Fri, Aug 7, 2015 at 12:14 PM, Fabian Hueske <fhue...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> I don't know your use case.
>>>>>>>> The InputFormat interface is very flexible. Directories can be
>>>>>>>> recursively read. A file can contain one or more objects. You can also
>>>>>>>> make a smarter IF and put multiple (small) files into one split...
>>>>>>>>
>>>>>>>> It is up to your use case what you need to implement.
>>>>>>>>
>>>>>>>>
>>>>>>>> 2015-08-07 12:08 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it
>>>>>>>> >:
>>>>>>>>
>>>>>>>>> Should this be the case just reading recursively an entire
>>>>>>>>> directory containing one object per file?
>>>>>>>>>
>>>>>>>>> On Fri, Aug 7, 2015 at 12:04 PM, Fabian Hueske <fhue...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> You could implement your own InputFormat based on FileInputFormat
>>>>>>>>>> and override the createInputSplits method to just create a single
>>>>>>>>>> split per file.
>>>>>>>>>>
>>>>>>>>>> 2015-08-07 12:02 GMT+02:00 Flavio Pompermaier <
>>>>>>>>>> pomperma...@okkam.it>:
>>>>>>>>>>
>>>>>>>>>>> So what should I do?
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Aug 7, 2015 at 12:01 PM, Fabian Hueske <
>>>>>>>>>>> fhue...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Ah, I checked the code.
>>>>>>>>>>>>
>>>>>>>>>>>> The BinaryInputFormat expects metadata which is written by the
>>>>>>>>>>>> BinaryOutputFormat.
>>>>>>>>>>>> So you cannot use the BinaryInputFormat to read a file which
>>>>>>>>>>>> does not provide the metadata.
>>>>>>>>>>>>
>>>>>>>>>>>> 2015-08-07 11:53 GMT+02:00 Flavio Pompermaier <
>>>>>>>>>>>> pomperma...@okkam.it>:
>>>>>>>>>>>>
>>>>>>>>>>>>> The file containing the serialized object is 7 bytes
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Aug 7, 2015 at 11:49 AM, Fabian Hueske <
>>>>>>>>>>>>> fhue...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> This might be an issue with the blockSize parameter of the
>>>>>>>>>>>>>> BinaryInputFormat.
>>>>>>>>>>>>>> How large is the file with the single object?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 2015-08-07 11:37 GMT+02:00 Flavio Pompermaier <
>>>>>>>>>>>>>> pomperma...@okkam.it>:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I also tried with
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> DataSet<RowBundle> ds =
>>>>>>>>>>>>>>> env.createInput(inputFormat).setParallelism(1);
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> but I get the same error :(
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Moreover, in this example I put exactly one object per file
>>>>>>>>>>>>>>> so it should be able to deserialize it, right?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Fri, Aug 7, 2015 at 11:33 AM, Fabian Hueske <
>>>>>>>>>>>>>>> fhue...@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> If you create your file by just sequentially writing all
>>>>>>>>>>>>>>>> objects to the file using Kryo, you can only read it with a
>>>>>>>>>>>>>>>> parallelism of 1.
>>>>>>>>>>>>>>>> Writing binary files in a way that they can be read in
>>>>>>>>>>>>>>>> parallel is a bit tricky (and not specific to Flink).
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 2015-08-07 11:28 GMT+02:00 Flavio Pompermaier <
>>>>>>>>>>>>>>>> pomperma...@okkam.it>:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hi to all,
>>>>>>>>>>>>>>>>> I'm trying to read a file serialized with Kryo but I get
>>>>>>>>>>>>>>>>> this exception (due to the fact that createInputSplits
>>>>>>>>>>>>>>>>> creates 8 input splits, of which just one is not empty).
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Caused by: java.io.IOException: Invalid argument
>>>>>>>>>>>>>>>>> at sun.nio.ch.FileChannelImpl.position0(Native Method)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> sun.nio.ch.FileChannelImpl.position(FileChannelImpl.java:285)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> org.apache.flink.core.fs.local.LocalDataInputStream.seek(LocalDataInputStream.java:57)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> org.apache.flink.api.common.io.BinaryInputFormat.open(BinaryInputFormat.java:257)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> org.apache.flink.api.common.io.BinaryInputFormat.open(BinaryInputFormat.java:46)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:151)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>>>>>>>>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> -----------------------------------------------
>>>>>>>>>>>>>>>>> My program is basically the following:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> public static void main(String[] args) throws Exception {
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> ...
>>>>>>>>>>>>>>>>> //try-with-resources used to autoclose resources
>>>>>>>>>>>>>>>>> try (Output output = new Output(new
>>>>>>>>>>>>>>>>> FileOutputStream("/tmp/KryoTest.ser"))) {
>>>>>>>>>>>>>>>>> //serialise object
>>>>>>>>>>>>>>>>> Kryo kryo=new Kryo();
>>>>>>>>>>>>>>>>> kryo.writeClassAndObject(output, myObj);
>>>>>>>>>>>>>>>>> } catch (FileNotFoundException ex) {
>>>>>>>>>>>>>>>>> LOG.error(ex.getMessage(), ex);
>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> //deserialise object
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> myObj=null;
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> try (Input input = new Input( new
>>>>>>>>>>>>>>>>> FileInputStream("/tmp/KryoTest.ser"))){
>>>>>>>>>>>>>>>>>     Kryo kryo=new Kryo();
>>>>>>>>>>>>>>>>>     myObj =(MyClass)kryo.readClassAndObject(input);
>>>>>>>>>>>>>>>>> } catch (FileNotFoundException ex) {
>>>>>>>>>>>>>>>>> LOG.error(ex.getMessage(), ex);
>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> final ExecutionEnvironment env =
>>>>>>>>>>>>>>>>> ExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>>>>>>>>>> env.registerTypeWithKryoSerializer(MyClass.class,
>>>>>>>>>>>>>>>>> MyClassSerializer.class);
>>>>>>>>>>>>>>>>> Configuration configuration = new Configuration();
>>>>>>>>>>>>>>>>> configuration.setLong(BinaryInputFormat.BLOCK_SIZE_PARAMETER_KEY,
>>>>>>>>>>>>>>>>> 64*1024*1024);
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> TypeInformation<MyClass> typeInfo = new
>>>>>>>>>>>>>>>>> GenericTypeInfo<>(MyClass.class);
>>>>>>>>>>>>>>>>> final BinaryInputFormat<MyClass> inputFormat = new
>>>>>>>>>>>>>>>>> TypeSerializerInputFormat<>(typeInfo);
>>>>>>>>>>>>>>>>> inputFormat.setFilePath("file:/tmp/KryoTest.ser");
>>>>>>>>>>>>>>>>> inputFormat.configure(configuration);
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> DataSet<MyClass> ds = env.createInput(inputFormat);
>>>>>>>>>>>>>>>>> ds.print();
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> private static final class MyClassSerializer extends
>>>>>>>>>>>>>>>>> Serializer<MyClass> {
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> @Override
>>>>>>>>>>>>>>>>> public void write(Kryo kryo, Output output, MyClass
>>>>>>>>>>>>>>>>> object) {
>>>>>>>>>>>>>>>>> kryo.writeClassAndObject(output, object);
>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> @Override
>>>>>>>>>>>>>>>>> public MyClass read(Kryo kryo, Input input, Class<MyClass>
>>>>>>>>>>>>>>>>> type) {
>>>>>>>>>>>>>>>>> return (MyClass) kryo.readClassAndObject(input);
>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Am I doing something wrong?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>> Flavio
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>>
>>>

Reply via email to