Victor Wong created FLINK-12648:
-----------------------------------
Summary: Load Hadoop file system via FileSystem.get()
Key: FLINK-12648
URL: https://issues.apache.org/jira/browse/FLINK-12648
Project: Flink
Issue Type: Improvement
Components: Connectors / FileSystem
Reporter: Victor Wong
Assignee: Victor Wong
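I think part of _org.apache.flink.runtime.fs.hdfs.HadoopFsFactory#create_ duplicates logic that already exists in the Apache hadoop-common dependency. That logic covers looking up the file system class for a scheme, instantiating it, falling back to the default file system authority, and initializing the instance.
We can use _org.apache.hadoop.fs.FileSystem#get(java.net.URI, org.apache.hadoop.conf.Configuration)_ to remove the duplicated code.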
Replace
{code:java}
// -- (2) get the Hadoop file system class for that scheme
final Class<? extends org.apache.hadoop.fs.FileSystem> fsClass;
try {
    fsClass = org.apache.hadoop.fs.FileSystem.getFileSystemClass(scheme, hadoopConfig);
}
catch (IOException e) {
    throw new UnsupportedFileSystemSchemeException(
        "Hadoop File System abstraction does not support scheme '" + scheme + "'. " +
        "Either no file system implementation exists for that scheme, " +
        "or the relevant classes are missing from the classpath.", e);
}
// -- (3) instantiate the Hadoop file system
LOG.debug("Instantiating for file system scheme {} Hadoop File System {}",
    scheme, fsClass.getName());
final org.apache.hadoop.fs.FileSystem hadoopFs = fsClass.newInstance();
// -- (4) create the proper URI to initialize the file system
final URI initUri;
if (fsUri.getAuthority() != null) {
    initUri = fsUri;
}
else {
    LOG.debug("URI {} does not specify file system authority, trying to load default authority (fs.defaultFS)");
    String configEntry = hadoopConfig.get("fs.defaultFS", null);
    if (configEntry == null) {
        // fs.default.name deprecated as of hadoop 2.2.0 - see
        // http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/DeprecatedProperties.html
        configEntry = hadoopConfig.get("fs.default.name", null);
    }
    if (LOG.isDebugEnabled()) {
        LOG.debug("Hadoop's 'fs.defaultFS' is set to {}", configEntry);
    }
    if (configEntry == null) {
        throw new IOException(getMissingAuthorityErrorPrefix(fsUri) +
            "Hadoop configuration did not contain an entry for the default file system ('fs.defaultFS').");
    }
    else {
        try {
            initUri = URI.create(configEntry);
        }
        catch (IllegalArgumentException e) {
            throw new IOException(getMissingAuthorityErrorPrefix(fsUri) +
                "The configuration contains an invalid file system default name " +
                "('fs.default.name' or 'fs.defaultFS'): " + configEntry);
        }
        if (initUri.getAuthority() == null) {
            throw new IOException(getMissingAuthorityErrorPrefix(fsUri) +
                "Hadoop configuration for default file system ('fs.default.name' or 'fs.defaultFS') " +
                "contains no valid authority component (like hdfs namenode, S3 host, etc)");
        }
    }
}
// -- (5) configure the Hadoop file system
try {
    hadoopFs.initialize(initUri, hadoopConfig);
}
catch (UnknownHostException e) {
    String message = "The Hadoop file system's authority (" + initUri.getAuthority() +
        "), specified by either the file URI or the configuration, cannot be resolved.";
    throw new IOException(message, e);
}
{code}
with
{code:java}
final org.apache.hadoop.fs.FileSystem hadoopFs =
org.apache.hadoop.fs.FileSystem.get(fsUri, hadoopConfig);
{code}
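Before relying on FileSystem.get to cover step (4), it may help to pin down exactly what that step does today. The following is a minimal, self-contained sketch of the authority fallback in the quoted Flink code. The names InitUriResolver and resolveInitUri are hypothetical, and a plain Map stands in for Hadoop's Configuration so that the sketch does not need hadoop-common on the classpath.

{code:java}
import java.net.URI;
import java.util.Map;

public class InitUriResolver {

    /**
     * Hypothetical helper mirroring step (4) above. A plain Map replaces
     * org.apache.hadoop.conf.Configuration (an assumption for illustration).
     */
    static URI resolveInitUri(URI fsUri, Map<String, String> conf) {
        // An explicit authority in the file URI is used as-is.
        if (fsUri.getAuthority() != null) {
            return fsUri;
        }
        // Prefer fs.defaultFS, then fall back to the deprecated fs.default.name key.
        String entry = conf.get("fs.defaultFS");
        if (entry == null) {
            entry = conf.get("fs.default.name");
        }
        if (entry == null) {
            throw new IllegalArgumentException(
                "No authority in " + fsUri + " and no default file system configured");
        }
        // URI.create throws IllegalArgumentException for a malformed entry.
        URI initUri = URI.create(entry);
        if (initUri.getAuthority() == null) {
            throw new IllegalArgumentException(
                "Default file system '" + entry + "' has no authority component");
        }
        return initUri;
    }

    public static void main(String[] args) {
        Map<String, String> conf = Map.of("fs.defaultFS", "hdfs://namenode:8020");
        // Explicit authority: the URI is returned unchanged.
        System.out.println(resolveInitUri(URI.create("hdfs://other:9000/data"), conf));
        // No authority: the default file system URI is used instead.
        System.out.println(resolveInitUri(URI.create("hdfs:///data"), conf));
    }
}
{code}

The sketch may serve as a reference when comparing behaviour. Hadoop's own fallback inside FileSystem.get can differ in edge cases (for example, when the URI's scheme does not match the scheme of fs.defaultFS), and FileSystem.get returns cached instances unless caching is disabled for the scheme, so both points are worth verifying against the Hadoop version Flink builds with.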