Thanks a lot! 2015-08-10 12:20 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
> Done through https://issues.apache.org/jira/browse/FLINK-2503 > > Thanks again, > Flavio > > On Mon, Aug 10, 2015 at 12:11 PM, Fabian Hueske <fhue...@gmail.com> wrote: > >> Congrats that you got your InputFormat working! >> It is true, there can be a few inconsistencies in the Formats derived >> from FileInputFormat. >> >> It would be great if you could open JIRAs for these issues. Otherwise, >> they might get lost on the mailing list. >> >> Thanks, Fabian >> >> 2015-08-10 12:02 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>: >> >>> Hi Fabian, >>> thanks to your help I finally managed to successfully generate a DataSet >>> from my folder but I think that there are some inconsistencies in the >>> hierarchy of InputFormats. >>> The *BinaryOutputFormat*/*TypeSerializerInputFormat* should somehow >>> inherit the behaviour of the FileInputFormat (so respect *unsplittable* >>> and *enumerateNestedFiles*) while they don't take into account those >>> flags. >>> Moreover in the *TypeSerializerInputFormat* there's a *"// TODO: fix >>> this shit"* that maybe should be removed or fixed :) >>> >>> Also maintaining aligned testForUnsplittable and decorateInputStream is >>> somehow dangerous.. >>> And maybe visibility for getBlockIndexForPosition should be changed to >>> protected? >>> >>> So basically, my need was to implement >>> a TypeSerializerInputFormat<RowBundle> but to achieve that I had to make a >>> lot of overrides..am I doing something wrong or are those inputFormat >>> somehow to improve..? 
This is my IF code (*remark*: from the comment >>> *"Copied >>> from FileInputFormat (override TypeSerializerInputFormat)"* on the code >>> is copied-and-pasted from FileInputFormat..thus MY code ends there): >>> >>> public class RowBundleInputFormat extends >>> TypeSerializerInputFormat<RowBundle> { >>> >>> private static final long serialVersionUID = 1L; >>> private static final Logger LOG = >>> LoggerFactory.getLogger(RowBundleInputFormat.class); >>> >>> /** The fraction that the last split may be larger than the others. */ >>> private static final float MAX_SPLIT_SIZE_DISCREPANCY = 1.1f; >>> private boolean objectRead; >>> >>> public RowBundleInputFormat() { >>> super(new GenericTypeInfo<>(RowBundle.class)); >>> unsplittable = true; >>> } >>> >>> @Override >>> protected FSDataInputStream decorateInputStream(FSDataInputStream >>> inputStream, FileInputSplit fileSplit) throws Throwable { >>> return inputStream; >>> } >>> >>> @Override >>> protected boolean testForUnsplittable(FileStatus pathFile) { >>> return true; >>> } >>> >>> @Override >>> public void open(FileInputSplit split) throws IOException { >>> super.open(split); >>> objectRead = false; >>> } >>> >>> @Override >>> public boolean reachedEnd() throws IOException { >>> return this.objectRead; >>> } >>> >>> @Override >>> public RowBundle nextRecord(RowBundle reuse) throws IOException { >>> RowBundle yourObject = super.nextRecord(reuse); >>> this.objectRead = true; // read only one object >>> return yourObject; >>> } >>> >>> // ------------------------------------------------------------------- >>> // Copied from FileInputFormat (override TypeSerializerInputFormat) >>> // ------------------------------------------------------------------- >>> @Override >>> public FileInputSplit[] createInputSplits(int minNumSplits) >>> throws IOException { >>> if (minNumSplits < 1) { >>> throw new IllegalArgumentException( >>> "Number of input splits has to be at least 1."); >>> } >>> >>> // take the desired number of 
splits into account >>> minNumSplits = Math.max(minNumSplits, this.numSplits); >>> >>> final Path path = this.filePath; >>> final List<FileInputSplit> inputSplits = new ArrayList<FileInputSplit>( >>> minNumSplits); >>> >>> // get all the files that are involved in the splits >>> List<FileStatus> files = new ArrayList<FileStatus>(); >>> long totalLength = 0; >>> >>> final FileSystem fs = path.getFileSystem(); >>> final FileStatus pathFile = fs.getFileStatus(path); >>> >>> if (pathFile.isDir()) { >>> // input is directory. list all contained files >>> final FileStatus[] dir = fs.listStatus(path); >>> for (int i = 0; i < dir.length; i++) { >>> if (dir[i].isDir()) { >>> if (enumerateNestedFiles) { >>> if (acceptFile(dir[i])) { >>> totalLength += addNestedFiles(dir[i].getPath(), >>> files, 0, true); >>> } else { >>> if (LOG.isDebugEnabled()) { >>> LOG.debug("Directory " >>> + dir[i].getPath().toString() >>> + " did not pass the file-filter and is excluded."); >>> } >>> } >>> } >>> } else { >>> if (acceptFile(dir[i])) { >>> files.add(dir[i]); >>> totalLength += dir[i].getLen(); >>> // as soon as there is one deflate file in a directory, >>> // we can not split it >>> testForUnsplittable(dir[i]); >>> } else { >>> if (LOG.isDebugEnabled()) { >>> LOG.debug("File " >>> + dir[i].getPath().toString() >>> + " did not pass the file-filter and is excluded."); >>> } >>> } >>> } >>> } >>> } else { >>> testForUnsplittable(pathFile); >>> >>> files.add(pathFile); >>> totalLength += pathFile.getLen(); >>> } >>> // returns if unsplittable >>> if (unsplittable) { >>> int splitNum = 0; >>> for (final FileStatus file : files) { >>> final BlockLocation[] blocks = fs.getFileBlockLocations(file, >>> 0, file.getLen()); >>> Set<String> hosts = new HashSet<String>(); >>> for (BlockLocation block : blocks) { >>> hosts.addAll(Arrays.asList(block.getHosts())); >>> } >>> long len = file.getLen(); >>> if (testForUnsplittable(file)) { >>> len = READ_WHOLE_SPLIT_FLAG; >>> } >>> FileInputSplit fis = new 
FileInputSplit(splitNum++, >>> file.getPath(), 0, len, hosts.toArray(new String[hosts >>> .size()])); >>> inputSplits.add(fis); >>> } >>> return inputSplits.toArray(new FileInputSplit[inputSplits.size()]); >>> } >>> >>> final long maxSplitSize = (minNumSplits < 1) ? Long.MAX_VALUE >>> : (totalLength / minNumSplits + (totalLength % minNumSplits == 0 ? 0 >>> : 1)); >>> >>> // now that we have the files, generate the splits >>> int splitNum = 0; >>> for (final FileStatus file : files) { >>> >>> final long len = file.getLen(); >>> final long blockSize = file.getBlockSize(); >>> >>> final long minSplitSize; >>> if (this.minSplitSize <= blockSize) { >>> minSplitSize = this.minSplitSize; >>> } else { >>> if (LOG.isWarnEnabled()) { >>> LOG.warn("Minimal split size of " + this.minSplitSize >>> + " is larger than the block size of " + blockSize >>> + ". Decreasing minimal split size to block size."); >>> } >>> minSplitSize = blockSize; >>> } >>> >>> final long splitSize = Math.max(minSplitSize, >>> Math.min(maxSplitSize, blockSize)); >>> final long halfSplit = splitSize >>> 1; >>> >>> final long maxBytesForLastSplit = (long) (splitSize * >>> MAX_SPLIT_SIZE_DISCREPANCY); >>> >>> if (len > 0) { >>> >>> // get the block locations and make sure they are in order with >>> // respect to their offset >>> final BlockLocation[] blocks = fs.getFileBlockLocations(file, >>> 0, len); >>> Arrays.sort(blocks); >>> >>> long bytesUnassigned = len; >>> long position = 0; >>> >>> int blockIndex = 0; >>> >>> while (bytesUnassigned > maxBytesForLastSplit) { >>> // get the block containing the majority of the data >>> blockIndex = getBlockIndexForPosition(blocks, position, >>> halfSplit, blockIndex); >>> // create a new split >>> FileInputSplit fis = new FileInputSplit(splitNum++, >>> file.getPath(), position, splitSize, >>> blocks[blockIndex].getHosts()); >>> inputSplits.add(fis); >>> >>> // adjust the positions >>> position += splitSize; >>> bytesUnassigned -= splitSize; >>> } >>> >>> // assign 
the last split >>> if (bytesUnassigned > 0) { >>> blockIndex = getBlockIndexForPosition(blocks, position, >>> halfSplit, blockIndex); >>> final FileInputSplit fis = new FileInputSplit(splitNum++, >>> file.getPath(), position, bytesUnassigned, >>> blocks[blockIndex].getHosts()); >>> inputSplits.add(fis); >>> } >>> } else { >>> // special case with a file of zero bytes size >>> final BlockLocation[] blocks = fs.getFileBlockLocations(file, >>> 0, 0); >>> String[] hosts; >>> if (blocks.length > 0) { >>> hosts = blocks[0].getHosts(); >>> } else { >>> hosts = new String[0]; >>> } >>> final FileInputSplit fis = new FileInputSplit(splitNum++, >>> file.getPath(), 0, 0, hosts); >>> inputSplits.add(fis); >>> } >>> } >>> >>> return inputSplits.toArray(new FileInputSplit[inputSplits.size()]); >>> } >>> >>> /** >>> * Recursively traverse the input directory structure and enumerate all >>> * accepted nested files. >>> * >>> * @return the total length of accepted files. >>> */ >>> private long addNestedFiles(Path path, List<FileStatus> files, long >>> length, >>> boolean logExcludedFiles) throws IOException { >>> final FileSystem fs = path.getFileSystem(); >>> >>> for (FileStatus dir : fs.listStatus(path)) { >>> if (dir.isDir()) { >>> if (acceptFile(dir)) { >>> addNestedFiles(dir.getPath(), files, length, >>> logExcludedFiles); >>> } else { >>> if (logExcludedFiles && LOG.isDebugEnabled()) { >>> LOG.debug("Directory " >>> + dir.getPath().toString() >>> + " did not pass the file-filter and is excluded."); >>> } >>> } >>> } else { >>> if (acceptFile(dir)) { >>> files.add(dir); >>> length += dir.getLen(); >>> testForUnsplittable(dir); >>> } else { >>> if (logExcludedFiles && LOG.isDebugEnabled()) { >>> LOG.debug("Directory " >>> + dir.getPath().toString() >>> + " did not pass the file-filter and is excluded."); >>> } >>> } >>> } >>> } >>> return length; >>> } >>> >>> /** >>> * Retrieves the index of the <tt>BlockLocation</tt> that contains the >>> part >>> * of the file described by 
the given offset. >>> * >>> * @param blocks >>> * The different blocks of the file. Must be ordered by their >>> * offset. >>> * @param offset >>> * The offset of the position in the file. >>> * @param startIndex >>> * The earliest index to look at. >>> * @return The index of the block containing the given position. >>> */ >>> private int getBlockIndexForPosition(BlockLocation[] blocks, long offset, >>> long halfSplitSize, int startIndex) { >>> // go over all indexes after the startIndex >>> for (int i = startIndex; i < blocks.length; i++) { >>> long blockStart = blocks[i].getOffset(); >>> long blockEnd = blockStart + blocks[i].getLength(); >>> >>> if (offset >= blockStart && offset < blockEnd) { >>> // got the block where the split starts >>> // check if the next block contains more than this one does >>> if (i < blocks.length - 1 && blockEnd - offset < halfSplitSize) { >>> return i + 1; >>> } else { >>> return i; >>> } >>> } >>> } >>> throw new IllegalArgumentException("The given offset is not contained in >>> the any block."); >>> } >>> >>> } >>> >>> >>> >>> >>> On Sun, Aug 9, 2015 at 2:00 PM, Fabian Hueske <fhue...@gmail.com> wrote: >>> >>>> You need to do something like this: >>>> >>>> public class YourInputFormat extends FileInputFormat<Object> { >>>> >>>> private boolean objectRead; >>>> >>>> @Override >>>> public FileInputSplit[] createInputSplits(int minNumSplits) { >>>> // Create one FileInputSplit for each file you want to read. >>>> // Check FileInputFormat for how to recursively enumerate files. >>>> // Input splits must start at 0 and have a length equal to length >>>> of the file to read. 
>>>> return null; >>>> } >>>> >>>> @Override >>>> public void open(FileInputSplit split) throws IOException { >>>> super.open(split); >>>> objectRead = false; >>>> } >>>> >>>> @Override >>>> public boolean reachedEnd() throws IOException { >>>> return this.objectRead; >>>> } >>>> >>>> @Override >>>> public Object nextRecord(Object reuse) throws IOException { >>>> Object yourObject = this.stream.read(); // use Kryo here to read >>>> from this.stream() >>>> this.objectRead = true; // read only one object >>>> return yourObject; >>>> } >>>> } >>>> >>>> 2015-08-07 14:40 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>: >>>> >>>>> Sorry Fabian but I don't understand what I should do :( >>>>> Could you provide me a simple snippet of code to achieve this? >>>>> >>>>> On Fri, Aug 7, 2015 at 1:30 PM, Fabian Hueske <fhue...@gmail.com> >>>>> wrote: >>>>> >>>>>> Enumeration of nested files is a feature of the FileInputFormat. >>>>>> If you implement your own IF based on FileInputFormat as I suggested >>>>>> before, you can use that feature. >>>>>> >>>>>> 2015-08-07 12:29 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>: >>>>>> >>>>>>> I have a directory containing a list of files, each one containing a >>>>>>> kryo-serialized object. >>>>>>> With json serialized objects I don't have that problem (but there I >>>>>>> use env.readTextFile(path.withParameters(parameters) >>>>>>> where parameters has the ENUMERATE_NESTED_FILES_FLAG set to true). >>>>>>> >>>>>>> On Fri, Aug 7, 2015 at 12:14 PM, Fabian Hueske <fhue...@gmail.com> >>>>>>> wrote: >>>>>>> >>>>>>>> I don't know your use case. >>>>>>>> The InputFormat interface is very flexible. Directories can be >>>>>>>> recursively read. A file can contain one or more objects. You can also >>>>>>>> make >>>>>>>> a smarter IF and put multiple (small) files into one split... >>>>>>>> >>>>>>>> It is up to your use case what you need to implement. 
>>>>>>>> >>>>>>>> >>>>>>>> 2015-08-07 12:08 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it >>>>>>>> >: >>>>>>>> >>>>>>>>> Should this be the case just reading recursively an entire >>>>>>>>> directory containing one object per file? >>>>>>>>> >>>>>>>>> On Fri, Aug 7, 2015 at 12:04 PM, Fabian Hueske <fhue...@gmail.com> >>>>>>>>> wrote: >>>>>>>>> >>>>>>>>>> You could implement your own InputFormat based on FileInputFormat >>>>>>>>>> and override the createInputSplits method to just create a single >>>>>>>>>> split >>>>>>>>>> per file. >>>>>>>>>> >>>>>>>>>> 2015-08-07 12:02 GMT+02:00 Flavio Pompermaier < >>>>>>>>>> pomperma...@okkam.it>: >>>>>>>>>> >>>>>>>>>>> So what should I do? >>>>>>>>>>> >>>>>>>>>>> On Fri, Aug 7, 2015 at 12:01 PM, Fabian Hueske < >>>>>>>>>>> fhue...@gmail.com> wrote: >>>>>>>>>>> >>>>>>>>>>>> Ah, I checked the code. >>>>>>>>>>>> >>>>>>>>>>>> The BinaryInputFormat expects metadata which is written by the >>>>>>>>>>>> BinaryOutputFormat. >>>>>>>>>>>> So you cannot use the BinaryInputFormat to read a file which >>>>>>>>>>>> does not provide the metadata. >>>>>>>>>>>> >>>>>>>>>>>> 2015-08-07 11:53 GMT+02:00 Flavio Pompermaier < >>>>>>>>>>>> pomperma...@okkam.it>: >>>>>>>>>>>> >>>>>>>>>>>>> The file containing the serialized object is 7 bytes >>>>>>>>>>>>> >>>>>>>>>>>>> On Fri, Aug 7, 2015 at 11:49 AM, Fabian Hueske < >>>>>>>>>>>>> fhue...@gmail.com> wrote: >>>>>>>>>>>>> >>>>>>>>>>>>>> This might be an issue with the blockSize parameter of the >>>>>>>>>>>>>> BinaryInputFormat. >>>>>>>>>>>>>> How large is the file with the single object? 
>>>>>>>>>>>>>> >>>>>>>>>>>>>> 2015-08-07 11:37 GMT+02:00 Flavio Pompermaier < >>>>>>>>>>>>>> pomperma...@okkam.it>: >>>>>>>>>>>>>> >>>>>>>>>>>>>>> I also tried with >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> DataSet<RowBundle> ds = >>>>>>>>>>>>>>> env.createInput(inputFormat).setParallelism(1); >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> but I get the same error :( >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Moreover, in this example I put exactly one object per file >>>>>>>>>>>>>>> so it should be able to deserialize it, right? >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> On Fri, Aug 7, 2015 at 11:33 AM, Fabian Hueske < >>>>>>>>>>>>>>> fhue...@gmail.com> wrote: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> If you create your file by just sequentially writing all >>>>>>>>>>>>>>>> objects to the file using Kryo, you can only read it with a >>>>>>>>>>>>>>>> parallelism of >>>>>>>>>>>>>>>> 1. >>>>>>>>>>>>>>>> Writing binary files in a way that they can be read in >>>>>>>>>>>>>>>> parallel is a bit tricky (and not specific to Flink). >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> 2015-08-07 11:28 GMT+02:00 Flavio Pompermaier < >>>>>>>>>>>>>>>> pomperma...@okkam.it>: >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Hi to all, >>>>>>>>>>>>>>>>> I'm trying to read a file serialized with kryo but I get >>>>>>>>>>>>>>>>> this exception (due to the fact that the createInputSplits >>>>>>>>>>>>>>>>> creates 8 >>>>>>>>>>>>>>>>> inputsplits, where just one is not empty..). 
>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Caused by: java.io.IOException: Invalid argument >>>>>>>>>>>>>>>>> at sun.nio.ch.FileChannelImpl.position0(Native Method) >>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>> sun.nio.ch.FileChannelImpl.position(FileChannelImpl.java:285) >>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>> org.apache.flink.core.fs.local.LocalDataInputStream.seek(LocalDataInputStream.java:57) >>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>> org.apache.flink.api.common.io.BinaryInputFormat.open(BinaryInputFormat.java:257) >>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>> org.apache.flink.api.common.io.BinaryInputFormat.open(BinaryInputFormat.java:46) >>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:151) >>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>> org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) >>>>>>>>>>>>>>>>> at java.lang.Thread.run(Thread.java:745) >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> ----------------------------------------------- >>>>>>>>>>>>>>>>> My program is basically the following: >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> public static void main(String[] args) throws Exception { >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> ... 
>>>>>>>>>>>>>>>>> //try-with-resources used to autoclose resources >>>>>>>>>>>>>>>>> try (Output output = new Output(new >>>>>>>>>>>>>>>>> FileOutputStream("/tmp/KryoTest.ser"))) { >>>>>>>>>>>>>>>>> //serialise object >>>>>>>>>>>>>>>>> Kryo kryo=new Kryo(); >>>>>>>>>>>>>>>>> kryo.writeClassAndObject(output, myObj); >>>>>>>>>>>>>>>>> } catch (FileNotFoundException ex) { >>>>>>>>>>>>>>>>> LOG.error(ex.getMessage(), ex); >>>>>>>>>>>>>>>>> } >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> //deserialise object >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> myObj=null; >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> try (Input input = new Input( new >>>>>>>>>>>>>>>>> FileInputStream("/tmp/KryoTest.ser"))){ >>>>>>>>>>>>>>>>> Kryo kryo=new Kryo(); >>>>>>>>>>>>>>>>> myObj =(MyClass)kryo.readClassAndObject(input); >>>>>>>>>>>>>>>>> } catch (FileNotFoundException ex) { >>>>>>>>>>>>>>>>> LOG.error(ex.getMessage(), ex); >>>>>>>>>>>>>>>>> } >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> final ExecutionEnvironment env = >>>>>>>>>>>>>>>>> ExecutionEnvironment.getExecutionEnvironment(); >>>>>>>>>>>>>>>>> env.registerTypeWithKryoSerializer(MyClass.class, >>>>>>>>>>>>>>>>> MyClassSerializer.class); >>>>>>>>>>>>>>>>> Configuration configuration = new Configuration(); >>>>>>>>>>>>>>>>> configuration.setLong(BinaryInputFormat.BLOCK_SIZE_PARAMETER_KEY, >>>>>>>>>>>>>>>>> 64*1024*1024); >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> TypeInformation<MyClass> typeInfo = new >>>>>>>>>>>>>>>>> GenericTypeInfo<>(MyClass.class); >>>>>>>>>>>>>>>>> final BinaryInputFormat<MyClass> inputFormat = new >>>>>>>>>>>>>>>>> TypeSerializerInputFormat<>(typeInfo); >>>>>>>>>>>>>>>>> inputFormat.setFilePath("file:/tmp/KryoTest.ser"); >>>>>>>>>>>>>>>>> inputFormat.configure(configuration); >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> DataSet<MyClass> ds = env.createInput(inputFormat); >>>>>>>>>>>>>>>>> ds.print(); >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> } >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> private static final class MyClassSerializer extends >>>>>>>>>>>>>>>>> 
Serializer<MyClass> { >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> @Override >>>>>>>>>>>>>>>>> public void write(Kryo kryo, Output output, MyClass >>>>>>>>>>>>>>>>> object) { >>>>>>>>>>>>>>>>> kryo.writeClassAndObject(output, object); >>>>>>>>>>>>>>>>> } >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> @Override >>>>>>>>>>>>>>>>> public MyClass read(Kryo kryo, Input input, Class<MyClass> >>>>>>>>>>>>>>>>> type) { >>>>>>>>>>>>>>>>> return (MyClass) kryo.readClassAndObject(input); >>>>>>>>>>>>>>>>> } >>>>>>>>>>>>>>>>> } >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Am I doing something wrong? >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Best, >>>>>>>>>>>>>>>>> Flavio >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>> >>>>>> >>>>> >>>> >>> >>> >>>