vinothchandar commented on a change in pull request #2263:
URL: https://github.com/apache/hudi/pull/2263#discussion_r546114603



##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.config;
+
+import org.apache.hudi.common.config.DefaultHoodieConfig;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * Clustering specific configs.
+ */
+public class HoodieClusteringConfig extends DefaultHoodieConfig {
+
+  // Config to provide a strategy class to create ClusteringPlan. Class has to 
be subclass of ClusteringPlanStrategy
+  public static final String CLUSTERING_PLAN_STRATEGY_CLASS = 
"hoodie.clustering.plan.strategy.class";
+  public static final String DEFAULT_CLUSTERING_PLAN_STRATEGY_CLASS =
+      
"org.apache.hudi.client.clustering.plan.strategy.SparkBoundedDayBasedClusteringPlanStrategy";
+
+  // Config to provide a strategy class to execute a ClusteringPlan. Class has 
to be subclass of RunClusteringStrategy
+  public static final String RUN_CLUSTERING_STRATEGY_CLASS = 
"hoodie.clustering.run.strategy.class";

Review comment:
       may be call it `execution`.  
`hoodie.clustering.execution.strategy.class` ? 

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.config;
+
+import org.apache.hudi.common.config.DefaultHoodieConfig;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * Clustering specific configs.
+ */
+public class HoodieClusteringConfig extends DefaultHoodieConfig {
+
+  // Config to provide a strategy class to create ClusteringPlan. Class has to 
be subclass of ClusteringPlanStrategy
+  public static final String CLUSTERING_PLAN_STRATEGY_CLASS = 
"hoodie.clustering.plan.strategy.class";
+  public static final String DEFAULT_CLUSTERING_PLAN_STRATEGY_CLASS =
+      
"org.apache.hudi.client.clustering.plan.strategy.SparkBoundedDayBasedClusteringPlanStrategy";
+
+  // Config to provide a strategy class to execute a ClusteringPlan. Class has 
to be subclass of RunClusteringStrategy
+  public static final String RUN_CLUSTERING_STRATEGY_CLASS = 
"hoodie.clustering.run.strategy.class";
+  public static final String DEFAULT_RUN_CLUSTERING_STRATEGY_CLASS =
+      
"org.apache.hudi.client.clustering.run.strategy.SparkBulkInsertBasedRunClusteringStrategy";
+
+  // Turn on inline clustering - clustering will be run after write operation 
is complete.
+  public static final String INLINE_CLUSTERING_PROP = 
"hoodie.clustering.inline";
+  private static final String DEFAULT_INLINE_CLUSTERING = "false";
+
+  // Config to control frequency of clustering
+  public static final String INLINE_CLUSTERING_NUM_COMMIT_PROP = 
"hoodie.clustering.inline.num.commits";

Review comment:
       max.commits? 

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringPlanStrategy.java
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.cluster.strategy;
+
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
+import org.apache.hudi.avro.model.HoodieSliceInfo;
+import org.apache.hudi.client.common.HoodieEngineContext;
+import org.apache.hudi.client.utils.FileSliceMetricUtils;
+import org.apache.hudi.common.model.BaseFile;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.view.SyncableFileSystemView;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Pluggable implementation for scheduling clustering and creating 
ClusteringPlan.
+ */
+public abstract class ClusteringPlanStrategy<T extends 
HoodieRecordPayload,I,K,O> implements Serializable {
+  private static final Logger LOG = 
LogManager.getLogger(ClusteringPlanStrategy.class);
+
+  public static final int CLUSTERING_PLAN_VERSION_1 = 1;
+
+  private final HoodieTable<T,I,K,O> hoodieTable;
+  private final transient HoodieEngineContext engineContext;
+  private final HoodieWriteConfig writeConfig;
+
+  public ClusteringPlanStrategy(HoodieTable table, HoodieEngineContext 
engineContext, HoodieWriteConfig writeConfig) {
+    this.writeConfig = writeConfig;
+    this.hoodieTable = table;
+    this.engineContext = engineContext;
+  }
+
+  /**
+   * Generate metadata for grouping eligible files and create a plan. Note 
that data is not moved around
+   * as part of this step.
+   *
+   * If there is no data available to cluster, return None.
+   */
+  public abstract Option<HoodieClusteringPlan> generateClusteringPlan();
+
+  /**
+   * Return file slices eligible for clustering. FileIds in
+   * 1) pending clustering/compaction
+   * 2) Larger than clustering target file size

Review comment:
       remove this?

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.config;
+
+import org.apache.hudi.common.config.DefaultHoodieConfig;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * Clustering specific configs.
+ */
+public class HoodieClusteringConfig extends DefaultHoodieConfig {
+
+  // Config to provide a strategy class to create ClusteringPlan. Class has to 
be subclass of ClusteringPlanStrategy
+  public static final String CLUSTERING_PLAN_STRATEGY_CLASS = 
"hoodie.clustering.plan.strategy.class";
+  public static final String DEFAULT_CLUSTERING_PLAN_STRATEGY_CLASS =
+      
"org.apache.hudi.client.clustering.plan.strategy.SparkBoundedDayBasedClusteringPlanStrategy";
+
+  // Config to provide a strategy class to execute a ClusteringPlan. Class has 
to be subclass of RunClusteringStrategy
+  public static final String RUN_CLUSTERING_STRATEGY_CLASS = 
"hoodie.clustering.run.strategy.class";
+  public static final String DEFAULT_RUN_CLUSTERING_STRATEGY_CLASS =
+      
"org.apache.hudi.client.clustering.run.strategy.SparkBulkInsertBasedRunClusteringStrategy";
+
+  // Turn on inline clustering - clustering will be run after write operation 
is complete.
+  public static final String INLINE_CLUSTERING_PROP = 
"hoodie.clustering.inline";
+  private static final String DEFAULT_INLINE_CLUSTERING = "false";
+
+  // Config to control frequency of clustering
+  public static final String INLINE_CLUSTERING_NUM_COMMIT_PROP = 
"hoodie.clustering.inline.num.commits";
+  private static final String DEFAULT_INLINE_CLUSTERING_NUM_COMMITS = "4";
+
+  // Number of partitions to scan to create ClusteringPlan.
+  public static final String CLUSTERING_TARGET_PARTITIONS = 
"hoodie.clustering.target.partitions";
+  public static final String DEFAULT_CLUSTERING_TARGET_PARTITIONS = 
String.valueOf(2);
+
+  // Each clustering operation can create multiple groups. Total amount of 
data processed by clustering operation
+  // is defined by below two properties (CLUSTERING_MAX_BYTES_IN_GROUP * 
CLUSTERING_MAX_NUM_GROUPS).
+  // Max amount of data to be included in one group
+  public static final String CLUSTERING_MAX_BYTES_IN_GROUP = 
"hoodie.clustering.max.bytes.group";
+  public static final String DEFAULT_CLUSTERING_MAX_GROUP_SIZE = 
String.valueOf(2 * 1024 * 1024 * 1024L);
+
+  // Maximum number of groups to create as part of ClusteringPlan. Increasing 
groups will increase parallelism.
+  public static final String CLUSTERING_MAX_NUM_GROUPS = 
"hoodie.clustering.max.num.groups";
+  public static final String DEFAULT_CLUSTERING_MAX_NUM_GROUPS = "30";
+
+  // Each group can produce 'N' 
(CLUSTERING_MAX_GROUP_SIZE/CLUSTERING_TARGET_FILE_SIZE) output file groups.

Review comment:
       strategy specific?

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.cluster.strategy;
+
+import org.apache.hudi.avro.model.HoodieClusteringGroup;
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
+import org.apache.hudi.avro.model.HoodieClusteringStrategy;
+import org.apache.hudi.client.common.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Scheduling strategy with restriction that clustering groups can only 
contain files from same partition.
+ */
+public abstract class PartitionAwareClusteringPlanStrategy<T extends 
HoodieRecordPayload,I,K,O> extends ClusteringPlanStrategy<T,I,K,O> {

Review comment:
       also rename the subclass to indicate that it prefers the recent 
partitions?

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.config;
+
+import org.apache.hudi.common.config.DefaultHoodieConfig;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * Clustering specific configs.
+ */
+public class HoodieClusteringConfig extends DefaultHoodieConfig {
+
+  // Config to provide a strategy class to create ClusteringPlan. Class has to 
be subclass of ClusteringPlanStrategy
+  public static final String CLUSTERING_PLAN_STRATEGY_CLASS = 
"hoodie.clustering.plan.strategy.class";
+  public static final String DEFAULT_CLUSTERING_PLAN_STRATEGY_CLASS =
+      
"org.apache.hudi.client.clustering.plan.strategy.SparkBoundedDayBasedClusteringPlanStrategy";
+
+  // Config to provide a strategy class to execute a ClusteringPlan. Class has 
to be subclass of RunClusteringStrategy
+  public static final String RUN_CLUSTERING_STRATEGY_CLASS = 
"hoodie.clustering.run.strategy.class";
+  public static final String DEFAULT_RUN_CLUSTERING_STRATEGY_CLASS =
+      
"org.apache.hudi.client.clustering.run.strategy.SparkBulkInsertBasedRunClusteringStrategy";
+
+  // Turn on inline clustering - clustering will be run after write operation 
is complete.
+  public static final String INLINE_CLUSTERING_PROP = 
"hoodie.clustering.inline";
+  private static final String DEFAULT_INLINE_CLUSTERING = "false";
+
+  // Config to control frequency of clustering
+  public static final String INLINE_CLUSTERING_NUM_COMMIT_PROP = 
"hoodie.clustering.inline.num.commits";
+  private static final String DEFAULT_INLINE_CLUSTERING_NUM_COMMITS = "4";
+
+  // Number of partitions to scan to create ClusteringPlan.
+  public static final String CLUSTERING_TARGET_PARTITIONS = 
"hoodie.clustering.target.partitions";
+  public static final String DEFAULT_CLUSTERING_TARGET_PARTITIONS = 
String.valueOf(2);
+
+  // Each clustering operation can create multiple groups. Total amount of 
data processed by clustering operation
+  // is defined by below two properties (CLUSTERING_MAX_BYTES_IN_GROUP * 
CLUSTERING_MAX_NUM_GROUPS).
+  // Max amount of data to be included in one group
+  public static final String CLUSTERING_MAX_BYTES_IN_GROUP = 
"hoodie.clustering.max.bytes.group";

Review comment:
       may be `hoodie.clustering.max.bytes.per.group` ?

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.config;
+
+import org.apache.hudi.common.config.DefaultHoodieConfig;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * Clustering specific configs.
+ */
+public class HoodieClusteringConfig extends DefaultHoodieConfig {
+
+  // Config to provide a strategy class to create ClusteringPlan. Class has to 
be subclass of ClusteringPlanStrategy
+  public static final String CLUSTERING_PLAN_STRATEGY_CLASS = 
"hoodie.clustering.plan.strategy.class";
+  public static final String DEFAULT_CLUSTERING_PLAN_STRATEGY_CLASS =
+      
"org.apache.hudi.client.clustering.plan.strategy.SparkBoundedDayBasedClusteringPlanStrategy";
+
+  // Config to provide a strategy class to execute a ClusteringPlan. Class has 
to be subclass of RunClusteringStrategy
+  public static final String RUN_CLUSTERING_STRATEGY_CLASS = 
"hoodie.clustering.run.strategy.class";
+  public static final String DEFAULT_RUN_CLUSTERING_STRATEGY_CLASS =
+      
"org.apache.hudi.client.clustering.run.strategy.SparkBulkInsertBasedRunClusteringStrategy";
+
+  // Turn on inline clustering - clustering will be run after write operation 
is complete.
+  public static final String INLINE_CLUSTERING_PROP = 
"hoodie.clustering.inline";
+  private static final String DEFAULT_INLINE_CLUSTERING = "false";
+
+  // Config to control frequency of clustering
+  public static final String INLINE_CLUSTERING_NUM_COMMIT_PROP = 
"hoodie.clustering.inline.num.commits";
+  private static final String DEFAULT_INLINE_CLUSTERING_NUM_COMMITS = "4";
+
+  // Number of partitions to scan to create ClusteringPlan.

Review comment:
       replace `scan` with `list`? 
   
   should we also do it based on last N commits? It helps for tables that 
receive data across partitions? This is probably some follow on work we can do 
to add a new plan strategy for this. Lets add a JIRA? 

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.config;
+
+import org.apache.hudi.common.config.DefaultHoodieConfig;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * Clustering specific configs.
+ */
+public class HoodieClusteringConfig extends DefaultHoodieConfig {
+
+  // Config to provide a strategy class to create ClusteringPlan. Class has to 
be subclass of ClusteringPlanStrategy
+  public static final String CLUSTERING_PLAN_STRATEGY_CLASS = 
"hoodie.clustering.plan.strategy.class";
+  public static final String DEFAULT_CLUSTERING_PLAN_STRATEGY_CLASS =
+      
"org.apache.hudi.client.clustering.plan.strategy.SparkBoundedDayBasedClusteringPlanStrategy";
+
+  // Config to provide a strategy class to execute a ClusteringPlan. Class has 
to be subclass of RunClusteringStrategy
+  public static final String RUN_CLUSTERING_STRATEGY_CLASS = 
"hoodie.clustering.run.strategy.class";
+  public static final String DEFAULT_RUN_CLUSTERING_STRATEGY_CLASS =
+      
"org.apache.hudi.client.clustering.run.strategy.SparkBulkInsertBasedRunClusteringStrategy";
+
+  // Turn on inline clustering - clustering will be run after write operation 
is complete.
+  public static final String INLINE_CLUSTERING_PROP = 
"hoodie.clustering.inline";
+  private static final String DEFAULT_INLINE_CLUSTERING = "false";
+
+  // Config to control frequency of clustering
+  public static final String INLINE_CLUSTERING_NUM_COMMIT_PROP = 
"hoodie.clustering.inline.num.commits";
+  private static final String DEFAULT_INLINE_CLUSTERING_NUM_COMMITS = "4";
+
+  // Number of partitions to scan to create ClusteringPlan.
+  public static final String CLUSTERING_TARGET_PARTITIONS = 
"hoodie.clustering.target.partitions";

Review comment:
       `hoodie.clustering.plan.strategy.daybased.lookback.partitions` or 
something ties this to a specific strategy and also captures that this look at 
N partitions from now. 

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.config;
+
+import org.apache.hudi.common.config.DefaultHoodieConfig;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * Clustering specific configs.
+ */
+public class HoodieClusteringConfig extends DefaultHoodieConfig {
+
+  // Config to provide a strategy class to create ClusteringPlan. Class has to 
be subclass of ClusteringPlanStrategy
+  public static final String CLUSTERING_PLAN_STRATEGY_CLASS = 
"hoodie.clustering.plan.strategy.class";
+  public static final String DEFAULT_CLUSTERING_PLAN_STRATEGY_CLASS =
+      
"org.apache.hudi.client.clustering.plan.strategy.SparkBoundedDayBasedClusteringPlanStrategy";
+
+  // Config to provide a strategy class to execute a ClusteringPlan. Class has 
to be subclass of RunClusteringStrategy
+  public static final String RUN_CLUSTERING_STRATEGY_CLASS = 
"hoodie.clustering.run.strategy.class";
+  public static final String DEFAULT_RUN_CLUSTERING_STRATEGY_CLASS =
+      
"org.apache.hudi.client.clustering.run.strategy.SparkBulkInsertBasedRunClusteringStrategy";
+
+  // Turn on inline clustering - clustering will be run after write operation 
is complete.
+  public static final String INLINE_CLUSTERING_PROP = 
"hoodie.clustering.inline";
+  private static final String DEFAULT_INLINE_CLUSTERING = "false";
+
+  // Config to control frequency of clustering
+  public static final String INLINE_CLUSTERING_NUM_COMMIT_PROP = 
"hoodie.clustering.inline.num.commits";
+  private static final String DEFAULT_INLINE_CLUSTERING_NUM_COMMITS = "4";
+
+  // Number of partitions to scan to create ClusteringPlan.
+  public static final String CLUSTERING_TARGET_PARTITIONS = 
"hoodie.clustering.target.partitions";
+  public static final String DEFAULT_CLUSTERING_TARGET_PARTITIONS = 
String.valueOf(2);
+
+  // Each clustering operation can create multiple groups. Total amount of 
data processed by clustering operation

Review comment:
       something to think about. should this be strategy specific too? 

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/BaseCreateClusteringPlanActionExecutor.java
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.cluster;
+
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
+import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
+import org.apache.hudi.client.common.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.BaseActionExecutor;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+
+public abstract class BaseCreateClusteringPlanActionExecutor<T extends 
HoodieRecordPayload, I, K, O> extends BaseActionExecutor<T, I, K, O, 
Option<HoodieClusteringPlan>> {

Review comment:
       drop the `Create`? 

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringPlanStrategy.java
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.cluster.strategy;
+
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
+import org.apache.hudi.avro.model.HoodieSliceInfo;
+import org.apache.hudi.client.common.HoodieEngineContext;
+import org.apache.hudi.client.utils.FileSliceMetricUtils;
+import org.apache.hudi.common.model.BaseFile;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.view.SyncableFileSystemView;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Pluggable implementation for scheduling clustering and creating 
ClusteringPlan.
+ */
+public abstract class ClusteringPlanStrategy<T extends 
HoodieRecordPayload,I,K,O> implements Serializable {
+  private static final Logger LOG = 
LogManager.getLogger(ClusteringPlanStrategy.class);
+
+  public static final int CLUSTERING_PLAN_VERSION_1 = 1;
+
+  private final HoodieTable<T,I,K,O> hoodieTable;
+  private final transient HoodieEngineContext engineContext;
+  private final HoodieWriteConfig writeConfig;
+
+  public ClusteringPlanStrategy(HoodieTable table, HoodieEngineContext 
engineContext, HoodieWriteConfig writeConfig) {
+    this.writeConfig = writeConfig;
+    this.hoodieTable = table;
+    this.engineContext = engineContext;
+  }
+
+  /**
+   * Generate metadata for grouping eligible files and create a plan. Note 
that data is not moved around
+   * as part of this step.
+   *
+   * If there is no data available to cluster, return None.
+   */
+  public abstract Option<HoodieClusteringPlan> generateClusteringPlan();
+
+  /**
+   * Return file slices eligible for clustering. FileIds in
+   * 1) pending clustering/compaction
+   * 2) Larger than clustering target file size
+   *
+   * are not eligible for clustering.
+   */
+  protected Stream<FileSlice> getFileSlicesEligibleForClustering(String 
partition) {
+    SyncableFileSystemView fileSystemView = (SyncableFileSystemView) 
getHoodieTable().getSliceView();
+    Set<HoodieFileGroupId> fgIdsInPendingCompactionAndClustering = 
fileSystemView.getPendingCompactionOperations()
+        .map(instantTimeOpPair -> 
instantTimeOpPair.getValue().getFileGroupId())
+        .collect(Collectors.toSet());
+    
fgIdsInPendingCompactionAndClustering.addAll(fileSystemView.getFileGroupsInPendingClustering().map(Pair::getKey).collect(Collectors.toSet()));
+
+    return hoodieTable.getSliceView().getLatestFileSlices(partition)
+        // file ids already in clustering are not eligible
+        .filter(slice -> 
!fgIdsInPendingCompactionAndClustering.contains(slice.getFileGroupId()));
+  }
+
+  /**
+   * Get parameters specific to strategy. These parameters are passed from 
'schedule clustering' step to
+   * 'run clustering' step. 'run clustering' step is typically async. So these 
params help with passing any required
+   * context from schedule to run step.
+   */
+  protected abstract Map<String, String> getStrategyParams();
+
+  /**
+   * Returns any specific parameters to be stored as part of clustering 
metadata.
+   */
+  protected Map<String, String> getExtraMetadata() {
+    return Collections.emptyMap();
+  }
+
+  /**
+   * Version to support future changes for plan.
+   */
+  protected int getPlanVersion() {
+    return CLUSTERING_PLAN_VERSION_1;
+  }
+
+  /**
+   * Transform {@link FileSlice} to {@link HoodieSliceInfo}.
+   */
+  protected List<HoodieSliceInfo> getFileSliceInfo(List<FileSlice> slices) {
+    return slices.stream().map(slice -> new HoodieSliceInfo().newBuilder()
+        .setPartitionPath(slice.getPartitionPath())
+        .setFileId(slice.getFileId())
+        .setDataFilePath(slice.getBaseFile().map(BaseFile::getPath).orElse(""))

Review comment:
       EMPTY_STRING? 

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.cluster.strategy;
+
+import org.apache.hudi.avro.model.HoodieClusteringGroup;
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
+import org.apache.hudi.avro.model.HoodieClusteringStrategy;
+import org.apache.hudi.client.common.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Scheduling strategy with restriction that clustering groups can only 
contain files from same partition.
+ */
+public abstract class PartitionAwareClusteringPlanStrategy<T extends 
HoodieRecordPayload,I,K,O> extends ClusteringPlanStrategy<T,I,K,O> {

Review comment:
       same something like DayBased? to indicate date partitioning?

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution.bulkinsert;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.table.BulkInsertPartitioner;
+import org.apache.spark.api.java.JavaRDD;
+
+/**
+ * A partitioner that does sorting based on specified column values for each 
RDD partition.
+ *
+ * @param <T> HoodieRecordPayload type
+ */
+public class RDDCustomColumnsSortPartitioner<T extends HoodieRecordPayload>
+    implements BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>> {
+
+  private final String[] sortColumnNames;
+  private final String schemaString;
+
+  public RDDCustomColumnsSortPartitioner(String[] columnNames, Schema schema) {
+    this.sortColumnNames = columnNames;
+    //TODO Schema is not serializable. So convert to String here. Figure out 
how to improve this
+    this.schemaString = schema.toString();
+  }
+
+  @Override
+  public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> 
records,
+                                                     int 
outputSparkPartitions) {
+    final String[] sortColumns = this.sortColumnNames;
+    final String schemaStr = this.schemaString;
+    return records.sortBy(record -> {
+      Schema schema = new Schema.Parser().parse(schemaStr);

Review comment:
       we can add `SerializableSchema` (we do this for Hadoop Config object) ?

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkRunClusteringCommitActionExecutor.java
##########
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.cluster;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.avro.model.HoodieClusteringGroup;
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
+import org.apache.hudi.client.SparkTaskContextSupplier;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieEngineContext;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.client.utils.ConcatenatingIterator;
+import org.apache.hudi.common.model.ClusteringOperation;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.log.HoodieFileSliceReader;
+import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.ClusteringUtils;
+import org.apache.hudi.common.util.CommitUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieClusteringException;
+import org.apache.hudi.io.IOUtils;
+import org.apache.hudi.io.storage.HoodieFileReader;
+import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+import org.apache.hudi.table.action.cluster.strategy.RunClusteringStrategy;
+import org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+public class SparkRunClusteringCommitActionExecutor<T extends 
HoodieRecordPayload<T>>

Review comment:
       SparkClusteringCommitActionExecutor?

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.log;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.SpillableMapUtils;
+import org.apache.hudi.io.storage.HoodieFileReader;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+
+/**
+ * Reads records from base file and merges any updates from log files and 
provides iterable over all records in the file slice.
+ */
+public class HoodieFileSliceReader implements Iterable<HoodieRecord<? extends 
HoodieRecordPayload>> {
+  private HoodieMergedLogRecordScanner logRecordScanner;
+
+  public static <R extends IndexedRecord, T extends HoodieRecordPayload> 
HoodieFileSliceReader getFileSliceReader(
+      HoodieFileReader<R> baseFileReader, HoodieMergedLogRecordScanner 
scanner, Schema schema, String payloadClass) throws IOException {

Review comment:
       Nonetheless we should file a `code cleanup` JIRA to provide these 
iterators as core building blocks under a nice abstractions. 

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkBoundedDayBasedClusteringPlanStrategy.java
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.clustering.plan.strategy;
+
+import org.apache.hudi.avro.model.HoodieClusteringGroup;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieSparkCopyOnWriteTable;
+import org.apache.hudi.table.HoodieSparkMergeOnReadTable;
+import 
org.apache.hudi.table.action.cluster.strategy.PartitionAwareClusteringPlanStrategy;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaRDD;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static 
org.apache.hudi.config.HoodieClusteringConfig.SORT_COLUMNS_PROPERTY;
+
+/**
+ * Clustering Strategy based on following.
+ * 1) Spark execution engine.
+ * 2) Limits amount of data per clustering operation.
+ */
+public class SparkBoundedDayBasedClusteringPlanStrategy<T extends 
HoodieRecordPayload<T>>
+    extends PartitionAwareClusteringPlanStrategy<T, JavaRDD<HoodieRecord<T>>, 
JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> {
+  private static final Logger LOG = 
LogManager.getLogger(SparkBoundedDayBasedClusteringPlanStrategy.class);
+
+  public 
SparkBoundedDayBasedClusteringPlanStrategy(HoodieSparkCopyOnWriteTable<T> table,
+                                                    HoodieSparkEngineContext 
engineContext,
+                                                    HoodieWriteConfig 
writeConfig) {
+    super(table, engineContext, writeConfig);
+  }
+
+  public 
SparkBoundedDayBasedClusteringPlanStrategy(HoodieSparkMergeOnReadTable<T> table,
+                                                    HoodieSparkEngineContext 
engineContext,
+                                                    HoodieWriteConfig 
writeConfig) {
+    super(table, engineContext, writeConfig);
+  }
+
+  @Override
+  protected Stream<HoodieClusteringGroup> 
buildClusteringGroupsForPartition(String partitionPath, List<FileSlice> 
fileSlices) {
+    List<Pair<List<FileSlice>, Integer>> fileSliceGroups = new ArrayList<>();
+    List<FileSlice> currentGroup = new ArrayList<>();
+    int totalSizeSoFar = 0;
+    for (FileSlice currentSlice : fileSlices) {
+      // assume each filegroup size is ~= parquet.max.file.size
+      totalSizeSoFar += currentSlice.getBaseFile().isPresent() ? 
currentSlice.getBaseFile().get().getFileSize() : 
getWriteConfig().getParquetMaxFileSize();
+      // check if max size is reached and create new group, if needed.
+      if (totalSizeSoFar >= getWriteConfig().getClusteringMaxBytesInGroup() && 
!currentGroup.isEmpty()) {
+        fileSliceGroups.add(Pair.of(currentGroup, 
getNumberOfOutputFileGroups(totalSizeSoFar, 
getWriteConfig().getClusteringTargetFileMaxBytes())));
+        currentGroup = new ArrayList<>();
+        totalSizeSoFar = 0;
+      }
+      currentGroup.add(currentSlice);
+    }
+    if (!currentGroup.isEmpty()) {
+      fileSliceGroups.add(Pair.of(currentGroup, 
getNumberOfOutputFileGroups(totalSizeSoFar, 
getWriteConfig().getClusteringTargetFileMaxBytes())));
+    }
+
+    return fileSliceGroups.stream().map(fileSliceGroup -> 
HoodieClusteringGroup.newBuilder()
+        .setSlices(getFileSliceInfo(fileSliceGroup.getLeft()))
+        .setNumOutputFileGroups(fileSliceGroup.getRight())
+        .setMetrics(buildMetrics(fileSliceGroup.getLeft()))
+        .build());
+  }
+
+  @Override
+  protected Map<String, String> getStrategyParams() {
+    Map<String, String> params = new HashMap<>();
+    if (getWriteConfig().getProps().containsKey(SORT_COLUMNS_PROPERTY)) {
+      params.put(SORT_COLUMNS_PROPERTY, 
getWriteConfig().getProps().getProperty(SORT_COLUMNS_PROPERTY));
+    }
+    return params;
+  }
+
+  @Override
+  protected List<String> filterPartitionPaths(List<String> partitionPaths) {
+    int targetPartitionsForClustering = 
getWriteConfig().getTargetPartitionsForClustering();
+    return partitionPaths.stream()
+        .sorted(Comparator.reverseOrder())
+        .limit(targetPartitionsForClustering > 0 ? 
targetPartitionsForClustering : partitionPaths.size())
+        .collect(Collectors.toList());
+  }
+
+  @Override
+  protected Stream<FileSlice> getFileSlicesEligibleForClustering(final String 
partition) {
+    return super.getFileSlicesEligibleForClustering(partition)
+        // files that have basefile size larger than clustering target file 
size are not eligible (Note that compaction can merge any updates)
+        .filter(slice -> 
slice.getBaseFile().map(HoodieBaseFile::getFileSize).orElse(0L) < 
getWriteConfig().getClusteringTargetFileMaxBytes());

Review comment:
       Something like `hoodie.clustering.small.file.limit` 

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkRunClusteringCommitActionExecutor.java
##########
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.cluster;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.avro.model.HoodieClusteringGroup;
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
+import org.apache.hudi.client.SparkTaskContextSupplier;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieEngineContext;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.client.utils.ConcatenatingIterator;
+import org.apache.hudi.common.model.ClusteringOperation;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.log.HoodieFileSliceReader;
+import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.ClusteringUtils;
+import org.apache.hudi.common.util.CommitUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieClusteringException;
+import org.apache.hudi.io.IOUtils;
+import org.apache.hudi.io.storage.HoodieFileReader;
+import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+import org.apache.hudi.table.action.cluster.strategy.RunClusteringStrategy;
+import org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+public class SparkRunClusteringCommitActionExecutor<T extends 
HoodieRecordPayload<T>>
+    extends BaseSparkCommitActionExecutor<T> {
+
+  private static final Logger LOG = 
LogManager.getLogger(SparkRunClusteringCommitActionExecutor.class);
+  private final HoodieClusteringPlan clusteringPlan;
+
+  public SparkRunClusteringCommitActionExecutor(HoodieEngineContext context,
+                                                HoodieWriteConfig config, 
HoodieTable table,
+                                                String instantTime) {
+    super(context, config, table, instantTime, WriteOperationType.CLUSTER);
+    this.clusteringPlan = 
ClusteringUtils.getClusteringPlan(table.getMetaClient(), 
HoodieTimeline.getReplaceCommitRequestedInstant(instantTime))
+      .map(Pair::getRight).orElseThrow(() -> new 
HoodieClusteringException("Unable to read clustering plan for instant: " + 
instantTime));
+  }
+
+  @Override
+  public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute() {
+    HoodieInstant instant = 
HoodieTimeline.getReplaceCommitRequestedInstant(instantTime);
+    // Mark instant as clustering inflight
+    table.getActiveTimeline().transitionReplaceRequestedToInflight(instant, 
Option.empty());
+    table.getMetaClient().reloadActiveTimeline();
+
+    JavaSparkContext engineContext = 
HoodieSparkEngineContext.getSparkContext(context);
+    // run clustering for each group async and collect WriteStatus
+    JavaRDD<WriteStatus> writeStatusRDD = 
clusteringPlan.getInputGroups().stream()

Review comment:
       this is very cool! 

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringPlanStrategy.java
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.cluster.strategy;
+
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
+import org.apache.hudi.avro.model.HoodieSliceInfo;
+import org.apache.hudi.client.common.HoodieEngineContext;
+import org.apache.hudi.client.utils.FileSliceMetricUtils;
+import org.apache.hudi.common.model.BaseFile;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.view.SyncableFileSystemView;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Pluggable implementation for scheduling clustering and creating 
ClusteringPlan.
+ */
+public abstract class ClusteringPlanStrategy<T extends 
HoodieRecordPayload,I,K,O> implements Serializable {
+  private static final Logger LOG = 
LogManager.getLogger(ClusteringPlanStrategy.class);
+
+  public static final int CLUSTERING_PLAN_VERSION_1 = 1;
+
+  private final HoodieTable<T,I,K,O> hoodieTable;
+  private final transient HoodieEngineContext engineContext;
+  private final HoodieWriteConfig writeConfig;
+
+  public ClusteringPlanStrategy(HoodieTable table, HoodieEngineContext 
engineContext, HoodieWriteConfig writeConfig) {
+    this.writeConfig = writeConfig;
+    this.hoodieTable = table;
+    this.engineContext = engineContext;
+  }
+
+  /**
+   * Generate metadata for grouping eligible files and create a plan. Note 
that data is not moved around
+   * as part of this step.
+   *
+   * If there is no data available to cluster, return None.
+   */
+  public abstract Option<HoodieClusteringPlan> generateClusteringPlan();
+
+  /**
+   * Return file slices eligible for clustering. FileIds in
+   * 1) pending clustering/compaction
+   * 2) Larger than clustering target file size
+   *
+   * are not eligible for clustering.
+   */
+  protected Stream<FileSlice> getFileSlicesEligibleForClustering(String 
partition) {
+    SyncableFileSystemView fileSystemView = (SyncableFileSystemView) 
getHoodieTable().getSliceView();
+    Set<HoodieFileGroupId> fgIdsInPendingCompactionAndClustering = 
fileSystemView.getPendingCompactionOperations()
+        .map(instantTimeOpPair -> 
instantTimeOpPair.getValue().getFileGroupId())
+        .collect(Collectors.toSet());
+    
fgIdsInPendingCompactionAndClustering.addAll(fileSystemView.getFileGroupsInPendingClustering().map(Pair::getKey).collect(Collectors.toSet()));
+
+    return hoodieTable.getSliceView().getLatestFileSlices(partition)
+        // file ids already in clustering are not eligible
+        .filter(slice -> 
!fgIdsInPendingCompactionAndClustering.contains(slice.getFileGroupId()));
+  }
+
+  /**
+   * Get parameters specific to strategy. These parameters are passed from 
'schedule clustering' step to
+   * 'run clustering' step. 'run clustering' step is typically async. So these 
params help with passing any required
+   * context from schedule to run step.
+   */
+  protected abstract Map<String, String> getStrategyParams();
+
+  /**
+   * Returns any specific parameters to be stored as part of clustering 
metadata.
+   */
+  protected Map<String, String> getExtraMetadata() {
+    return Collections.emptyMap();
+  }
+
+  /**
+   * Version to support future changes for plan.
+   */
+  protected int getPlanVersion() {
+    return CLUSTERING_PLAN_VERSION_1;
+  }
+
+  /**
+   * Transform {@link FileSlice} to {@link HoodieSliceInfo}.
+   */
+  protected List<HoodieSliceInfo> getFileSliceInfo(List<FileSlice> slices) {

Review comment:
       worth thinking about making this a static helper?

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution.bulkinsert;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.table.BulkInsertPartitioner;
+import org.apache.spark.api.java.JavaRDD;
+
+/**
+ * A partitioner that does sorting based on specified column values for each 
RDD partition.
+ *
+ * @param <T> HoodieRecordPayload type
+ */
+public class RDDCustomColumnsSortPartitioner<T extends HoodieRecordPayload>
+    implements BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>> {
+
+  private final String[] sortColumnNames;
+  private final String schemaString;
+
+  public RDDCustomColumnsSortPartitioner(String[] columnNames, Schema schema) {
+    this.sortColumnNames = columnNames;
+    //TODO Schema is not serializable. So convert to String here. Figure out 
how to improve this
+    this.schemaString = schema.toString();
+  }
+
+  @Override
+  public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> 
records,
+                                                     int 
outputSparkPartitions) {
+    final String[] sortColumns = this.sortColumnNames;
+    final String schemaStr = this.schemaString;
+    return records.sortBy(record -> {
+      Schema schema = new Schema.Parser().parse(schemaStr);

Review comment:
       See `SerialiableConfiguration` for reference
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to