[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16626690#comment-16626690 ]
ASF GitHub Bot commented on FLINK-10134: ---------------------------------------- XuQianJin-Stars closed pull request #6710: [FLINK-10134] UTF-16 support for TextInputFormat bug fixed URL: https://github.com/apache/flink/pull/6710 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java index c1ef344175b..8eb43424264 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java @@ -62,6 +62,9 @@ // Charset is not serializable private transient Charset charset; + /** The charset of bom in the file to process. */ + private String bomCharsetName; + /** * The default read buffer size = 1MB. */ @@ -221,6 +224,11 @@ public void setCharset(String charset) { } } + @PublicEvolving + public String getBomCharsetName() { + return this.bomCharsetName; + } + public byte[] getDelimiter() { return delimiter; } @@ -341,7 +349,7 @@ public void configure(Configuration parameters) { @Override public FileBaseStatistics getStatistics(BaseStatistics cachedStats) throws IOException { - final FileBaseStatistics cachedFileStats = cachedStats instanceof FileBaseStatistics ? + final FileBaseStatistics cachedFileStats = cachedStats instanceof FileInputFormat.FileBaseStatistics ? (FileBaseStatistics) cachedStats : null; // store properties @@ -408,7 +416,9 @@ public FileBaseStatistics getStatistics(BaseStatistics cachedStats) throws IOExc while (samplesTaken < numSamples && fileNum < allFiles.size()) { // make a split for the sample and use it to read a record FileStatus file = allFiles.get(fileNum); - FileInputSplit split = new FileInputSplit(0, file.getPath(), offset, file.getLen() - offset, null); + String bomCharsetName = getBomCharset(file); + + FileInputSplit split = new FileInputSplit(0, file.getPath(), offset, file.getLen() - offset, null, bomCharsetName); // we open the split, read one line, and take its length try { @@ -467,6 +477,7 @@ public FileBaseStatistics getStatistics(BaseStatistics cachedStats) throws IOExc */ @Override public void open(FileInputSplit split) throws IOException { + this.bomCharsetName = split.getBomCharsetName(); super.open(split); initBuffers(); @@ -736,7 +747,7 @@ public void reopen(FileInputSplit split, Long state) throws IOException { Preconditions.checkNotNull(split, "reopen() cannot be called on a null split."); Preconditions.checkNotNull(state, "reopen() cannot be called with a null initial state."); Preconditions.checkArgument(state == -1 || state >= split.getStart(), - " Illegal offset "+ state +", smaller than the splits start=" + split.getStart()); + " Illegal offset " + state + ", smaller than the splits start=" + split.getStart()); try { this.open(split); diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java index 14cf647cd24..c58344fba6c 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java @@ -34,7 +34,7 @@ import 
org.apache.flink.core.fs.FileStatus; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; - +import org.apache.flink.core.fs.local.LocalFileSystem; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,20 +55,20 @@ * {@link #nextRecord(Object)} and {@link #reachedEnd()} methods need to be implemented. * Additionally, one may override {@link #open(FileInputSplit)} and {@link #close()} to * change the life cycle behavior. - * + * * <p>After the {@link #open(FileInputSplit)} method completed, the file input data is available * from the {@link #stream} field.</p> */ @Public public abstract class FileInputFormat<OT> extends RichInputFormat<OT, FileInputSplit> { - + // -------------------------------------- Constants ------------------------------------------- - + private static final Logger LOG = LoggerFactory.getLogger(FileInputFormat.class); - + private static final long serialVersionUID = 1L; - - + + /** * The fraction that the last split may be larger than the others. */ @@ -84,8 +84,8 @@ * unsplittable files. */ protected static final Map<String, InflaterInputStreamFactory<?>> INFLATER_INPUT_STREAM_FACTORIES = - new HashMap<String, InflaterInputStreamFactory<?>>(); - + new HashMap<String, InflaterInputStreamFactory<?>>(); + /** * The splitLength is set to -1L for reading the whole split. */ @@ -96,9 +96,11 @@ initDefaultInflaterInputStreamFactories(); } + /** * Initialize defaults for input format. Needs to be a static method because it is configured for local * cluster execution, see LocalFlinkMiniCluster. + * * @param configuration The configuration to load defaults from */ private static void initDefaultsFromConfiguration(Configuration configuration) { @@ -117,10 +119,10 @@ private static void initDefaultsFromConfiguration(Configuration configuration) { private static void initDefaultInflaterInputStreamFactories() { InflaterInputStreamFactory<?>[] defaultFactories = { - DeflateInflaterInputStreamFactory.getInstance(), - GzipInflaterInputStreamFactory.getInstance(), - Bzip2InputStreamFactory.getInstance(), - XZInputStreamFactory.getInstance(), + DeflateInflaterInputStreamFactory.getInstance(), + GzipInflaterInputStreamFactory.getInstance(), + Bzip2InputStreamFactory.getInstance(), + XZInputStreamFactory.getInstance(), }; for (InflaterInputStreamFactory<?> inputStreamFactory : defaultFactories) { for (String fileExtension : inputStreamFactory.getCommonFileExtensions()) { @@ -132,8 +134,9 @@ private static void initDefaultInflaterInputStreamFactories() { /** * Registers a decompression algorithm through a {@link org.apache.flink.api.common.io.compression.InflaterInputStreamFactory} * with a file extension for transparent decompression. + * * @param fileExtension of the compressed files - * @param factory to create an {@link java.util.zip.InflaterInputStream} that handles the decompression format + * @param factory to create an {@link java.util.zip.InflaterInputStream} that handles the decompression format */ public static void registerInflaterInputStreamFactory(String fileExtension, InflaterInputStreamFactory<?> factory) { synchronized (INFLATER_INPUT_STREAM_FACTORIES) { @@ -151,23 +154,24 @@ public static void registerInflaterInputStreamFactory(String fileExtension, Infl /** * Returns the extension of a file name (!= a path). + * * @return the extension of the file name or {@code null} if there is no extension. 
*/ protected static String extractFileExtension(String fileName) { checkNotNull(fileName); int lastPeriodIndex = fileName.lastIndexOf('.'); - if (lastPeriodIndex < 0){ + if (lastPeriodIndex < 0) { return null; } else { return fileName.substring(lastPeriodIndex + 1); } } - + // -------------------------------------------------------------------------------------------- // Variables for internal operation. - // They are all transient, because we do not want them so be serialized + // They are all transient, because we do not want them so be serialized // -------------------------------------------------------------------------------------------- - + /** * The input stream reading from the input file. */ @@ -187,16 +191,16 @@ protected static String extractFileExtension(String fileName) { * The current split that this parallel instance must consume. */ protected transient FileInputSplit currentSplit; - + // -------------------------------------------------------------------------------------------- // The configuration parameters. Configured on the instance and serialized to be shipped. // -------------------------------------------------------------------------------------------- - + /** * The path to the file that contains the input. * * @deprecated Please override {@link FileInputFormat#supportsMultiPaths()} and - * use {@link FileInputFormat#getFilePaths()} and {@link FileInputFormat#setFilePaths(Path...)}. + * use {@link FileInputFormat#getFilePaths()} and {@link FileInputFormat#setFilePaths(Path...)}. */ @Deprecated protected Path filePath; @@ -205,22 +209,22 @@ protected static String extractFileExtension(String fileName) { * The list of paths to files and directories that contain the input. */ private Path[] filePaths; - + /** * The minimal split size, set by the configure() method. */ - protected long minSplitSize = 0; - + protected long minSplitSize = 0; + /** * The desired number of splits, as set by the configure() method. */ protected int numSplits = -1; - + /** * Stream opening timeout. */ protected long openTimeout = DEFAULT_OPENING_TIMEOUT; - + /** * Some file input formats are not splittable on a block level (avro, deflate) * Therefore, the FileInputFormat can only read whole files. @@ -240,24 +244,23 @@ protected static String extractFileExtension(String fileName) { // -------------------------------------------------------------------------------------------- // Constructors - // -------------------------------------------------------------------------------------------- + // -------------------------------------------------------------------------------------------- - public FileInputFormat() {} + public FileInputFormat() { + } protected FileInputFormat(Path filePath) { if (filePath != null) { setFilePath(filePath); } } - + // -------------------------------------------------------------------------------------------- // Getters/setters for the configurable parameters // -------------------------------------------------------------------------------------------- - + /** - * * @return The path of the file to read. - * * @deprecated Please use getFilePaths() instead. */ @Deprecated @@ -276,10 +279,10 @@ public Path getFilePath() { return filePath; } } - + /** * Returns the paths of all files to be read by the FileInputFormat. - * + * * @return The list of all paths to read. 
*/ public Path[] getFilePaths() { @@ -293,10 +296,10 @@ public Path getFilePath() { if (this.filePath == null) { return new Path[0]; } - return new Path[] {filePath}; + return new Path[]{filePath}; } } - + public void setFilePath(String filePath) { if (filePath == null) { throw new IllegalArgumentException("File path cannot be null."); @@ -318,7 +321,7 @@ public void setFilePath(String filePath) { throw new RuntimeException("Could not create a valid URI from the given file path name: " + rex.getMessage()); } } - + /** * Sets a single path of a file to be read. * @@ -331,10 +334,10 @@ public void setFilePath(Path filePath) { setFilePaths(filePath); } - + /** * Sets multiple paths of files to be read. - * + * * @param filePaths The paths of the files to read. */ public void setFilePaths(String... filePaths) { @@ -368,11 +371,11 @@ public void setFilePaths(Path... filePaths) { this.filePaths = filePaths; } - + public long getMinSplitSize() { return minSplitSize; } - + public void setMinSplitSize(long minSplitSize) { if (minSplitSize < 0) { throw new IllegalArgumentException("The minimum split size cannot be negative."); @@ -380,23 +383,23 @@ public void setMinSplitSize(long minSplitSize) { this.minSplitSize = minSplitSize; } - + public int getNumSplits() { return numSplits; } - + public void setNumSplits(int numSplits) { if (numSplits < -1 || numSplits == 0) { throw new IllegalArgumentException("The desired number of splits must be positive or -1 (= don't care)."); } - + this.numSplits = numSplits; } - + public long getOpenTimeout() { return openTimeout; } - + public void setOpenTimeout(long openTimeout) { if (openTimeout < 0) { throw new IllegalArgumentException("The timeout for opening the input splits must be positive or zero (= infinite)."); @@ -415,7 +418,7 @@ public boolean getNestedFileEnumeration() { // -------------------------------------------------------------------------------------------- // Getting information about the split that is currently open // -------------------------------------------------------------------------------------------- - + /** * Gets the start of the current split. * @@ -424,7 +427,7 @@ public boolean getNestedFileEnumeration() { public long getSplitStart() { return splitStart; } - + /** * Gets the length or remaining length of the current split. * @@ -441,10 +444,10 @@ public void setFilesFilter(FilePathFilter filesFilter) { // -------------------------------------------------------------------------------------------- // Pre-flight: Configuration, Splits, Sampling // -------------------------------------------------------------------------------------------- - + /** * Configures the file input format by reading the file path from the configuration. - * + * * @see org.apache.flink.api.common.io.InputFormat#configure(org.apache.flink.configuration.Configuration) */ @Override @@ -467,34 +470,33 @@ public void configure(Configuration parameters) { /** * Obtains basic file statistics containing only file size. If the input is a directory, then the size is the sum of all contained files. - * + * * @see org.apache.flink.api.common.io.InputFormat#getStatistics(org.apache.flink.api.common.io.statistics.BaseStatistics) */ @Override public FileBaseStatistics getStatistics(BaseStatistics cachedStats) throws IOException { - + final FileBaseStatistics cachedFileStats = cachedStats instanceof FileBaseStatistics ? 
(FileBaseStatistics) cachedStats : null; - + try { return getFileStats(cachedFileStats, getFilePaths(), new ArrayList<>(getFilePaths().length)); } catch (IOException ioex) { if (LOG.isWarnEnabled()) { LOG.warn("Could not determine statistics for paths '" + Arrays.toString(getFilePaths()) + "' due to an io error: " - + ioex.getMessage()); + + ioex.getMessage()); } - } - catch (Throwable t) { + } catch (Throwable t) { if (LOG.isErrorEnabled()) { LOG.error("Unexpected problem while getting the file statistics for paths '" + Arrays.toString(getFilePaths()) + "': " - + t.getMessage(), t); + + t.getMessage(), t); } } - + // no statistics available return null; } - + protected FileBaseStatistics getFileStats(FileBaseStatistics cachedStats, Path[] filePaths, ArrayList<FileStatus> files) throws IOException { long totalLength = 0; @@ -519,7 +521,7 @@ protected FileBaseStatistics getFileStats(FileBaseStatistics cachedStats, Path[] return new FileBaseStatistics(latestModTime, totalLength, BaseStatistics.AVG_RECORD_BYTES_UNKNOWN); } - + protected FileBaseStatistics getFileStats(FileBaseStatistics cachedStats, Path filePath, FileSystem fs, ArrayList<FileStatus> files) throws IOException { // get the file info and check whether the cached statistics are still valid. @@ -562,10 +564,9 @@ public LocatableInputSplitAssigner getInputSplitAssigner(FileInputSplit[] splits * Computes the input splits for the file. By default, one file block is one split. If more splits * are requested than blocks are available, then a split may be a fraction of a block and splits may cross * block boundaries. - * + * * @param minNumSplits The minimum desired number of file splits. * @return The computed file splits. - * * @see org.apache.flink.api.common.io.InputFormat#createInputSplits(int) */ @Override @@ -573,10 +574,10 @@ public LocatableInputSplitAssigner getInputSplitAssigner(FileInputSplit[] splits if (minNumSplits < 1) { throw new IllegalArgumentException("Number of input splits has to be at least 1."); } - + // take the desired number of splits into account minNumSplits = Math.max(minNumSplits, this.numSplits); - + final List<FileInputSplit> inputSplits = new ArrayList<FileInputSplit>(minNumSplits); // get all the files that are involved in the splits @@ -601,23 +602,25 @@ public LocatableInputSplitAssigner getInputSplitAssigner(FileInputSplit[] splits if (unsplittable) { int splitNum = 0; for (final FileStatus file : files) { + String bomCharsetName = getBomCharset(file); + final FileSystem fs = file.getPath().getFileSystem(); final BlockLocation[] blocks = fs.getFileBlockLocations(file, 0, file.getLen()); Set<String> hosts = new HashSet<String>(); - for(BlockLocation block : blocks) { + for (BlockLocation block : blocks) { hosts.addAll(Arrays.asList(block.getHosts())); } long len = file.getLen(); - if(testForUnsplittable(file)) { + if (testForUnsplittable(file)) { len = READ_WHOLE_SPLIT_FLAG; } FileInputSplit fis = new FileInputSplit(splitNum++, file.getPath(), 0, len, - hosts.toArray(new String[hosts.size()])); + hosts.toArray(new String[hosts.size()]), bomCharsetName); inputSplits.add(fis); } return inputSplits.toArray(new FileInputSplit[inputSplits.size()]); } - + final long maxSplitSize = totalLength / minNumSplits + (totalLength % minNumSplits == 0 ? 
0 : 1); @@ -625,17 +628,18 @@ public LocatableInputSplitAssigner getInputSplitAssigner(FileInputSplit[] splits int splitNum = 0; for (final FileStatus file : files) { + String bomCharsetName = getBomCharset(file); + final FileSystem fs = file.getPath().getFileSystem(); final long len = file.getLen(); final long blockSize = file.getBlockSize(); - + final long minSplitSize; if (this.minSplitSize <= blockSize) { minSplitSize = this.minSplitSize; - } - else { + } else { if (LOG.isWarnEnabled()) { - LOG.warn("Minimal split size of " + this.minSplitSize + " is larger than the block size of " + + LOG.warn("Minimal split size of " + this.minSplitSize + " is larger than the block size of " + blockSize + ". Decreasing minimal split size to block size."); } minSplitSize = blockSize; @@ -662,7 +666,7 @@ public LocatableInputSplitAssigner getInputSplitAssigner(FileInputSplit[] splits blockIndex = getBlockIndexForPosition(blocks, position, halfSplit, blockIndex); // create a new split FileInputSplit fis = new FileInputSplit(splitNum++, file.getPath(), position, splitSize, - blocks[blockIndex].getHosts()); + blocks[blockIndex].getHosts(), bomCharsetName); inputSplits.add(fis); // adjust the positions @@ -674,7 +678,7 @@ public LocatableInputSplitAssigner getInputSplitAssigner(FileInputSplit[] splits if (bytesUnassigned > 0) { blockIndex = getBlockIndexForPosition(blocks, position, halfSplit, blockIndex); final FileInputSplit fis = new FileInputSplit(splitNum++, file.getPath(), position, - bytesUnassigned, blocks[blockIndex].getHosts()); + bytesUnassigned, blocks[blockIndex].getHosts(), bomCharsetName); inputSplits.add(fis); } } else { @@ -686,7 +690,7 @@ public LocatableInputSplitAssigner getInputSplitAssigner(FileInputSplit[] splits } else { hosts = new String[0]; } - final FileInputSplit fis = new FileInputSplit(splitNum++, file.getPath(), 0, 0, hosts); + final FileInputSplit fis = new FileInputSplit(splitNum++, file.getPath(), 0, 0, hosts, bomCharsetName); inputSplits.add(fis); } } @@ -696,32 +700,32 @@ public LocatableInputSplitAssigner getInputSplitAssigner(FileInputSplit[] splits /** * Enumerate all files in the directory and recursive if enumerateNestedFiles is true. + * * @return the total length of accepted files. 
*/ private long addFilesInDir(Path path, List<FileStatus> files, boolean logExcludedFiles) - throws IOException { + throws IOException { final FileSystem fs = path.getFileSystem(); long length = 0; - for(FileStatus dir: fs.listStatus(path)) { + for (FileStatus dir : fs.listStatus(path)) { if (dir.isDir()) { if (acceptFile(dir) && enumerateNestedFiles) { length += addFilesInDir(dir.getPath(), files, logExcludedFiles); } else { if (logExcludedFiles && LOG.isDebugEnabled()) { - LOG.debug("Directory "+dir.getPath().toString()+" did not pass the file-filter and is excluded."); + LOG.debug("Directory " + dir.getPath().toString() + " did not pass the file-filter and is excluded."); } } - } - else { - if(acceptFile(dir)) { + } else { + if (acceptFile(dir)) { files.add(dir); length += dir.getLen(); testForUnsplittable(dir); } else { if (logExcludedFiles && LOG.isDebugEnabled()) { - LOG.debug("Directory "+dir.getPath().toString()+" did not pass the file-filter and is excluded."); + LOG.debug("Directory " + dir.getPath().toString() + " did not pass the file-filter and is excluded."); } } } @@ -730,7 +734,7 @@ private long addFilesInDir(Path path, List<FileStatus> files, boolean logExclude } protected boolean testForUnsplittable(FileStatus pathFile) { - if(getInflaterInputStreamFactory(pathFile.getPath()) != null) { + if (getInflaterInputStreamFactory(pathFile.getPath()) != null) { unsplittable = true; return true; } @@ -751,7 +755,7 @@ protected boolean testForUnsplittable(FileStatus pathFile) { * A simple hook to filter files and directories from the input. * The method may be overridden. Hadoop's FileInputFormat has a similar mechanism and applies the * same filters by default. - * + * * @param fileStatus The file status to check. * @return true, if the given file or directory is accepted */ @@ -765,9 +769,9 @@ public boolean acceptFile(FileStatus fileStatus) { /** * Retrieves the index of the <tt>BlockLocation</tt> that contains the part of the file described by the given * offset. - * - * @param blocks The different blocks of the file. Must be ordered by their offset. - * @param offset The offset of the position in the file. + * + * @param blocks The different blocks of the file. Must be ordered by their offset. + * @param offset The offset of the position in the file. * @param startIndex The earliest index to look at. * @return The index of the block containing the given position. */ @@ -789,14 +793,14 @@ private int getBlockIndexForPosition(BlockLocation[] blocks, long offset, long h } throw new IllegalArgumentException("The given offset is not contained in the any block."); } - + // -------------------------------------------------------------------------------------------- /** * Opens an input stream to the file defined in the input format. * The stream is positioned at the beginning of the given split. * <p> - * The stream is actually opened in an asynchronous thread to make sure any interruptions to the thread + * The stream is actually opened in an asynchronous thread to make sure any interruptions to the thread * working on the input format do not reach the file system. 
*/ @Override @@ -810,20 +814,19 @@ public void open(FileInputSplit fileSplit) throws IOException { LOG.debug("Opening input split " + fileSplit.getPath() + " [" + this.splitStart + "," + this.splitLength + "]"); } - + // open the split in an asynchronous thread final InputSplitOpenThread isot = new InputSplitOpenThread(fileSplit, this.openTimeout); isot.start(); - + try { this.stream = isot.waitForCompletion(); this.stream = decorateInputStream(this.stream, fileSplit); + } catch (Throwable t) { + throw new IOException("Error opening the Input Split " + fileSplit.getPath() + + " [" + splitStart + "," + splitLength + "]: " + t.getMessage(), t); } - catch (Throwable t) { - throw new IOException("Error opening the Input Split " + fileSplit.getPath() + - " [" + splitStart + "," + splitLength + "]: " + t.getMessage(), t); - } - + // get FSDataInputStream if (this.splitStart != 0) { this.stream.seek(this.splitStart); @@ -862,13 +865,12 @@ public void close() throws IOException { stream = null; } } - + /** * Override this method to supports multiple paths. * When this method will be removed, all FileInputFormats have to support multiple paths. * * @return True if the FileInputFormat supports multiple paths, false otherwise. - * * @deprecated Will be removed for Flink 2.0. */ @Deprecated @@ -876,21 +878,74 @@ public boolean supportsMultiPaths() { return false; } + @Override public String toString() { return getFilePaths() == null || getFilePaths().length == 0 ? "File Input (unknown file)" : - "File Input (" + Arrays.toString(this.getFilePaths()) + ')'; + "File Input (" + Arrays.toString(this.getFilePaths()) + ')'; + } + + /** + * Get file bom encoding + * + * @param fs + * @return + */ + public String getBomCharset(FileStatus fs) { + FSDataInputStream inStream = null; + String charset, testFileSystem = "TestFileSystem"; + byte[] bom = new byte[4]; + byte[] bytes = new byte[]{(byte) 0x00, (byte) 0xFE, (byte) 0xFF, (byte) 0xEF, (byte) 0xBB, (byte) 0xBF}; + try { + /* + * int read() Reads a data byte from this input stream. Int read(byte[] b) will be at most b.length from this input stream + * Bytes of data are read into a byte array. Int read(byte[] b, int off, int len) + * Reads up to len bytes of data from this input stream into a byte array. + */ + FileSystem fileSystem = fs.getPath().getFileSystem(); + if (testFileSystem.equals(fileSystem.getClass().getSimpleName())) { + fileSystem = new LocalFileSystem(); + } + + inStream = fileSystem.open(fs.getPath()); + inStream.read(bom, 0, bom.length); + + if ((bom[0] == bytes[0]) && (bom[1] == bytes[0]) && (bom[2] == bytes[1]) && (bom[3] == bytes[2])) { + charset = "UTF-32BE"; + } else if ((bom[0] == bytes[2]) && (bom[1] == bytes[1]) && (bom[2] == bytes[0]) && (bom[3] == bytes[0])) { + charset = "UTF-32LE"; + } else if ((bom[0] == bytes[3]) && (bom[1] == bytes[4]) && (bom[2] == bytes[5])) { + charset = "UTF-8"; + } else if ((bom[0] == bytes[1]) && (bom[1] == bytes[2])) { + charset = "UTF-16BE"; + } else if ((bom[0] == bytes[2]) && (bom[1] == bytes[1])) { + charset = "UTF-16LE"; + } else { + charset = null; + } + } catch (Exception e) { + throw new IllegalArgumentException("Failed to get file bom encoding."); + } finally { + if (null != inStream) { + try { + inStream.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + } + return charset; } // ============================================================================================ - + /** * Encapsulation of the basic statistics the optimizer obtains about a file. 
Contained are the size of the file * and the average bytes of a single record. The statistics also have a time-stamp that records the modification * time of the file and indicates as such for which time the statistics were valid. */ public static class FileBaseStatistics implements BaseStatistics { - + protected final long fileModTime; // timestamp of the last modification protected final long fileSize; // size of the file(s) in bytes @@ -899,13 +954,10 @@ public String toString() { /** * Creates a new statistics object. - * - * @param fileModTime - * The timestamp of the latest modification of any of the involved files. - * @param fileSize - * The size of the file, in bytes. <code>-1</code>, if unknown. - * @param avgBytesPerRecord - * The average number of byte in a record, or <code>-1.0f</code>, if unknown. + * + * @param fileModTime The timestamp of the latest modification of any of the involved files. + * @param fileSize The size of the file, in bytes. <code>-1</code>, if unknown. + * @param avgBytesPerRecord The average number of byte in a record, or <code>-1.0f</code>, if unknown. */ public FileBaseStatistics(long fileModTime, long fileSize, float avgBytesPerRecord) { this.fileModTime = fileModTime; @@ -915,7 +967,7 @@ public FileBaseStatistics(long fileModTime, long fileSize, float avgBytesPerReco /** * Gets the timestamp of the last modification. - * + * * @return The timestamp of the last modification. */ public long getLastModificationTime() { @@ -924,7 +976,7 @@ public long getLastModificationTime() { /** * Gets the file size. - * + * * @return The fileSize. * @see org.apache.flink.api.common.io.statistics.BaseStatistics#getTotalInputSize() */ @@ -936,19 +988,19 @@ public long getTotalInputSize() { /** * Gets the estimates number of records in the file, computed as the file size divided by the * average record width, rounded up. - * + * * @return The estimated number of records in the file. * @see org.apache.flink.api.common.io.statistics.BaseStatistics#getNumberOfRecords() */ @Override public long getNumberOfRecords() { - return (this.fileSize == SIZE_UNKNOWN || this.avgBytesPerRecord == AVG_RECORD_BYTES_UNKNOWN) ? + return (this.fileSize == SIZE_UNKNOWN || this.avgBytesPerRecord == AVG_RECORD_BYTES_UNKNOWN) ? NUM_RECORDS_UNKNOWN : (long) Math.ceil(this.fileSize / this.avgBytesPerRecord); } /** * Gets the estimated average number of bytes per record. - * + * * @return The average number of bytes per record. * @see org.apache.flink.api.common.io.statistics.BaseStatistics#getAverageRecordWidth() */ @@ -956,35 +1008,35 @@ public long getNumberOfRecords() { public float getAverageRecordWidth() { return this.avgBytesPerRecord; } - + @Override public String toString() { return "size=" + this.fileSize + ", recWidth=" + this.avgBytesPerRecord + ", modAt=" + this.fileModTime; } } - + // ============================================================================================ - + /** * Obtains a DataInputStream in an thread that is not interrupted. * This is a necessary hack around the problem that the HDFS client is very sensitive to InterruptedExceptions. 
*/ public static class InputSplitOpenThread extends Thread { - + private final FileInputSplit split; - + private final long timeout; private volatile FSDataInputStream fdis; private volatile Throwable error; - + private volatile boolean aborted; public InputSplitOpenThread(FileInputSplit split, long timeout) { super("Transient InputSplit Opener"); setDaemon(true); - + this.split = split; this.timeout = timeout; } @@ -994,37 +1046,35 @@ public void run() { try { final FileSystem fs = FileSystem.get(this.split.getPath().toUri()); this.fdis = fs.open(this.split.getPath()); - + // check for canceling and close the stream in that case, because no one will obtain it if (this.aborted) { final FSDataInputStream f = this.fdis; this.fdis = null; f.close(); } - } - catch (Throwable t) { + } catch (Throwable t) { this.error = t; } } - + public FSDataInputStream waitForCompletion() throws Throwable { final long start = System.currentTimeMillis(); long remaining = this.timeout; - + do { try { // wait for the task completion this.join(remaining); - } - catch (InterruptedException iex) { + } catch (InterruptedException iex) { // we were canceled, so abort the procedure abortWait(); throw iex; } } while (this.error == null && this.fdis == null && - (remaining = this.timeout + start - System.currentTimeMillis()) > 0); - + (remaining = this.timeout + start - System.currentTimeMillis()) > 0); + if (this.error != null) { throw this.error; } @@ -1036,17 +1086,17 @@ public FSDataInputStream waitForCompletion() throws Throwable { // b) the flag was set such that the stream did not see it and we have a valid stream // In any case, close the stream and throw an exception. abortWait(); - + final boolean stillAlive = this.isAlive(); final StringBuilder bld = new StringBuilder(256); for (StackTraceElement e : this.getStackTrace()) { bld.append("\tat ").append(e.toString()).append('\n'); } - throw new IOException("Input opening request timed out. Opener was " + (stillAlive ? "" : "NOT ") + + throw new IOException("Input opening request timed out. Opener was " + (stillAlive ? "" : "NOT ") + " alive. Stack of split open thread:\n" + bld.toString()); } } - + /** * Double checked procedure setting the abort flag and closing the stream. */ @@ -1057,17 +1107,18 @@ private void abortWait() { if (inStream != null) { try { inStream.close(); - } catch (Throwable t) {} + } catch (Throwable t) { + } } } } - + // ============================================================================================ // Parameterization via configuration // ============================================================================================ - + // ------------------------------------- Config Keys ------------------------------------------ - + /** * The config parameter which defines the input file path. */ @@ -1077,4 +1128,5 @@ private void abortWait() { * The config parameter which defines whether input directories are recursively traversed. */ public static final String ENUMERATE_NESTED_FILES_FLAG = "recursive.file.enumeration"; + } diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileInputSplit.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileInputSplit.java index b53ac4b4924..9e28522ddc3 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/FileInputSplit.java +++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileInputSplit.java @@ -39,6 +39,9 @@ /** The number of bytes in the file to process. */ private final long length; + /** The charset of bom in the file to process. 
*/ + private String bomCharsetName; + // -------------------------------------------------------------------------------------------- /** @@ -63,6 +66,31 @@ public FileInputSplit(int num, Path file, long start, long length, String[] host this.length = length; } + /** + * Constructs a split with host information. + * + * @param num + * the number of this input split + * @param file + * the file name + * @param start + * the position of the first byte in the file to process + * @param length + * the number of bytes in the file to process (-1 is flag for "read whole file") + * @param hosts + * the list of hosts containing the block, possibly <code>null</code> + * @param bomCharsetName + * The charset of bom in the file to process (default is UTF-8) + */ + public FileInputSplit(int num, Path file, long start, long length, String[] hosts, String bomCharsetName) { + super(num, hosts); + + this.file = file; + this.start = start; + this.length = length; + this.bomCharsetName = bomCharsetName; + } + // -------------------------------------------------------------------------------------------- /** @@ -92,6 +120,15 @@ public long getLength() { return length; } + /** + * Returns the charset of bom in the file. + * + * @return the charset of bom in the file + */ + public String getBomCharsetName(){ + return bomCharsetName; + } + // -------------------------------------------------------------------------------------------- @Override diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java index 82793adc137..7541d48bbe0 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java @@ -59,7 +59,12 @@ public TextInputFormat(Path filePath) { // -------------------------------------------------------------------------------------------- public String getCharsetName() { - return charsetName; + String bomCharsetName = getBomCharsetName(); + if (bomCharsetName != null && !bomCharsetName.equals(charsetName)) { + return bomCharsetName; + } else { + return charsetName; + } } public void setCharsetName(String charsetName) { @@ -85,14 +90,26 @@ public void configure(Configuration parameters) { @Override public String readRecord(String reusable, byte[] bytes, int offset, int numBytes) throws IOException { + String utf8 = "UTF-8"; + String utf16 = "UTF-16"; + String utf32 = "UTF-32"; + int stepSize = 0; + String charsetName = this.getCharsetName(); + if (charsetName.contains(utf8)) { + stepSize = 1; + } else if (charsetName.contains(utf16)) { + stepSize = 2; + } else if (charsetName.contains(utf32)) { + stepSize = 4; + } //Check if \n is used as delimiter and the end of this line is a \r, then remove \r from the line if (this.getDelimiter() != null && this.getDelimiter().length == 1 - && this.getDelimiter()[0] == NEW_LINE && offset + numBytes >= 1 - && bytes[offset + numBytes - 1] == CARRIAGE_RETURN){ - numBytes -= 1; + && this.getDelimiter()[0] == NEW_LINE && offset + numBytes >= stepSize + && bytes[offset + numBytes - stepSize] == CARRIAGE_RETURN) { + numBytes -= stepSize; } - - return new String(bytes, offset, numBytes, this.charsetName); + numBytes = numBytes - stepSize + 1; + return new String(bytes, offset, numBytes, this.getCharsetName()); } // -------------------------------------------------------------------------------------------- diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java index e78232ac1e5..205dddb7639 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java @@ -55,7 +55,7 @@ public void testSimpleRead() { tempFile.deleteOnExit(); tempFile.setWritable(true); - PrintStream ps = new PrintStream(tempFile); + PrintStream ps = new PrintStream(tempFile); ps.println(first); ps.println(second); ps.close(); @@ -83,8 +83,7 @@ public void testSimpleRead() { assertEquals(second, result); assertTrue(inputFormat.reachedEnd() || null == inputFormat.nextRecord(result)); - } - catch (Throwable t) { + } catch (Throwable t) { System.err.println("test failed with exception: " + t.getMessage()); t.printStackTrace(System.err); fail("Test erroneous"); @@ -93,11 +92,11 @@ public void testSimpleRead() { @Test public void testNestedFileRead() { - String[] dirs = new String[] {"tmp/first/", "tmp/second/"}; + String[] dirs = new String[]{"tmp/first/", "tmp/second/"}; List<String> expectedFiles = new ArrayList<>(); try { - for (String dir: dirs) { + for (String dir : dirs) { // create input file File tmpDir = new File(dir); if (!tmpDir.exists()) { @@ -127,7 +126,7 @@ public void testNestedFileRead() { FileInputSplit[] splits = inputFormat.createInputSplits(expectedFiles.size()); List<String> paths = new ArrayList<>(); - for (FileInputSplit split: splits) { + for (FileInputSplit split : splits) { paths.add(split.getPath().toString()); } @@ -188,7 +187,7 @@ private void testRemovingTrailingCR(String lineBreaker, String delimiter) { String result = ""; if ((delimiter.equals("\n") && (lineBreaker.equals("\n") || lineBreaker.equals("\r\n"))) - || (lineBreaker.equals(delimiter))){ + || (lineBreaker.equals(delimiter))) { result = inputFormat.nextRecord(""); assertNotNull("Expecting first record here", result); @@ -207,8 +206,110 @@ private void testRemovingTrailingCR(String lineBreaker, String delimiter) { assertEquals(content, result); } + } catch (Throwable t) { + System.err.println("test failed with exception: " + t.getMessage()); + t.printStackTrace(System.err); + fail("Test erroneous"); + } + } + + @Test + public void testUTF16Read() { + final String first = "First line"; + final String second = "Second line"; + + try { + // create input file + File tempFile = File.createTempFile("TextInputFormatTest", "tmp"); + tempFile.deleteOnExit(); + tempFile.setWritable(true); + + PrintStream ps = new PrintStream(tempFile, "UTF-16"); + ps.println(first); + ps.println(second); + ps.close(); + + TextInputFormat inputFormat = new TextInputFormat(new Path(tempFile.toURI().toString())); + inputFormat.setCharsetName("UTF-32"); + + Configuration parameters = new Configuration(); + inputFormat.configure(parameters); + +// inputFormat.setDelimiter("\r"); +// inputFormat.setDelimiter("i"); + + FileInputSplit[] splits = inputFormat.createInputSplits(1); + assertTrue("expected at least one input split", splits.length >= 1); + inputFormat.open(splits[0]); + + String result = ""; + + System.out.println("bomCharsetName:" + inputFormat.getBomCharsetName()); + + assertFalse(inputFormat.reachedEnd()); + result = inputFormat.nextRecord(""); + System.out.println(result); + assertNotNull("Expecting first record here", result); + assertEquals(first, result.substring(1)); + + assertFalse(inputFormat.reachedEnd()); + result = 
inputFormat.nextRecord(result); + System.out.println(result); + assertNotNull("Expecting second record here", result); + assertEquals(second, result); + + assertTrue(inputFormat.reachedEnd() || null == inputFormat.nextRecord(result)); + } catch (Throwable t) { + System.err.println("test failed with exception: " + t.getMessage()); + t.printStackTrace(System.err); + fail("Test erroneous"); } - catch (Throwable t) { + } + + @Test + public void testUTF32Read() { + final String first = "First line"; + final String second = "Second line"; + + try { + // create input file + File tempFile = File.createTempFile("TextInputFormatTest", "tmp"); + tempFile.deleteOnExit(); + tempFile.setWritable(true); + + PrintStream ps = new PrintStream(tempFile, "UTF-32"); + ps.println(first); + ps.println(second); + ps.close(); + + TextInputFormat inputFormat = new TextInputFormat(new Path(tempFile.toURI().toString())); + inputFormat.setCharsetName("UTF-32"); + + Configuration parameters = new Configuration(); + inputFormat.configure(parameters); + + FileInputSplit[] splits = inputFormat.createInputSplits(1); + assertTrue("expected at least one input split", splits.length >= 1); + inputFormat.open(splits[0]); + + String result = ""; + + System.out.println("bomCharsetName:" + inputFormat.getBomCharsetName()); + + assertFalse(inputFormat.reachedEnd()); + result = inputFormat.nextRecord(""); + System.out.println(result); + assertNotNull("Expecting first record here", result); + assertEquals(first, result); + + assertFalse(inputFormat.reachedEnd()); + result = inputFormat.nextRecord(result); + System.out.println(result); + assertNotNull("Expecting second record here", result); + assertEquals(second, result); + + assertTrue(inputFormat.reachedEnd() || null == inputFormat.nextRecord(result)); + } catch (Throwable t) { System.err.println("test failed with exception: " + t.getMessage()); t.printStackTrace(System.err); fail("Test erroneous");
----------------------------------------------------------------
This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> UTF-16 support for TextInputFormat
> ----------------------------------
>
>                 Key: FLINK-10134
>                 URL: https://issues.apache.org/jira/browse/FLINK-10134
>             Project: Flink
>          Issue Type: Bug
>          Components: Core
>    Affects Versions: 1.4.2
>            Reporter: David Dreyfus
>            Priority: Blocker
>              Labels: pull-request-available
>
> It does not appear that Flink supports a charset encoding of "UTF-16". In particular, it doesn't appear that Flink consumes the Byte Order Mark (BOM) to establish whether a UTF-16 file is UTF-16LE or UTF-16BE.
>
> TextInputFormat.setCharset("UTF-16") calls DelimitedInputFormat.setCharset(), which sets TextInputFormat.charsetName and then modifies the previously set delimiterString to construct the proper byte string encoding of the delimiter. This same charsetName is also used in TextInputFormat.readRecord() to interpret the bytes read from the file.
>
> There are two problems that this implementation would seem to have when using UTF-16.
> # delimiterString.getBytes(getCharset()) in DelimitedInputFormat.java will return a Big Endian byte sequence including the Byte Order Mark (BOM). The actual text file will not contain a BOM at each line ending, so the delimiter will never be read. Moreover, if the actual byte encoding of the file is Little Endian, the bytes will be interpreted incorrectly.
> # TextInputFormat.readRecord() will not see a BOM each time it decodes a byte sequence with the String(bytes, offset, numBytes, charset) call. Therefore, it will assume Big Endian, which may not always be correct. [1] [https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java#L95]
>
> While there are likely many solutions, I would think that all of them would have to start by reading the BOM from the file when a Split is opened and then using that BOM to modify the specified encoding to a BOM-specific one when the caller doesn't specify one, and to overwrite the caller's specification if the BOM is in conflict with the caller's specification. That is, if the BOM indicates Little Endian and the caller indicates UTF-16BE, Flink should rewrite the charsetName as UTF-16LE.
>
> I hope this makes sense and that I haven't been testing incorrectly or misreading the code.
>
> I've verified the problem on version 1.4.2. I believe the problem exists on all versions.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
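
To make problem 1 in the quoted description concrete: in the JDK, encoding with the endianness-agnostic "UTF-16" charset prepends a big-endian BOM to the encoder output, while the endianness-specific charsets do not. A minimal, self-contained demonstration (plain JDK, no Flink classes; the class name Utf16DelimiterDemo is illustrative):

    import java.nio.charset.Charset;

    public class Utf16DelimiterDemo {
        public static void main(String[] args) {
            // The endianness-agnostic charset prepends a big-endian BOM when
            // encoding, so the delimiter bytes become FE FF 00 0A -- a sequence
            // that never occurs at line endings inside an actual text file.
            print("UTF-16  ", "\n".getBytes(Charset.forName("UTF-16")));   // FE FF 00 0A
            // The endianness-specific charsets emit the BOM-free bytes that do
            // appear in the file.
            print("UTF-16BE", "\n".getBytes(Charset.forName("UTF-16BE"))); // 00 0A
            print("UTF-16LE", "\n".getBytes(Charset.forName("UTF-16LE"))); // 0A 00
        }

        private static void print(String label, byte[] bytes) {
            StringBuilder sb = new StringBuilder(label).append(':');
            for (byte b : bytes) {
                sb.append(String.format(" %02X", b & 0xFF));
            }
            System.out.println(sb);
        }
    }

This is why a delimiter encoded with plain "UTF-16" can never match in the stream, regardless of the file's actual byte order.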
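
The fix sketched in the description's last paragraph (and mirrored by the PR's FileInputFormat#getBomCharset above) boils down to sniffing up to four bytes at the start of the file and letting a detected BOM override the configured charset on conflict. A standalone sketch of that logic, with illustrative names (BomSniffer is not a Flink class):

    import java.io.IOException;
    import java.io.InputStream;

    /** Standalone sketch of BOM sniffing; class and method names are illustrative. */
    final class BomSniffer {

        /**
         * Returns the charset implied by the BOM at the start of the stream, or
         * null if no BOM is present. Longer patterns are tested first so that
         * UTF-32LE (FF FE 00 00) is not mistaken for UTF-16LE (FF FE). A single
         * read() suffices for a sketch; production code would loop until four
         * bytes are read or EOF is reached.
         */
        static String detectBomCharset(InputStream in) throws IOException {
            byte[] bom = new byte[4];
            int n = in.read(bom, 0, bom.length);
            if (n >= 4 && bom[0] == 0x00 && bom[1] == 0x00 && bom[2] == (byte) 0xFE && bom[3] == (byte) 0xFF) {
                return "UTF-32BE";
            }
            if (n >= 4 && bom[0] == (byte) 0xFF && bom[1] == (byte) 0xFE && bom[2] == 0x00 && bom[3] == 0x00) {
                return "UTF-32LE";
            }
            if (n >= 3 && bom[0] == (byte) 0xEF && bom[1] == (byte) 0xBB && bom[2] == (byte) 0xBF) {
                return "UTF-8";
            }
            if (n >= 2 && bom[0] == (byte) 0xFE && bom[1] == (byte) 0xFF) {
                return "UTF-16BE";
            }
            if (n >= 2 && bom[0] == (byte) 0xFF && bom[1] == (byte) 0xFE) {
                return "UTF-16LE";
            }
            return null;
        }

        /**
         * Reconciles the caller's charset with the detected BOM: the BOM wins on
         * any conflict, e.g. resolveCharset("UTF-16BE", "UTF-16LE") returns
         * "UTF-16LE", as the reporter proposes.
         */
        static String resolveCharset(String requested, String bomCharset) {
            return bomCharset != null ? bomCharset : requested;
        }
    }

With the resolved name in hand, the delimiter should then be re-encoded with the endianness-specific charset (UTF-16BE or UTF-16LE rather than UTF-16), which also sidesteps the embedded-BOM problem from point 1.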