This is an automated email from the ASF dual-hosted git repository.
vinoth pushed a commit to branch rfc-15
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/rfc-15 by this push:
new 66ce7f1 [HUDI-1312] [RFC-15] Support for metadata listing for
snapshot queries through Hive/SparkSQL (#2366)
66ce7f1 is described below
commit 66ce7f1b14cdb0a073de5edab9dbafb3aa457cb3
Author: rmpifer <[email protected]>
AuthorDate: Tue Dec 29 13:09:55 2020 -0800
[HUDI-1312] [RFC-15] Support for metadata listing for snapshot queries
through Hive/SparkSQL (#2366)
Co-authored-by: Ryan Pifer <[email protected]>
---
.../apache/hudi/hadoop/HoodieHFileInputFormat.java | 12 ++---
.../hudi/hadoop/HoodieParquetInputFormat.java | 13 ++----
.../hudi/hadoop/utils/HoodieInputFormatUtils.java | 53 ++++++++++++++++------
.../utils/HoodieRealtimeInputFormatUtils.java | 20 +++++++-
4 files changed, 68 insertions(+), 30 deletions(-)
diff --git
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileInputFormat.java
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileInputFormat.java
index 1747888..048402a 100644
---
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileInputFormat.java
+++
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileInputFormat.java
@@ -31,7 +31,6 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Job;
-import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -97,13 +96,10 @@ public class HoodieHFileInputFormat extends
FileInputFormat<NullWritable, ArrayW
// process snapshot queries next.
List<Path> snapshotPaths = inputPathHandler.getSnapshotPaths();
if (snapshotPaths.size() > 0) {
- setInputPaths(job, snapshotPaths.toArray(new
Path[snapshotPaths.size()]));
- FileStatus[] fileStatuses = super.listStatus(job);
- Map<HoodieTableMetaClient, List<FileStatus>> groupedFileStatus =
- HoodieInputFormatUtils.groupFileStatusForSnapshotPaths(fileStatuses,
HoodieFileFormat.HFILE.getFileExtension(),
- tableMetaClientMap.values());
- LOG.info("Found a total of " + groupedFileStatus.size() + " groups");
- for (Map.Entry<HoodieTableMetaClient, List<FileStatus>> entry :
groupedFileStatus.entrySet()) {
+ Map<HoodieTableMetaClient, List<Path>> groupedPaths =
+
HoodieInputFormatUtils.groupSnapshotPathsByMetaClient(tableMetaClientMap.values(),
snapshotPaths);
+ LOG.info("Found a total of " + groupedPaths.size() + " groups");
+ for (Map.Entry<HoodieTableMetaClient, List<Path>> entry :
groupedPaths.entrySet()) {
List<FileStatus> result =
HoodieInputFormatUtils.filterFileStatusForSnapshotMode(job, entry.getKey(),
entry.getValue());
if (result != null) {
returns.addAll(result);
diff --git
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java
index 8b89949..d51aff0 100644
---
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java
+++
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java
@@ -18,7 +18,6 @@
package org.apache.hudi.hadoop;
-import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
@@ -109,13 +108,11 @@ public class HoodieParquetInputFormat extends
MapredParquetInputFormat implement
// process snapshot queries next.
List<Path> snapshotPaths = inputPathHandler.getSnapshotPaths();
if (snapshotPaths.size() > 0) {
- setInputPaths(job, snapshotPaths.toArray(new
Path[snapshotPaths.size()]));
- FileStatus[] fileStatuses = super.listStatus(job);
- Map<HoodieTableMetaClient, List<FileStatus>> groupedFileStatus =
- HoodieInputFormatUtils.groupFileStatusForSnapshotPaths(fileStatuses,
- HoodieFileFormat.PARQUET.getFileExtension(),
tableMetaClientMap.values());
- LOG.info("Found a total of " + groupedFileStatus.size() + " groups");
- for (Map.Entry<HoodieTableMetaClient, List<FileStatus>> entry :
groupedFileStatus.entrySet()) {
+ Map<HoodieTableMetaClient, List<Path>> groupedPaths =
+
HoodieInputFormatUtils.groupSnapshotPathsByMetaClient(tableMetaClientMap.values(),
snapshotPaths);
+ LOG.info("Found a total of " + groupedPaths.size() + " groups");
+ for (Map.Entry<HoodieTableMetaClient, List<Path>> entry :
groupedPaths.entrySet()) {
+ HoodieTableMetaClient metaClient = entry.getKey();
List<FileStatus> result =
HoodieInputFormatUtils.filterFileStatusForSnapshotMode(job, entry.getKey(),
entry.getValue());
if (result != null) {
returns.addAll(result);
diff --git
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
index b368851..c10fdad 100644
---
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
+++
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.FileSystemViewManager;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.util.Option;
@@ -62,6 +63,11 @@ import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
+import static
org.apache.hudi.common.config.HoodieMetadataConfig.METADATA_ENABLE_PROP;
+import static
org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS;
+import static
org.apache.hudi.common.config.HoodieMetadataConfig.METADATA_VALIDATE_PROP;
+import static
org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE;
+
public class HoodieInputFormatUtils {
// These positions have to be deterministic across all tables
@@ -391,27 +397,48 @@ public class HoodieInputFormatUtils {
return grouped;
}
+ public static Map<HoodieTableMetaClient, List<Path>>
groupSnapshotPathsByMetaClient(
+ Collection<HoodieTableMetaClient> metaClientList,
+ List<Path> snapshotPaths
+ ) {
+ Map<HoodieTableMetaClient, List<Path>> grouped = new HashMap<>();
+ metaClientList.forEach(metaClient -> grouped.put(metaClient, new
ArrayList<>()));
+ for (Path path : snapshotPaths) {
+ // Find meta client associated with the input path
+ metaClientList.stream().filter(metaClient ->
path.toString().contains(metaClient.getBasePath()))
+ .forEach(metaClient -> grouped.get(metaClient).add(path));
+ }
+ return grouped;
+ }
+
/**
- * Filters data files for a snapshot queried table.
+ * Filters data files under @param paths for a snapshot queried table.
* @param job
- * @param metadata
- * @param fileStatuses
+ * @param metaClient
+ * @param paths
* @return
*/
public static List<FileStatus> filterFileStatusForSnapshotMode(
- JobConf job, HoodieTableMetaClient metadata, List<FileStatus>
fileStatuses) throws IOException {
- FileStatus[] statuses = fileStatuses.toArray(new FileStatus[0]);
+ JobConf job, HoodieTableMetaClient metaClient, List<Path> paths)
throws IOException {
if (LOG.isDebugEnabled()) {
- LOG.debug("Hoodie Metadata initialized with completed commit Ts as :" +
metadata);
+ LOG.debug("Hoodie Metadata initialized with completed commit Ts as :" +
metaClient);
}
- // Get all commits, delta commits, compactions, as all of them produce a
base parquet file today
- HoodieTimeline timeline =
metadata.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
- TableFileSystemView.BaseFileOnlyView roView = new
HoodieTableFileSystemView(metadata, timeline, statuses);
- // filter files on the latest commit found
- List<HoodieBaseFile> filteredFiles =
roView.getLatestBaseFiles().collect(Collectors.toList());
- LOG.info("Total paths to process after hoodie filter " +
filteredFiles.size());
+
+ boolean useFileListingFromMetadata = job.getBoolean(METADATA_ENABLE_PROP,
DEFAULT_METADATA_ENABLE_FOR_READERS);
+ boolean verifyFileListing = job.getBoolean(METADATA_VALIDATE_PROP,
DEFAULT_METADATA_VALIDATE);
+ HoodieTableFileSystemView fsView =
FileSystemViewManager.createInMemoryFileSystemView(metaClient,
+ useFileListingFromMetadata, verifyFileListing);
+
+ List<HoodieBaseFile> filteredBaseFiles = new ArrayList<>();
+ for (Path p : paths) {
+ String relativePartitionPath = FSUtils.getRelativePartitionPath(new
Path(metaClient.getBasePath()), p);
+ List<HoodieBaseFile> matched =
fsView.getLatestBaseFiles(relativePartitionPath).collect(Collectors.toList());
+ filteredBaseFiles.addAll(matched);
+ }
+
+ LOG.info("Total paths to process after hoodie filter " +
filteredBaseFiles.size());
List<FileStatus> returns = new ArrayList<>();
- for (HoodieBaseFile filteredFile : filteredFiles) {
+ for (HoodieBaseFile filteredFile : filteredBaseFiles) {
if (LOG.isDebugEnabled()) {
LOG.debug("Processing latest hoodie file - " + filteredFile.getPath());
}
diff --git
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
index 760dd96..f14bc40 100644
---
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
+++
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
@@ -26,6 +26,7 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.FileSystemViewManager;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
@@ -53,6 +54,11 @@ import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static
org.apache.hudi.common.config.HoodieMetadataConfig.METADATA_ENABLE_PROP;
+import static
org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS;
+import static
org.apache.hudi.common.config.HoodieMetadataConfig.METADATA_VALIDATE_PROP;
+import static
org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE;
+
public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
private static final Logger LOG =
LogManager.getLogger(HoodieRealtimeInputFormatUtils.class);
@@ -63,13 +69,25 @@ public class HoodieRealtimeInputFormatUtils extends
HoodieInputFormatUtils {
// TODO(vc): Should we handle also non-hoodie splits here?
Map<Path, HoodieTableMetaClient> partitionsToMetaClient =
getTableMetaClientByBasePath(conf, partitionsToParquetSplits.keySet());
+ boolean useFileListingFromMetadata = conf.getBoolean(METADATA_ENABLE_PROP,
DEFAULT_METADATA_ENABLE_FOR_READERS);
+ boolean verifyFileListing = conf.getBoolean(METADATA_VALIDATE_PROP,
DEFAULT_METADATA_VALIDATE);
+ // Create file system cache so metadata table is only instantiated once.
Also can benefit normal file listing if
+ // partition path is listed twice so file groups will already be loaded in
file system
+ Map<HoodieTableMetaClient, HoodieTableFileSystemView> fsCache = new
HashMap<>();
// for all unique split parents, obtain all delta files based on delta
commit timeline,
// grouped on file id
List<InputSplit> rtSplits = new ArrayList<>();
partitionsToParquetSplits.keySet().forEach(partitionPath -> {
// for each partition path obtain the data & log file groupings, then
map back to inputsplits
HoodieTableMetaClient metaClient =
partitionsToMetaClient.get(partitionPath);
- HoodieTableFileSystemView fsView = new
HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline());
+ if (!fsCache.containsKey(metaClient)) {
+
+ HoodieTableFileSystemView fsView =
FileSystemViewManager.createInMemoryFileSystemView(metaClient,
+ useFileListingFromMetadata, verifyFileListing);
+ fsCache.put(metaClient, fsView);
+ }
+ HoodieTableFileSystemView fsView = fsCache.get(metaClient);
+
String relPartitionPath = FSUtils.getRelativePartitionPath(new
Path(metaClient.getBasePath()), partitionPath);
try {