mccheah closed pull request #4: Use the same shared FileSystem instance across calls in HadoopTableOperations URL: https://github.com/apache/incubator-iceberg/pull/4
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/core/src/main/java/com/netflix/iceberg/hadoop/HadoopInputFile.java b/core/src/main/java/com/netflix/iceberg/hadoop/HadoopInputFile.java index ab196a6..2456a5a 100644 --- a/core/src/main/java/com/netflix/iceberg/hadoop/HadoopInputFile.java +++ b/core/src/main/java/com/netflix/iceberg/hadoop/HadoopInputFile.java @@ -76,6 +76,10 @@ public static HadoopInputFile fromStatus(FileStatus stat, Configuration conf) { } } + static HadoopInputFile fromFsPath(FileSystem fs, Path path, Configuration conf) { + return new HadoopInputFile(fs, path, conf); + } + private HadoopInputFile(FileSystem fs, Path path, Configuration conf) { this.fs = fs; this.path = path; diff --git a/core/src/main/java/com/netflix/iceberg/hadoop/HadoopOutputFile.java b/core/src/main/java/com/netflix/iceberg/hadoop/HadoopOutputFile.java index e01b7e9..163971f 100644 --- a/core/src/main/java/com/netflix/iceberg/hadoop/HadoopOutputFile.java +++ b/core/src/main/java/com/netflix/iceberg/hadoop/HadoopOutputFile.java @@ -31,20 +31,25 @@ */ public class HadoopOutputFile implements OutputFile { public static OutputFile fromPath(Path path, Configuration conf) { - return new HadoopOutputFile(path, conf); + return new HadoopOutputFile(Util.getFS(path, conf), path, conf); + } + + static OutputFile fromFsPath(FileSystem fs, Path path, Configuration conf) { + return new HadoopOutputFile(fs, path, conf); } private final Path path; private final Configuration conf; + private final FileSystem fs; - private HadoopOutputFile(Path path, Configuration conf) { + private HadoopOutputFile(FileSystem fs, Path path, Configuration conf) { this.path = path; this.conf = conf; + this.fs = fs; } @Override public PositionOutputStream create() { - FileSystem fs = 
Util.getFS(path, conf); try { return HadoopStreams.wrap(fs.create(path, false /* createOrOverwrite */)); } catch (FileAlreadyExistsException e) { diff --git a/core/src/main/java/com/netflix/iceberg/hadoop/HadoopTableOperations.java b/core/src/main/java/com/netflix/iceberg/hadoop/HadoopTableOperations.java index 0bd6477..7a6318a 100644 --- a/core/src/main/java/com/netflix/iceberg/hadoop/HadoopTableOperations.java +++ b/core/src/main/java/com/netflix/iceberg/hadoop/HadoopTableOperations.java @@ -47,6 +47,7 @@ private final Configuration conf; private final Path location; + private final FileSystem metadataFs; private TableMetadata currentMetadata = null; private Integer version = null; private boolean shouldRefresh = true; @@ -54,6 +55,7 @@ HadoopTableOperations(Path location, Configuration conf) { this.conf = conf; this.location = location; + this.metadataFs = Util.getFS(location, conf); } public TableMetadata current() { @@ -67,10 +69,9 @@ public TableMetadata current() { public TableMetadata refresh() { int ver = version != null ? 
version : readVersionHint(); Path metadataFile = metadataFile(ver); - FileSystem fs = Util.getFS(metadataFile, conf); try { // don't check if the file exists if version is non-null because it was already checked - if (version == null && !fs.exists(metadataFile)) { + if (version == null && !metadataFs.exists(metadataFile)) { if (ver == 0) { // no v0 metadata means the table doesn't exist yet return null; @@ -78,7 +79,7 @@ public TableMetadata refresh() { throw new ValidationException("Metadata file is missing: %s", metadataFile); } - while (fs.exists(metadataFile(ver + 1))) { + while (metadataFs.exists(metadataFile(ver + 1))) { ver += 1; metadataFile = metadataFile(ver); } @@ -88,7 +89,7 @@ public TableMetadata refresh() { } this.version = ver; this.currentMetadata = TableMetadataParser.read(this, - HadoopInputFile.fromPath(metadataFile, conf)); + HadoopInputFile.fromFsPath(metadataFs, metadataFile, conf)); this.shouldRefresh = false; return currentMetadata; } @@ -105,14 +106,13 @@ public void commit(TableMetadata base, TableMetadata metadata) { } Path tempMetadataFile = metadataPath(UUID.randomUUID().toString() + getFileExtension(conf)); - TableMetadataParser.write(metadata, HadoopOutputFile.fromPath(tempMetadataFile, conf)); + TableMetadataParser.write(metadata, HadoopOutputFile.fromFsPath(metadataFs, tempMetadataFile, conf)); int nextVersion = (version != null ? 
version : 0) + 1; Path finalMetadataFile = metadataFile(nextVersion); - FileSystem fs = Util.getFS(tempMetadataFile, conf); try { - if (fs.exists(finalMetadataFile)) { + if (metadataFs.exists(finalMetadataFile)) { throw new CommitFailedException( "Version %d already exists: %s", nextVersion, finalMetadataFile); } @@ -123,7 +123,7 @@ public void commit(TableMetadata base, TableMetadata metadata) { try { // this rename operation is the atomic commit operation - if (!fs.rename(tempMetadataFile, finalMetadataFile)) { + if (!metadataFs.rename(tempMetadataFile, finalMetadataFile)) { throw new CommitFailedException( "Failed to commit changes using rename: %s", finalMetadataFile); } @@ -140,20 +140,19 @@ public void commit(TableMetadata base, TableMetadata metadata) { @Override public InputFile newInputFile(String path) { - return HadoopInputFile.fromPath(new Path(path), conf); + return HadoopInputFile.fromFsPath(metadataFs, new Path(path), conf); } @Override public OutputFile newMetadataFile(String filename) { - return HadoopOutputFile.fromPath(metadataPath(filename), conf); + return HadoopOutputFile.fromFsPath(metadataFs, metadataPath(filename), conf); } @Override public void deleteFile(String path) { Path toDelete = new Path(path); - FileSystem fs = Util.getFS(toDelete, conf); try { - fs.delete(toDelete, false /* not recursive */ ); + metadataFs.delete(toDelete, false /* not recursive */ ); } catch (IOException e) { throw new RuntimeIOException(e, "Failed to delete file: %s", path); } @@ -178,9 +177,7 @@ private Path versionHintFile() { private void writeVersionHint(int version) { Path versionHintFile = versionHintFile(); - FileSystem fs = Util.getFS(versionHintFile, conf); - - try (FSDataOutputStream out = fs.create(versionHintFile, true /* overwrite */ )) { + try (FSDataOutputStream out = metadataFs.create(versionHintFile, true /* overwrite */ )) { out.write(String.valueOf(version).getBytes("UTF-8")); } catch (IOException e) { 
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services