vinothchandar commented on a change in pull request #1827:
URL: https://github.com/apache/hudi/pull/1827#discussion_r494648857
##########
File path: hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRollbacksCommand.java
##########
@@ -88,7 +88,7 @@ public void init() throws IOException {
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(tablePath)
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()).build();
- try (HoodieWriteClient client = getHoodieWriteClient(config)) {
+ try (AbstractHoodieWriteClient client = getHoodieWriteClient(config)) {
Review comment:
just like this, we should try to use the abstract class as much as we
can
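
For illustration, a minimal sketch of the pattern being suggested (`getHoodieWriteClient` is the test harness helper shown in the diff; the rest is assumed):

```java
// Sketch only: declare against the engine-agnostic base type, so only the
// helper/factory knows which concrete client (e.g. the Spark one) it returns.
try (AbstractHoodieWriteClient client = getHoodieWriteClient(config)) {
  // test logic touches only methods declared on the abstract client
}
```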
##########
File path: hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchivedCommitsCommand.java
##########
@@ -92,8 +93,9 @@ public void init() throws IOException {
metaClient.getActiveTimeline().reload().getAllCommitsTimeline().filterCompletedInstants();
// archive
- HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, hadoopConf);
- archiveLog.archiveIfRequired(jsc);
+ HoodieSparkTable table = HoodieSparkTable.create(cfg, context, metaClient);
Review comment:
can we replace more of the code to directly just use `HoodieTable` instead? Need to examine the cases that need an explicit HoodieSparkTable.
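
For instance, a hedged sketch of what that could look like — the Spark-specific factory stays confined to the construction site and the variable is typed as the generic `HoodieTable` (signatures taken from the diff above, not re-verified against the final API):

```java
// Sketch: confine the Spark-specific type to construction...
HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
// ...and have downstream code depend only on the engine-agnostic API, e.g.:
HoodieTimeline pending = table.getActiveTimeline().filterPendingCompactionTimeline();
```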
##########
File path: hudi-client/hudi-client-common/pom.xml
##########
@@ -0,0 +1,264 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
Review comment:
we are missing the dependency we had on hbase-client and hbase-server
here. Will punt for now, as it will get picked up from hudi-common.
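
i.e., declarations along these lines (a sketch only — the exact versions/scopes are assumed to come from dependencyManagement in the parent pom):

```xml
<!-- sketch: the dependencies referred to above; versions assumed managed by the parent pom -->
<dependency>
  <groupId>org.apache.hbase</groupId>
  <artifactId>hbase-client</artifactId>
</dependency>
<dependency>
  <groupId>org.apache.hbase</groupId>
  <artifactId>hbase-server</artifactId>
</dependency>
```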
##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
##########
@@ -100,6 +99,7 @@
public static final String EMBEDDED_TIMELINE_SERVER_ENABLED = "hoodie.embed.timeline.server";
public static final String DEFAULT_EMBEDDED_TIMELINE_SERVER_ENABLED = "true";
+ public static final String EMBEDDED_TIMELINE_SERVER_HOST = "hoodie.embed.timeline.server.host";
Review comment:
this cannot be configurable; YARN/K8s will decide the actual driver host. Changing it back to how it was before.
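
For reference, "how it was before" presumably means resolving the host at runtime rather than from a user-facing config. A minimal sketch of that idea, using only a plain JDK call (this is an illustration, not the actual Hudi code):

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

final class DriverHostResolver {
  // Sketch: bind the embedded timeline server to wherever the driver process
  // actually landed, since YARN/K8s control driver placement.
  static String resolveDriverHost() {
    try {
      return InetAddress.getLocalHost().getHostAddress();
    } catch (UnknownHostException e) {
      throw new IllegalStateException("Unable to resolve driver host", e);
    }
  }
}
```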
##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -716,32 +669,95 @@ private void rollbackPendingCommits() {
* @param compactionInstantTime Compaction Instant Time
* @return RDD of Write Status
*/
- private JavaRDD<WriteStatus> compact(String compactionInstantTime, boolean shouldComplete) {
- HoodieTable<T> table = HoodieTable.create(config, hadoopConf);
- HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline();
- HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(compactionInstantTime);
- if (pendingCompactionTimeline.containsInstant(inflightInstant)) {
- rollbackInflightCompaction(inflightInstant, table);
- table.getMetaClient().reloadActiveTimeline();
- }
- compactionTimer = metrics.getCompactionCtx();
- HoodieWriteMetadata compactionMetadata = table.compact(jsc, compactionInstantTime);
- JavaRDD<WriteStatus> statuses = compactionMetadata.getWriteStatuses();
- if (shouldComplete && compactionMetadata.getCommitMetadata().isPresent()) {
- completeCompaction(compactionMetadata.getCommitMetadata().get(), statuses, table, compactionInstantTime);
- }
- return statuses;
- }
+ protected abstract O compact(String compactionInstantTime, boolean shouldComplete);
/**
* Performs a compaction operation on a table, serially before or after an insert/upsert action.
*/
- private Option<String> inlineCompact(Option<Map<String, String>> extraMetadata) {
+ protected Option<String> inlineCompact(Option<Map<String, String>> extraMetadata) {
Option<String> compactionInstantTimeOpt = scheduleCompaction(extraMetadata);
compactionInstantTimeOpt.ifPresent(compactionInstantTime -> {
// inline compaction should auto commit as the user is never given control
compact(compactionInstantTime, true);
});
return compactionInstantTimeOpt;
}
+
+ /**
+ * Finalize Write operation.
+ *
+ * @param table HoodieTable
+ * @param instantTime Instant Time
+ * @param stats Hoodie Write Stat
+ */
+ protected void finalizeWrite(HoodieTable<T, I, K, O, P> table, String instantTime, List<HoodieWriteStat> stats) {
+ try {
+ final Timer.Context finalizeCtx = metrics.getFinalizeCtx();
+ table.finalizeWrite(context, instantTime, stats);
+ if (finalizeCtx != null) {
+ Option<Long> durationInMs = Option.of(metrics.getDurationInMs(finalizeCtx.stop()));
+ durationInMs.ifPresent(duration -> {
+ LOG.info("Finalize write elapsed time (milliseconds): " + duration);
+ metrics.updateFinalizeWriteMetrics(duration, stats.size());
+ });
+ }
+ } catch (HoodieIOException ioe) {
+ throw new HoodieCommitException("Failed to complete commit " + instantTime + " due to finalize errors.", ioe);
+ }
+ }
+
+ public HoodieMetrics getMetrics() {
+ return metrics;
+ }
+
+ public HoodieIndex<T, I, K, O, P> getIndex() {
Review comment:
this needs to be removed. but not the issue for this PR to be bothered
about may be
##########
File path: hudi-client/pom.xml
##########
@@ -24,294 +24,14 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>hudi-client</artifactId>
- <packaging>jar</packaging>
+ <packaging>pom</packaging>
<properties>
<main.basedir>${project.parent.basedir}</main.basedir>
</properties>
- <build>
Review comment:
Need to ensure there are no side effects in the pom due to this. i.e
something that can affect bundles so forth.
##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -18,120 +18,195 @@
package org.apache.hudi.client;
+import com.codahale.metrics.Timer;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
-import org.apache.hudi.client.embedded.EmbeddedTimelineService;
+import org.apache.hudi.callback.HoodieWriteCommitCallback;
+import org.apache.hudi.callback.common.HoodieWriteCommitCallbackMessage;
+import org.apache.hudi.callback.util.HoodieCommitCallbackFactory;
+import org.apache.hudi.client.embebbed.BaseEmbeddedTimelineService;
+import org.apache.hudi.common.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieInstant.State;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
-import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
+
import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieRestoreException;
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.exception.HoodieSavepointException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.metrics.HoodieMetrics;
-import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.table.HoodieTimelineArchiveLog;
-import org.apache.hudi.table.MarkerFiles;
import org.apache.hudi.table.BulkInsertPartitioner;
+import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
-import org.apache.hudi.table.action.compact.CompactHelpers;
import org.apache.hudi.table.action.savepoint.SavepointHelpers;
-
-import com.codahale.metrics.Timer;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
- * Hoodie Write Client helps you build tables on HDFS [insert()] and then perform efficient mutations on an HDFS
- * table [upsert()]
- * <p>
- * Note that, at any given time, there can only be one Spark job performing these operations on a Hoodie table.
+ * Abstract Write Client providing functionality for performing commit, index updates and rollback
+ * Reused for regular write operations like upsert/insert/bulk-insert.. as well as bootstrap
+ *
+ * @param <T> Sub type of HoodieRecordPayload
+ * @param <I> Type of inputs
+ * @param <K> Type of keys
+ * @param <O> Type of outputs
+ * @param <P> Type of record position [Key, Option[partitionPath, fileID]] in hoodie table
*/
-public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHoodieWriteClient<T> {
-
+public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I, K, O, P> extends AbstractHoodieClient {
private static final long serialVersionUID = 1L;
- private static final Logger LOG = LogManager.getLogger(HoodieWriteClient.class);
- private static final String LOOKUP_STR = "lookup";
- private final boolean rollbackPending;
- private final transient HoodieMetrics metrics;
- private transient Timer.Context compactionTimer;
+ private static final Logger LOG = LogManager.getLogger(AbstractHoodieWriteClient.class);
+
+ protected final transient HoodieMetrics metrics;
+ private final transient HoodieIndex<T, I, K, O, P> index;
+
+ protected transient Timer.Context writeContext = null;
+ private transient WriteOperationType operationType;
+ private transient HoodieWriteCommitCallback commitCallback;
+
+ protected static final String LOOKUP_STR = "lookup";
+ protected final boolean rollbackPending;
+ protected transient Timer.Context compactionTimer;
private transient AsyncCleanerService asyncCleanerService;
+ public void setOperationType(WriteOperationType operationType) {
+ this.operationType = operationType;
+ }
+
+ public WriteOperationType getOperationType() {
+ return this.operationType;
+ }
+
/**
* Create a write client, without cleaning up failed/inflight commits.
*
- * @param jsc Java Spark Context
+ * @param context Java Spark Context
* @param clientConfig instance of HoodieWriteConfig
*/
- public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig) {
- this(jsc, clientConfig, false);
+ public AbstractHoodieWriteClient(HoodieEngineContext context, HoodieWriteConfig clientConfig) {
+ this(context, clientConfig, false);
}
/**
* Create a write client, with new hudi index.
*
- * @param jsc Java Spark Context
- * @param writeConfig instance of HoodieWriteConfig
+ * @param context HoodieEngineContext
+ * @param writeConfig instance of HoodieWriteConfig
* @param rollbackPending whether need to cleanup pending commits
*/
- public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig writeConfig, boolean rollbackPending) {
- this(jsc, writeConfig, rollbackPending, HoodieIndex.createIndex(writeConfig));
- }
-
- public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig writeConfig, boolean rollbackPending, HoodieIndex index) {
- this(jsc, writeConfig, rollbackPending, index, Option.empty());
+ public AbstractHoodieWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig, boolean rollbackPending) {
+ this(context, writeConfig, rollbackPending, Option.empty());
}
/**
- * Create a write client, allows to specify all parameters.
+ * Create a write client, allows to specify all parameters.
*
- * @param jsc Java Spark Context
- * @param writeConfig instance of HoodieWriteConfig
+ * @param context HoodieEngineContext
+ * @param writeConfig instance of HoodieWriteConfig
* @param rollbackPending whether need to cleanup pending commits
* @param timelineService Timeline Service that runs as part of write client.
*/
- public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig writeConfig, boolean rollbackPending,
- HoodieIndex index, Option<EmbeddedTimelineService> timelineService) {
- super(jsc, index, writeConfig, timelineService);
+ public AbstractHoodieWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig, boolean rollbackPending,
+ Option<BaseEmbeddedTimelineService> timelineService) {
+ super(context, writeConfig, timelineService);
this.metrics = new HoodieMetrics(config, config.getTableName());
this.rollbackPending = rollbackPending;
+ this.index = createIndex(writeConfig);
Review comment:
Understood
##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -716,32 +669,95 @@ private void rollbackPendingCommits() {
* @param compactionInstantTime Compaction Instant Time
* @return RDD of Write Status
*/
- private JavaRDD<WriteStatus> compact(String compactionInstantTime, boolean shouldComplete) {
- HoodieTable<T> table = HoodieTable.create(config, hadoopConf);
- HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline();
- HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(compactionInstantTime);
- if (pendingCompactionTimeline.containsInstant(inflightInstant)) {
- rollbackInflightCompaction(inflightInstant, table);
- table.getMetaClient().reloadActiveTimeline();
- }
- compactionTimer = metrics.getCompactionCtx();
- HoodieWriteMetadata compactionMetadata = table.compact(jsc, compactionInstantTime);
- JavaRDD<WriteStatus> statuses = compactionMetadata.getWriteStatuses();
- if (shouldComplete && compactionMetadata.getCommitMetadata().isPresent()) {
- completeCompaction(compactionMetadata.getCommitMetadata().get(), statuses, table, compactionInstantTime);
- }
- return statuses;
- }
+ protected abstract O compact(String compactionInstantTime, boolean shouldComplete);
/**
* Performs a compaction operation on a table, serially before or after an insert/upsert action.
*/
- private Option<String> inlineCompact(Option<Map<String, String>> extraMetadata) {
+ protected Option<String> inlineCompact(Option<Map<String, String>> extraMetadata) {
Option<String> compactionInstantTimeOpt = scheduleCompaction(extraMetadata);
compactionInstantTimeOpt.ifPresent(compactionInstantTime -> {
// inline compaction should auto commit as the user is never given control
compact(compactionInstantTime, true);
});
return compactionInstantTimeOpt;
}
+
+ /**
+ * Finalize Write operation.
+ *
+ * @param table HoodieTable
+ * @param instantTime Instant Time
+ * @param stats Hoodie Write Stat
+ */
+ protected void finalizeWrite(HoodieTable<T, I, K, O, P> table, String instantTime, List<HoodieWriteStat> stats) {
+ try {
+ final Timer.Context finalizeCtx = metrics.getFinalizeCtx();
+ table.finalizeWrite(context, instantTime, stats);
+ if (finalizeCtx != null) {
+ Option<Long> durationInMs = Option.of(metrics.getDurationInMs(finalizeCtx.stop()));
+ durationInMs.ifPresent(duration -> {
+ LOG.info("Finalize write elapsed time (milliseconds): " + duration);
+ metrics.updateFinalizeWriteMetrics(duration, stats.size());
+ });
+ }
+ } catch (HoodieIOException ioe) {
+ throw new HoodieCommitException("Failed to complete commit " + instantTime + " due to finalize errors.", ioe);
+ }
+ }
+
+ public HoodieMetrics getMetrics() {
+ return metrics;
+ }
+
+ public HoodieIndex<T, I, K, O, P> getIndex() {
+ return index;
+ }
+
+ /**
+ * Get HoodieTable and init {@link Timer.Context}.
+ *
+ * @param operationType write operation type
+ * @param instantTime current inflight instant time
+ * @return HoodieTable
+ */
+ protected abstract HoodieTable<T, I, K, O, P> getTableAndInitCtx(WriteOperationType operationType, String instantTime);
+
+ /**
+ * Sets write schema from last instant since deletes may not have schema set in the config.
+ */
+ protected void setWriteSchemaForDeletes(HoodieTableMetaClient metaClient) {
+ try {
+ HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
+ Option<HoodieInstant> lastInstant =
+ activeTimeline.filterCompletedInstants().filter(s -> s.getAction().equals(metaClient.getCommitActionType()))
+ .lastInstant();
+ if (lastInstant.isPresent()) {
+ HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
+ activeTimeline.getInstantDetails(lastInstant.get()).get(), HoodieCommitMetadata.class);
+ if (commitMetadata.getExtraMetadata().containsKey(HoodieCommitMetadata.SCHEMA_KEY)) {
+ config.setSchema(commitMetadata.getExtraMetadata().get(HoodieCommitMetadata.SCHEMA_KEY));
+ } else {
+ throw new HoodieIOException("Latest commit does not have any schema in commit metadata");
+ }
+ } else {
+ throw new HoodieIOException("Deletes issued without any prior commits");
+ }
+ } catch (IOException e) {
+ throw new HoodieIOException("IOException thrown while reading last commit metadata", e);
+ }
+ }
+
+ @Override
+ public void close() {
+ // Stop timeline-server if running
+ super.close();
+ // Calling this here releases any resources used by your index, so make sure to finish any related operations
+ // before this point
+ this.index.close();
+
+ // release AsyncCleanerService
+ AsyncCleanerService.forceShutdown(asyncCleanerService);
+ asyncCleanerService = null;
Review comment:
this was actually same. fixing it
##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
##########
@@ -149,7 +148,7 @@ private void init(String fileId, String partitionPath, HoodieBaseFile baseFileTo
private void init(String fileId, Iterator<HoodieRecord<T>> newRecordsItr) {
try {
// Load the new records in a map
- long memoryForMerge = SparkConfigUtils.getMaxMemoryPerPartitionMerge(config.getProps());
+ long memoryForMerge = config.getMaxMemoryPerPartitionMerge();
Review comment:
this is a problem. it changes behavior and needs to be reworked.
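
To make the concern concrete, a hypothetical illustration of how the two paths can diverge (the names and numbers below are illustrative only, not the real Hudi/Spark APIs):

```java
// Hypothetical illustration: a merge-memory budget derived from the runtime's
// settings versus a fixed value read straight from config.
final class MergeMemoryExample {

  // Old-style behavior (assumed): scale the budget off the executor's actual
  // memory, so it tracks the Spark runtime configuration.
  static long runtimeDerivedBudget(long executorMemoryBytes, double mergeFraction) {
    return (long) (executorMemoryBytes * mergeFraction);
  }

  // New-style behavior (per the '+' line above): a statically configured byte
  // count, independent of the runtime's memory settings.
  static long staticBudget(long configuredBytes) {
    return configuredBytes;
  }

  public static void main(String[] args) {
    // A 4 GB executor with a 0.6 fraction vs. a 1 GB static default: the two
    // paths hand very different budgets to the merge, which is the behavior
    // change being flagged.
    System.out.println(runtimeDerivedBudget(4L << 30, 0.6)); // ~2.4 GB
    System.out.println(staticBudget(1L << 30));              // 1 GB
  }
}
```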
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]