Done through https://issues.apache.org/jira/browse/FLINK-2503

Thanks again,
Flavio

On Mon, Aug 10, 2015 at 12:11 PM, Fabian Hueske <fhue...@gmail.com> wrote:

> Congrats that you got your InputFormat working!
> It is true, there can be a few inconsistencies in the Formats derived from
> FileInputFormat.
>
> It would be great if you could open JIRAs for these issues. Otherwise, they
> might get lost on the mailing list.
>
> Thanks, Fabian
>
> 2015-08-10 12:02 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>
>> Hi Fabian,
>> thanks to your help I finally managed to successfully generate a DataSet
>> from my folder but I think that there are some inconsistencies in the
>> hierarchy of InputFormats.
>> The *BinaryOutputFormat*/*TypeSerializerInputFormat* should somehow
>> inherit the behaviour of the FileInputFormat (so respect *unsplittable*
>> and *enumerateNestedFiles*) while they don't take those flags into
>> account.
>> Moreover in the *TypeSerializerInputFormat* there's a *"// TODO: fix
>> this shit"* that maybe should be removed or fixed :)
>>
>> Also, keeping testForUnsplittable and decorateInputStream aligned is
>> somewhat dangerous.
>> And maybe visibility for getBlockIndexForPosition should be changed to
>> protected?
>>
>> So basically, my need was to implement
>> a TypeSerializerInputFormat<RowBundle>, but to achieve that I had to write a
>> lot of overrides. Am I doing something wrong, or do those InputFormats
>> need to be improved? This is my IF code (*remark*: from the comment *"Copied
>> from FileInputFormat (override TypeSerializerInputFormat)"* onwards, the code
>> is copied-and-pasted from FileInputFormat, so MY code ends there):
>>
>> public class RowBundleInputFormat extends
>> TypeSerializerInputFormat<RowBundle> {
>>
>> private static final long serialVersionUID = 1L;
>> private static final Logger LOG =
>> LoggerFactory.getLogger(RowBundleInputFormat.class);
>>
>> /** The fraction that the last split may be larger than the others. */
>> private static final float MAX_SPLIT_SIZE_DISCREPANCY = 1.1f;
>> private boolean objectRead;
>>
>> public RowBundleInputFormat() {
>> super(new GenericTypeInfo<>(RowBundle.class));
>> unsplittable = true;
>> }
>>
>> @Override
>> protected FSDataInputStream decorateInputStream(FSDataInputStream
>> inputStream, FileInputSplit fileSplit) throws Throwable {
>> return inputStream;
>> }
>>
>> @Override
>> protected boolean testForUnsplittable(FileStatus pathFile) {
>> return true;
>> }
>>
>> @Override
>> public void open(FileInputSplit split) throws IOException {
>> super.open(split);
>> objectRead = false;
>> }
>>
>> @Override
>> public boolean reachedEnd() throws IOException {
>> return this.objectRead;
>> }
>>
>> @Override
>> public RowBundle nextRecord(RowBundle reuse) throws IOException {
>> RowBundle yourObject = super.nextRecord(reuse);
>> this.objectRead = true; // read only one object
>> return yourObject;
>> }
>>
>> // -------------------------------------------------------------------
>> // Copied from FileInputFormat (override TypeSerializerInputFormat)
>> // -------------------------------------------------------------------
>> @Override
>> public FileInputSplit[] createInputSplits(int minNumSplits)
>> throws IOException {
>> if (minNumSplits < 1) {
>> throw new IllegalArgumentException(
>> "Number of input splits has to be at least 1.");
>> }
>>
>> // take the desired number of splits into account
>> minNumSplits = Math.max(minNumSplits, this.numSplits);
>>
>> final Path path = this.filePath;
>> final List<FileInputSplit> inputSplits = new ArrayList<FileInputSplit>(
>> minNumSplits);
>>
>> // get all the files that are involved in the splits
>> List<FileStatus> files = new ArrayList<FileStatus>();
>> long totalLength = 0;
>>
>> final FileSystem fs = path.getFileSystem();
>> final FileStatus pathFile = fs.getFileStatus(path);
>>
>> if (pathFile.isDir()) {
>> // input is directory. list all contained files
>> final FileStatus[] dir = fs.listStatus(path);
>> for (int i = 0; i < dir.length; i++) {
>> if (dir[i].isDir()) {
>> if (enumerateNestedFiles) {
>> if (acceptFile(dir[i])) {
>> totalLength += addNestedFiles(dir[i].getPath(),
>> files, 0, true);
>> } else {
>> if (LOG.isDebugEnabled()) {
>> LOG.debug("Directory "
>> + dir[i].getPath().toString()
>> + " did not pass the file-filter and is excluded.");
>> }
>> }
>> }
>> } else {
>> if (acceptFile(dir[i])) {
>> files.add(dir[i]);
>> totalLength += dir[i].getLen();
>> // as soon as there is one deflate file in a directory,
>> // we can not split it
>> testForUnsplittable(dir[i]);
>> } else {
>> if (LOG.isDebugEnabled()) {
>> LOG.debug("File "
>> + dir[i].getPath().toString()
>> + " did not pass the file-filter and is excluded.");
>> }
>> }
>> }
>> }
>> } else {
>> testForUnsplittable(pathFile);
>>
>> files.add(pathFile);
>> totalLength += pathFile.getLen();
>> }
>> // returns if unsplittable
>> if (unsplittable) {
>> int splitNum = 0;
>> for (final FileStatus file : files) {
>> final BlockLocation[] blocks = fs.getFileBlockLocations(file,
>> 0, file.getLen());
>> Set<String> hosts = new HashSet<String>();
>> for (BlockLocation block : blocks) {
>> hosts.addAll(Arrays.asList(block.getHosts()));
>> }
>> long len = file.getLen();
>> if (testForUnsplittable(file)) {
>> len = READ_WHOLE_SPLIT_FLAG;
>> }
>> FileInputSplit fis = new FileInputSplit(splitNum++,
>> file.getPath(), 0, len, hosts.toArray(new String[hosts
>> .size()]));
>> inputSplits.add(fis);
>> }
>> return inputSplits.toArray(new FileInputSplit[inputSplits.size()]);
>> }
>>
>> final long maxSplitSize = (minNumSplits < 1) ? Long.MAX_VALUE
>> : (totalLength / minNumSplits + (totalLength % minNumSplits == 0 ? 0
>> : 1));
>>
>> // now that we have the files, generate the splits
>> int splitNum = 0;
>> for (final FileStatus file : files) {
>>
>> final long len = file.getLen();
>> final long blockSize = file.getBlockSize();
>>
>> final long minSplitSize;
>> if (this.minSplitSize <= blockSize) {
>> minSplitSize = this.minSplitSize;
>> } else {
>> if (LOG.isWarnEnabled()) {
>> LOG.warn("Minimal split size of " + this.minSplitSize
>> + " is larger than the block size of " + blockSize
>> + ". Decreasing minimal split size to block size.");
>> }
>> minSplitSize = blockSize;
>> }
>>
>> final long splitSize = Math.max(minSplitSize,
>> Math.min(maxSplitSize, blockSize));
>> final long halfSplit = splitSize >>> 1;
>>
>> final long maxBytesForLastSplit = (long) (splitSize *
>> MAX_SPLIT_SIZE_DISCREPANCY);
>>
>> if (len > 0) {
>>
>> // get the block locations and make sure they are in order with
>> // respect to their offset
>> final BlockLocation[] blocks = fs.getFileBlockLocations(file,
>> 0, len);
>> Arrays.sort(blocks);
>>
>> long bytesUnassigned = len;
>> long position = 0;
>>
>> int blockIndex = 0;
>>
>> while (bytesUnassigned > maxBytesForLastSplit) {
>> // get the block containing the majority of the data
>> blockIndex = getBlockIndexForPosition(blocks, position,
>> halfSplit, blockIndex);
>> // create a new split
>> FileInputSplit fis = new FileInputSplit(splitNum++,
>> file.getPath(), position, splitSize,
>> blocks[blockIndex].getHosts());
>> inputSplits.add(fis);
>>
>> // adjust the positions
>> position += splitSize;
>> bytesUnassigned -= splitSize;
>> }
>>
>> // assign the last split
>> if (bytesUnassigned > 0) {
>> blockIndex = getBlockIndexForPosition(blocks, position,
>> halfSplit, blockIndex);
>> final FileInputSplit fis = new FileInputSplit(splitNum++,
>> file.getPath(), position, bytesUnassigned,
>> blocks[blockIndex].getHosts());
>> inputSplits.add(fis);
>> }
>> } else {
>> // special case with a file of zero bytes size
>> final BlockLocation[] blocks = fs.getFileBlockLocations(file,
>> 0, 0);
>> String[] hosts;
>> if (blocks.length > 0) {
>> hosts = blocks[0].getHosts();
>> } else {
>> hosts = new String[0];
>> }
>> final FileInputSplit fis = new FileInputSplit(splitNum++,
>> file.getPath(), 0, 0, hosts);
>> inputSplits.add(fis);
>> }
>> }
>>
>> return inputSplits.toArray(new FileInputSplit[inputSplits.size()]);
>> }
>>
>> /**
>> * Recursively traverse the input directory structure and enumerate all
>> * accepted nested files.
>> *
>> * @return the total length of accepted files.
>> */
>> private long addNestedFiles(Path path, List<FileStatus> files, long
>> length,
>> boolean logExcludedFiles) throws IOException {
>> final FileSystem fs = path.getFileSystem();
>>
>> for (FileStatus dir : fs.listStatus(path)) {
>> if (dir.isDir()) {
>> if (acceptFile(dir)) {
>> addNestedFiles(dir.getPath(), files, length,
>> logExcludedFiles);
>> } else {
>> if (logExcludedFiles && LOG.isDebugEnabled()) {
>> LOG.debug("Directory "
>> + dir.getPath().toString()
>> + " did not pass the file-filter and is excluded.");
>> }
>> }
>> } else {
>> if (acceptFile(dir)) {
>> files.add(dir);
>> length += dir.getLen();
>> testForUnsplittable(dir);
>> } else {
>> if (logExcludedFiles && LOG.isDebugEnabled()) {
>> LOG.debug("Directory "
>> + dir.getPath().toString()
>> + " did not pass the file-filter and is excluded.");
>> }
>> }
>> }
>> }
>> return length;
>> }
>>
>> /**
>> * Retrieves the index of the <tt>BlockLocation</tt> that contains the part
>> * of the file described by the given offset.
>> *
>> * @param blocks
>> *            The different blocks of the file. Must be ordered by their
>> *            offset.
>> * @param offset
>> *            The offset of the position in the file.
>> * @param startIndex
>> *            The earliest index to look at.
>> * @return The index of the block containing the given position.
>> */
>> private int getBlockIndexForPosition(BlockLocation[] blocks, long offset,
>> long halfSplitSize, int startIndex) {
>> // go over all indexes after the startIndex
>> for (int i = startIndex; i < blocks.length; i++) {
>> long blockStart = blocks[i].getOffset();
>> long blockEnd = blockStart + blocks[i].getLength();
>>
>> if (offset >= blockStart && offset < blockEnd) {
>> // got the block where the split starts
>> // check if the next block contains more than this one does
>> if (i < blocks.length - 1 && blockEnd - offset < halfSplitSize) {
>> return i + 1;
>> } else {
>> return i;
>> }
>> }
>> }
>> throw new IllegalArgumentException("The given offset is not contained in
>> the any block.");
>> }
>>
>> }
>>
>>
>>
>>
>> On Sun, Aug 9, 2015 at 2:00 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> You need to do something like this:
>>>
>>> public class YourInputFormat extends FileInputFormat<Object> {
>>>
>>>    private boolean objectRead;
>>>
>>>    @Override
>>>    public FileInputSplit[] createInputSplits(int minNumSplits) {
>>>       // Create one FileInputSplit for each file you want to read.
>>>       // Check FileInputFormat for how to recursively enumerate files.
>>>       // Input splits must start at 0 and have a length equal to length
>>> of the file to read.
>>>       return null;
>>>    }
>>>
>>>    @Override
>>>    public void open(FileInputSplit split) throws IOException {
>>>       super.open(split);
>>>       objectRead = false;
>>>    }
>>>
>>>    @Override
>>>    public boolean reachedEnd() throws IOException {
>>>       return this.objectRead;
>>>    }
>>>
>>>    @Override
>>>    public Object nextRecord(Object reuse) throws IOException {
>>>       Object yourObject = this.stream.read(); // use Kryo here to read
>>> from this.stream()
>>>       this.objectRead = true; // read only one object
>>>       return yourObject;
>>>    }
>>> }
>>>
>>> 2015-08-07 14:40 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>>
>>>> Sorry Fabian but I don't understand what I should do :(
>>>> Could you provide me a simple snippet of code to achieve this?
>>>>
>>>> On Fri, Aug 7, 2015 at 1:30 PM, Fabian Hueske <fhue...@gmail.com>
>>>> wrote:
>>>>
>>>>> Enumeration of nested files is a feature of the FileInputFormat.
>>>>> If you implement your own IF based on FileInputFormat as I suggested
>>>>> before, you can use that feature.
>>>>>
>>>>> 2015-08-07 12:29 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>>>>
>>>>>> I have a directory containing a list of files, each one containing a
>>>>>> kryo-serialized object.
>>>>>> With json serialized objects I don't have that problem (but there I
>>>>>> use  env.readTextFile(path.withParameters(parameters)
>>>>>> where parameters has the ENUMERATE_NESTED_FILES_FLAG set to true).
>>>>>>
>>>>>> On Fri, Aug 7, 2015 at 12:14 PM, Fabian Hueske <fhue...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> I don't know your use case.
>>>>>>> The InputFormat interface is very flexible. Directories can be
>>>>>>> recursively read. A file can contain one or more objects. You can also 
>>>>>>> make
>>>>>>> a smarter IF and put multiple (small) files into one split...
>>>>>>>
>>>>>>> It is up to your use case what you need to implement.
>>>>>>>
>>>>>>>
>>>>>>> 2015-08-07 12:08 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>
>>>>>>> :
>>>>>>>
>>>>>>>> Should this be the case just reading recursively an entire
>>>>>>>> directory containing one object per file?
>>>>>>>>
>>>>>>>> On Fri, Aug 7, 2015 at 12:04 PM, Fabian Hueske <fhue...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> You could implement your own InputFormat based on FileInputFormat
>>>>>>>>> and overwrite the createInputSplits method to just create a single 
>>>>>>>>> split
>>>>>>>>> per file.
>>>>>>>>>
>>>>>>>>> 2015-08-07 12:02 GMT+02:00 Flavio Pompermaier <
>>>>>>>>> pomperma...@okkam.it>:
>>>>>>>>>
>>>>>>>>>> So what should I do?
>>>>>>>>>>
>>>>>>>>>> On Fri, Aug 7, 2015 at 12:01 PM, Fabian Hueske <fhue...@gmail.com
>>>>>>>>>> > wrote:
>>>>>>>>>>
>>>>>>>>>>> Ah, I checked the code.
>>>>>>>>>>>
>>>>>>>>>>> The BinaryInputFormat expects metadata which is written by the
>>>>>>>>>>> BinaryOutputFormat.
>>>>>>>>>>> So you cannot use the BinaryInputFormat to read a file which
>>>>>>>>>>> does not provide the metadata.
>>>>>>>>>>>
>>>>>>>>>>> 2015-08-07 11:53 GMT+02:00 Flavio Pompermaier <
>>>>>>>>>>> pomperma...@okkam.it>:
>>>>>>>>>>>
>>>>>>>>>>>> The file containing the serialized object is 7 bytes
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Aug 7, 2015 at 11:49 AM, Fabian Hueske <
>>>>>>>>>>>> fhue...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> This might be an issue with the blockSize parameter of the
>>>>>>>>>>>>> BinaryInputFormat.
>>>>>>>>>>>>> How large is the file with the single object?
>>>>>>>>>>>>>
>>>>>>>>>>>>> 2015-08-07 11:37 GMT+02:00 Flavio Pompermaier <
>>>>>>>>>>>>> pomperma...@okkam.it>:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> I also tried with
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> DataSet<RowBundle> ds =
>>>>>>>>>>>>>> env.createInput(inputFormat).setParallelism(1);
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> but I get the same error :(
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Moreover, in this example I put exactly one object per file
>>>>>>>>>>>>>> so it should be able to deserialize it, right?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Fri, Aug 7, 2015 at 11:33 AM, Fabian Hueske <
>>>>>>>>>>>>>> fhue...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> If you create your file by just sequentially writing all
>>>>>>>>>>>>>>> objects to the file using Kryo, you can only read it with a 
>>>>>>>>>>>>>>> parallelism of
>>>>>>>>>>>>>>> 1.
>>>>>>>>>>>>>>> Writing binary files in a way that they can be read in
>>>>>>>>>>>>>>> parallel is a bit tricky (and not specific to Flink).
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 2015-08-07 11:28 GMT+02:00 Flavio Pompermaier <
>>>>>>>>>>>>>>> pomperma...@okkam.it>:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi to all,
>>>>>>>>>>>>>>>> I'm trying to read a file serialized with kryo but I get
>>>>>>>>>>>>>>>> this exception (due to the fact that the createInputSplits 
>>>>>>>>>>>>>>>> creates 8
>>>>>>>>>>>>>>>> inputsplits, where just one is not empty..).
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Caused by: java.io.IOException: Invalid argument
>>>>>>>>>>>>>>>> at sun.nio.ch.FileChannelImpl.position0(Native Method)
>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>> sun.nio.ch.FileChannelImpl.position(FileChannelImpl.java:285)
>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>> org.apache.flink.core.fs.local.LocalDataInputStream.seek(LocalDataInputStream.java:57)
>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>> org.apache.flink.api.common.io.BinaryInputFormat.open(BinaryInputFormat.java:257)
>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>> org.apache.flink.api.common.io.BinaryInputFormat.open(BinaryInputFormat.java:46)
>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:151)
>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>> org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>>>>>>>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> -----------------------------------------------
>>>>>>>>>>>>>>>> My program is basically the following:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> public static void main(String[] args) throws Exception {
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> ...
>>>>>>>>>>>>>>>> //try-with-resources used to autoclose resources
>>>>>>>>>>>>>>>> try (Output output = new Output(new
>>>>>>>>>>>>>>>> FileOutputStream("/tmp/KryoTest.ser"))) {
>>>>>>>>>>>>>>>> //serialise object
>>>>>>>>>>>>>>>> Kryo kryo=new Kryo();
>>>>>>>>>>>>>>>> kryo.writeClassAndObject(output, myObj);
>>>>>>>>>>>>>>>> } catch (FileNotFoundException ex) {
>>>>>>>>>>>>>>>> LOG.error(ex.getMessage(), ex);
>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> //deserialise object
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> myObj=null;
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> try (Input input = new Input( new
>>>>>>>>>>>>>>>> FileInputStream("/tmp/KryoTest.ser"))){
>>>>>>>>>>>>>>>>     Kryo kryo=new Kryo();
>>>>>>>>>>>>>>>>     myObj =(MyClass)kryo.readClassAndObject(input);
>>>>>>>>>>>>>>>> } catch (FileNotFoundException ex) {
>>>>>>>>>>>>>>>> LOG.error(ex.getMessage(), ex);
>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> final ExecutionEnvironment env =
>>>>>>>>>>>>>>>> ExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>>>>>>>>> env.registerTypeWithKryoSerializer(MyClass.class,
>>>>>>>>>>>>>>>> MyClassSerializer.class);
>>>>>>>>>>>>>>>> Configuration configuration = new Configuration();
>>>>>>>>>>>>>>>> configuration.setLong(BinaryInputFormat.BLOCK_SIZE_PARAMETER_KEY,
>>>>>>>>>>>>>>>> 64*1024*1024);
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> TypeInformation<MyClass> typeInfo = new
>>>>>>>>>>>>>>>> GenericTypeInfo<>(MyClass.class);
>>>>>>>>>>>>>>>> final BinaryInputFormat<MyClass> inputFormat = new
>>>>>>>>>>>>>>>> TypeSerializerInputFormat<>(typeInfo);
>>>>>>>>>>>>>>>> inputFormat.setFilePath("file:/tmp/KryoTest.ser");
>>>>>>>>>>>>>>>> inputFormat.configure(configuration);
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> DataSet<MyClass> ds = env.createInput(inputFormat);
>>>>>>>>>>>>>>>> ds.print();
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> private static final class MyClassSerializer extends
>>>>>>>>>>>>>>>> Serializer<MyClass> {
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> @Override
>>>>>>>>>>>>>>>> public void write(Kryo kryo, Output output, MyClass object)
>>>>>>>>>>>>>>>> {
>>>>>>>>>>>>>>>> kryo.writeClassAndObject(output, object);
>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> @Override
>>>>>>>>>>>>>>>> public MyClass read(Kryo kryo, Input input, Class<MyClass>
>>>>>>>>>>>>>>>> type) {
>>>>>>>>>>>>>>>> return (MyClass) kryo.readClassAndObject(input);
>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Am I doing something wrong?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>> Flavio
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>>
>>

Reply via email to