vinothchandar commented on a change in pull request #1768:
URL: https://github.com/apache/hudi/pull/1768#discussion_r446621351



##########
File path: hudi-common/pom.xml
##########
@@ -147,6 +147,16 @@
       <scope>test</scope>
     </dependency>
 
+    <!-- Spark -->
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-sql_${scala.binary.version}</artifactId>
+    </dependency>
+

Review comment:
       Yes, we cannot depend on Spark in `hudi-common`.

##########
File path: hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
##########
@@ -386,13 +389,26 @@ public void finalizeWrite(JavaSparkContext jsc, String 
instantTs, List<HoodieWri
    *
    * @param instantTs Instant Time
    */
-  public void deleteMarkerDir(String instantTs) {
+  public void deleteMarkerDir(JavaSparkContext jsc, String instantTs) {
     try {
       FileSystem fs = getMetaClient().getFs();
       Path markerDir = new Path(metaClient.getMarkerFolderPath(instantTs));
+
       if (fs.exists(markerDir)) {
         // For append only case, we do not write to marker dir. Hence, the 
above check
-        LOG.info("Removing marker directory=" + markerDir);
+        LOG.info("Removing marker directory = " + markerDir);
+
+        FileStatus[] fileStatuses = fs.listStatus(markerDir);

Review comment:
       cc @n3nash: should we have a flag to guard this for HDFS, i.e. in case 
the recursive delete works better there (IIUC)? You might want to trade off 
parallelism for fewer RPCs there. 
   We can override the defaults at the Spark datasource level, and set these 
based on `StorageSchemes` as well. 

##########
File path: hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
##########
@@ -386,13 +389,26 @@ public void finalizeWrite(JavaSparkContext jsc, String 
instantTs, List<HoodieWri
    *
    * @param instantTs Instant Time
    */
-  public void deleteMarkerDir(String instantTs) {
+  public void deleteMarkerDir(JavaSparkContext jsc, String instantTs) {
     try {
       FileSystem fs = getMetaClient().getFs();
       Path markerDir = new Path(metaClient.getMarkerFolderPath(instantTs));
+
       if (fs.exists(markerDir)) {
         // For append only case, we do not write to marker dir. Hence, the 
above check
-        LOG.info("Removing marker directory=" + markerDir);
+        LOG.info("Removing marker directory = " + markerDir);
+
+        FileStatus[] fileStatuses = fs.listStatus(markerDir);

Review comment:
       @umehrot2 so it seems that for object stores this is different, and it 
makes complete sense to do parallel cleaning of individual files.

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
##########
@@ -199,16 +201,40 @@ public static String getRelativePartitionPath(Path 
basePath, Path partitionPath)
     return partitions;
   }
 
-  public static List<String> getAllDataFilesForMarkers(FileSystem fs, String 
basePath, String instantTs,
-      String markerDir, String baseFileExtension) throws IOException {
-    List<String> dataFiles = new LinkedList<>();
-    processFiles(fs, markerDir, (status) -> {
-      String pathStr = status.getPath().toString();
-      if (pathStr.endsWith(HoodieTableMetaClient.MARKER_EXTN)) {
-        dataFiles.add(FSUtils.translateMarkerToDataPath(basePath, pathStr, 
instantTs, baseFileExtension));
+  public static Set<String> getAllDataFilesForMarkers(JavaSparkContext jsc, 
FileSystem fs, String basePath,

Review comment:
       I think this is the reason for needing Spark in `hudi-common`. We can 
move/refactor this code into `hudi-client`. 
   
   In fact, #1755 has already modularized this more. 

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
##########
@@ -199,16 +201,40 @@ public static String getRelativePartitionPath(Path 
basePath, Path partitionPath)
     return partitions;
   }
 
-  public static List<String> getAllDataFilesForMarkers(FileSystem fs, String 
basePath, String instantTs,
-      String markerDir, String baseFileExtension) throws IOException {
-    List<String> dataFiles = new LinkedList<>();
-    processFiles(fs, markerDir, (status) -> {
-      String pathStr = status.getPath().toString();
-      if (pathStr.endsWith(HoodieTableMetaClient.MARKER_EXTN)) {
-        dataFiles.add(FSUtils.translateMarkerToDataPath(basePath, pathStr, 
instantTs, baseFileExtension));
+  public static Set<String> getAllDataFilesForMarkers(JavaSparkContext jsc, 
FileSystem fs, String basePath,
+      String instantTs, String markerDir, String baseFileExtension, int 
parallelism) throws IOException {
+    FileStatus[] topLevelStatuses = fs.listStatus(new Path(markerDir));
+
+    Set<String> dataFiles = new HashSet<>();
+
+    List<String> subDirectories = new ArrayList<>();
+    for (FileStatus topLevelStatus: topLevelStatuses) {
+      if (topLevelStatus.isFile()) {
+        String pathStr = topLevelStatus.getPath().toString();
+        if (pathStr.endsWith(HoodieTableMetaClient.MARKER_EXTN)) {
+          dataFiles.add(FSUtils.translateMarkerToDataPath(basePath, pathStr, 
instantTs, baseFileExtension));
+        }
+      } else {
+        subDirectories.add(topLevelStatus.getPath().toString());
       }
-      return true;
-    }, false);
+    }
+
+    parallelism = subDirectories.size() < parallelism ? subDirectories.size() 
: parallelism;

Review comment:
       Math.min(subDirectories.size(), parallelism)?

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
##########
@@ -199,16 +201,40 @@ public static String getRelativePartitionPath(Path 
basePath, Path partitionPath)
     return partitions;
   }
 
-  public static List<String> getAllDataFilesForMarkers(FileSystem fs, String 
basePath, String instantTs,
-      String markerDir, String baseFileExtension) throws IOException {
-    List<String> dataFiles = new LinkedList<>();
-    processFiles(fs, markerDir, (status) -> {
-      String pathStr = status.getPath().toString();
-      if (pathStr.endsWith(HoodieTableMetaClient.MARKER_EXTN)) {
-        dataFiles.add(FSUtils.translateMarkerToDataPath(basePath, pathStr, 
instantTs, baseFileExtension));
+  public static Set<String> getAllDataFilesForMarkers(JavaSparkContext jsc, 
FileSystem fs, String basePath,
+      String instantTs, String markerDir, String baseFileExtension, int 
parallelism) throws IOException {
+    FileStatus[] topLevelStatuses = fs.listStatus(new Path(markerDir));
+
+    Set<String> dataFiles = new HashSet<>();
+
+    List<String> subDirectories = new ArrayList<>();
+    for (FileStatus topLevelStatus: topLevelStatuses) {
+      if (topLevelStatus.isFile()) {
+        String pathStr = topLevelStatus.getPath().toString();
+        if (pathStr.endsWith(HoodieTableMetaClient.MARKER_EXTN)) {
+          dataFiles.add(FSUtils.translateMarkerToDataPath(basePath, pathStr, 
instantTs, baseFileExtension));
+        }
+      } else {
+        subDirectories.add(topLevelStatus.getPath().toString());
       }
-      return true;
-    }, false);
+    }
+
+    parallelism = subDirectories.size() < parallelism ? subDirectories.size() 
: parallelism;
+    dataFiles.addAll(jsc.parallelize(subDirectories, 
parallelism).flatMap(directory -> {

Review comment:
       similar question here.. cc @n3nash .. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to