This is an automated email from the ASF dual-hosted git repository.
voonhous pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 508e2955ef8b refactor: Add Lombok annotations to hudi-utilities (Part 3) (#17877)
508e2955ef8b is described below
commit 508e2955ef8be9d0062b5ea46baf3f183c5a411e
Author: voonhous <[email protected]>
AuthorDate: Wed Jun 3 19:42:35 2026 +0800
refactor: Add Lombok annotations to hudi-utilities (Part 3) (#17877)
---
.../hudi/utilities/streamer/BootstrapExecutor.java | 16 ++--
.../utilities/streamer/DefaultStreamContext.java | 20 +---
.../apache/hudi/utilities/streamer/ErrorEvent.java | 37 +------
.../streamer/HoodieMultiTableStreamer.java | 34 +++----
.../hudi/utilities/streamer/HoodieStreamer.java | 62 +++++-------
.../utilities/streamer/HoodieStreamerUtils.java | 8 +-
.../streamer/NoNewDataTerminationStrategy.java | 8 +-
.../utilities/streamer/SchedulerConfGenerator.java | 12 +--
.../utilities/streamer/SourceFormatAdapter.java | 23 ++---
.../utilities/streamer/SparkSampleWritesUtils.java | 24 +++--
.../apache/hudi/utilities/streamer/StreamSync.java | 106 ++++++++-------------
.../streamer/StreamerCheckpointUtils.java | 9 +-
.../utilities/streamer/TableExecutionContext.java | 58 ++---------
.../TestHoodieMetadataTableValidator.java | 24 +----
.../hudi/utilities/TestHoodieRepairTool.java | 8 +-
.../deltastreamer/HoodieDeltaStreamerTestBase.java | 34 ++++---
.../MockConfigurationHotUpdateStrategy.java | 7 +-
.../deltastreamer/TestHoodieDeltaStreamer.java | 68 +++++++------
.../TestHoodieDeltaStreamerWithMultiWriter.java | 18 ++--
.../TestHoodieMultiTableDeltaStreamer.java | 12 +--
.../functional/TestHiveSchemaProvider.java | 10 +-
.../functional/TestJdbcbasedSchemaProvider.java | 7 +-
.../TestHoodieMultiTableServicesMain.java | 14 ++-
.../utilities/sources/BaseTestKafkaSource.java | 22 ++---
.../sources/S3EventsHoodieIncrSourceHarness.java | 23 ++---
.../hudi/utilities/sources/TestDataSource.java | 11 +--
.../sources/TestGcsEventsHoodieIncrSource.java | 6 +-
.../utilities/sources/TestHoodieIncrSource.java | 38 ++------
.../hudi/utilities/testutils/JdbcTestUtils.java | 18 ++--
.../utilities/testutils/UtilitiesTestBase.java | 7 +-
.../testutils/sources/AbstractBaseTestSource.java | 18 ++--
.../sources/DistributedTestDataSource.java | 10 +-
32 files changed, 272 insertions(+), 500 deletions(-)
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/BootstrapExecutor.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/BootstrapExecutor.java
index c9328cfc3bff..24ce507b34a7 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/BootstrapExecutor.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/BootstrapExecutor.java
@@ -44,12 +44,12 @@ import org.apache.hudi.util.SparkKeyGenUtils;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.schema.SchemaProvider;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.JavaSparkContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.Serializable;
@@ -71,10 +71,9 @@ import static
org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH;
/**
* Performs bootstrap from a non-hudi source.
*/
+@Slf4j
public class BootstrapExecutor implements Serializable {
- private static final Logger LOG =
LoggerFactory.getLogger(BootstrapExecutor.class);
-
/**
* Config.
*/
@@ -103,6 +102,7 @@ public class BootstrapExecutor implements Serializable {
/**
* Bootstrap Configuration.
*/
+ @Getter
private final HoodieWriteConfig bootstrapConfig;
/**
@@ -146,7 +146,7 @@ public class BootstrapExecutor implements Serializable {
builder =
builder.withSchema(schemaProvider.getTargetHoodieSchema().toString());
}
this.bootstrapConfig = builder.build();
- LOG.info("Created bootstrap executor with configs : " +
bootstrapConfig.getProps());
+ log.info("Created bootstrap executor with configs: {}",
bootstrapConfig.getProps());
}
/**
@@ -189,7 +189,7 @@ public class BootstrapExecutor implements Serializable {
Path basePath = new Path(cfg.targetBasePath);
if (fs.exists(basePath)) {
if (cfg.bootstrapOverwrite) {
- LOG.info("Target base path already exists, overwrite it");
+ log.info("Target base path already exists, overwrite it");
fs.delete(basePath, true);
} else {
throw new HoodieException("target base path already exists at " +
cfg.targetBasePath
@@ -244,8 +244,4 @@ public class BootstrapExecutor implements Serializable {
builder.initTable(HadoopFSUtils.getStorageConfWithCopy(jssc.hadoopConfiguration()),
cfg.targetBasePath);
}
-
- public HoodieWriteConfig getBootstrapConfig() {
- return bootstrapConfig;
- }
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/DefaultStreamContext.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/DefaultStreamContext.java
index f8dabeb89c96..46972fe353cf 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/DefaultStreamContext.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/DefaultStreamContext.java
@@ -21,28 +21,18 @@ package org.apache.hudi.utilities.streamer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.utilities.schema.SchemaProvider;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
/**
* The default implementation for the StreamContext interface,
* composes SchemaProvider and SourceProfileSupplier currently,
* can be extended for other arguments in the future.
*/
+@AllArgsConstructor
+@Getter
public class DefaultStreamContext implements StreamContext {
private final SchemaProvider schemaProvider;
private final Option<SourceProfileSupplier> sourceProfileSupplier;
-
- public DefaultStreamContext(SchemaProvider schemaProvider,
Option<SourceProfileSupplier> sourceProfileSupplier) {
- this.schemaProvider = schemaProvider;
- this.sourceProfileSupplier = sourceProfileSupplier;
- }
-
- @Override
- public SchemaProvider getSchemaProvider() {
- return schemaProvider;
- }
-
- @Override
- public Option<SourceProfileSupplier> getSourceProfileSupplier() {
- return sourceProfileSupplier;
- }
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/ErrorEvent.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/ErrorEvent.java
index a2f1cb277ec6..5bbba78e3921 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/ErrorEvent.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/ErrorEvent.java
@@ -19,45 +19,16 @@
package org.apache.hudi.utilities.streamer;
-import java.util.Objects;
+import lombok.Value;
/**
* Error event is an event triggered during write or processing failure of a
record.
*/
+@Value
public class ErrorEvent<T> {
- private final ErrorReason reason;
- private final T payload;
-
- public ErrorEvent(T payload, ErrorReason reason) {
- this.payload = payload;
- this.reason = reason;
- }
-
- public T getPayload() {
- return payload;
- }
-
- public ErrorReason getReason() {
- return reason;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- ErrorEvent<?> that = (ErrorEvent<?>) o;
- return reason == that.reason && Objects.equals(payload, that.payload);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(reason, payload);
- }
+ T payload;
+ ErrorReason reason;
/**
* The reason behind write or processing failure of a record
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieMultiTableStreamer.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieMultiTableStreamer.java
index 90f8f6558ded..56101e9ae93a 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieMultiTableStreamer.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieMultiTableStreamer.java
@@ -39,12 +39,13 @@ import org.apache.hudi.utilities.sources.JsonDFSSource;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.JavaSparkContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.Serializable;
@@ -69,11 +70,12 @@ import static
org.apache.hudi.utilities.config.HoodieStreamerConfig.TRANSFORMER_
* Helps with ingesting incremental data into hoodie datasets for multiple
tables.
* Supports COPY_ON_WRITE and MERGE_ON_READ storage types.
*/
+@Getter
+@Slf4j
public class HoodieMultiTableStreamer {
- private static final Logger LOG =
LoggerFactory.getLogger(HoodieMultiTableStreamer.class);
-
private final List<TableExecutionContext> tableExecutionContexts;
+ @Getter(AccessLevel.NONE)
private transient JavaSparkContext jssc;
private final Set<String> successTables;
private final Set<String> failedTables;
@@ -120,7 +122,7 @@ public class HoodieMultiTableStreamer {
//commonProps are passed as parameter which contain table to config file
mapping
private void populateTableExecutionContextList(TypedProperties properties,
String configFolder, FileSystem fs, Config config) throws IOException {
List<String> tablesToBeIngested = getTablesToBeIngested(properties);
- LOG.info("tables to be ingested via MultiTableDeltaStreamer : " +
tablesToBeIngested);
+ log.info("tables to be ingested via MultiTableDeltaStreamer : {}",
tablesToBeIngested);
TableExecutionContext executionContext;
for (String table : tablesToBeIngested) {
String[] tableWithDatabase = table.split("\\.");
@@ -271,11 +273,11 @@ public class HoodieMultiTableStreamer {
}
if (config.enableHiveSync) {
- LOG.warn("--enable-hive-sync will be deprecated in a future release;
please use --enable-sync instead for Hive syncing");
+ log.warn("--enable-hive-sync will be deprecated in a future release;
please use --enable-sync instead for Hive syncing");
}
if (config.targetTableName != null) {
- LOG.warn("--target-table is deprecated and will be removed in a future
release due to it's useless;"
+ log.warn("--target-table is deprecated and will be removed in a future
release due to it's useless;"
+ " please use {} to configure multiple target tables",
HoodieStreamerConfig.TABLES_TO_BE_INGESTED.key());
}
@@ -469,7 +471,7 @@ public class HoodieMultiTableStreamer {
successTables.add(Helpers.getTableWithDatabase(context));
streamer.shutdownGracefully();
} catch (Exception e) {
- LOG.error("error while running MultiTableDeltaStreamer for table: " +
context.getTableName(), e);
+ log.error("error while running MultiTableDeltaStreamer for table: {}",
context.getTableName(), e);
failedTables.add(Helpers.getTableWithDatabase(context));
} finally {
if (streamer != null) {
@@ -478,9 +480,9 @@ public class HoodieMultiTableStreamer {
}
}
- LOG.info("Ingestion was successful for topics: " + successTables);
+ log.info("Ingestion was successful for topics: {}", successTables);
if (!failedTables.isEmpty()) {
- LOG.info("Ingestion failed for topics: " + failedTables);
+ log.info("Ingestion failed for topics: {}", failedTables);
}
}
@@ -496,16 +498,4 @@ public class HoodieMultiTableStreamer {
private static final String UNDERSCORE = "_";
private static final String COMMA_SEPARATOR = ",";
}
-
- public Set<String> getSuccessTables() {
- return successTables;
- }
-
- public Set<String> getFailedTables() {
- return failedTables;
- }
-
- public List<TableExecutionContext> getTableExecutionContexts() {
- return this.tableExecutionContexts;
- }
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
index 84929ca69556..0186e5fd0584 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
@@ -74,14 +74,14 @@ import org.apache.hudi.utilities.sources.JsonDFSSource;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.Serializable;
@@ -96,7 +96,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import static java.lang.String.format;
import static
org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1.STREAMER_CHECKPOINT_RESET_KEY_V1;
import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
import static org.apache.hudi.utilities.UtilHelpers.buildProperties;
@@ -111,10 +110,10 @@ import static
org.apache.hudi.utilities.UtilHelpers.readConfig;
* write-to-sink (c) Schedule Compactions if needed (d) Conditionally Sync to
Hive each cycle. For MOR table with
* continuous mode enabled, a separate compactor thread is allocated to
execute compactions
*/
+@Slf4j
public class HoodieStreamer implements Serializable {
private static final long serialVersionUID = 1L;
- private static final Logger LOG =
LoggerFactory.getLogger(HoodieStreamer.class);
private static final List<String> DEFAULT_SENSITIVE_CONFIG_KEYS =
Arrays.asList(
HoodieWriteConfig.SENSITIVE_CONFIG_KEYS_FILTER.defaultValue().split(","));
private static final String SENSITIVE_VALUES_MASKED =
"SENSITIVE_INFO_MASKED";
@@ -214,9 +213,9 @@ public class HoodieStreamer implements Serializable {
public void shutdownGracefully() {
ingestionService.ifPresent(ds -> {
- LOG.info("Shutting down DeltaStreamer");
+ log.info("Shutting down DeltaStreamer");
ds.shutdown(false);
- LOG.info("Async service shutdown complete. Closing DeltaSync ");
+ log.info("Async service shutdown complete. Closing DeltaSync ");
ds.close();
});
}
@@ -226,7 +225,7 @@ public class HoodieStreamer implements Serializable {
*/
public void sync() throws Exception {
if (bootstrapExecutor.isPresent()) {
- LOG.info("Performing bootstrap. Source=" +
bootstrapExecutor.get().getBootstrapConfig().getBootstrapSourceBasePath());
+ log.info("Performing bootstrap. Source={}",
bootstrapExecutor.get().getBootstrapConfig().getBootstrapSourceBasePath());
bootstrapExecutor.get().execute();
} else {
ingestionService.ifPresent(HoodieIngestionService::startIngestion);
@@ -608,7 +607,7 @@ public class HoodieStreamer implements Serializable {
for (String key : allKeys) {
String value = Option.ofNullable(props.get(key)).orElse("").toString();
// Truncate too long values.
- if (value.length() > 255 && !LOG.isDebugEnabled()) {
+ if (value.length() > 255 && !log.isDebugEnabled()) {
value = value.substring(0, 128) + "[...]";
}
@@ -648,7 +647,7 @@ public class HoodieStreamer implements Serializable {
jssc = UtilHelpers.buildSparkContext(sparkAppName, cfg.sparkMaster,
cfg.enableHiveSupport, additionalSparkConfigs);
}
if (cfg.enableHiveSync) {
- LOG.warn("--enable-hive-sync will be deprecated in a future release;
please use --enable-sync instead for Hive syncing");
+ log.warn("--enable-hive-sync will be deprecated in a future release;
please use --enable-sync instead for Hive syncing");
}
int exitCode = 0;
@@ -676,11 +675,13 @@ public class HoodieStreamer implements Serializable {
/**
* Schema provider that supplies the command for reading the input and
writing out the target table.
*/
+ @Getter
private transient SchemaProvider schemaProvider;
/**
* Spark Session.
*/
+ @Getter
private transient SparkSession sparkSession;
/**
@@ -696,6 +697,7 @@ public class HoodieStreamer implements Serializable {
/**
* Bag of properties with source, hoodie client, key generator etc.
*/
+ @Getter
TypedProperties props;
/**
@@ -757,7 +759,7 @@ public class HoodieStreamer implements Serializable {
properties.get().forEach((k, v) -> propsToValidate.put(k.toString(),
v.toString()));
HoodieWriterUtils.validateTableConfig(this.sparkSession,
org.apache.hudi.HoodieConversionUtils.mapAsScalaImmutableMap(propsToValidate),
meta.getTableConfig());
} catch (HoodieIOException e) {
- LOG.warn("Full exception msg {}, msg {}", e.getLocalizedMessage(),
e.getMessage());
+ log.warn("Full exception msg {}, msg {}", e.getLocalizedMessage(),
e.getMessage());
if (e.getMessage().contains("Could not load Hoodie properties") &&
e.getMessage().contains(HoodieTableConfig.HOODIE_PROPERTIES_FILE)) {
initializeTableTypeAndBaseFileFormat();
} else {
@@ -772,7 +774,7 @@ public class HoodieStreamer implements Serializable {
"'--filter-dupes' needs to be disabled when '--op' is 'UPSERT' to
ensure updates are not missed.");
this.props = properties.get();
- LOG.info(toSortedTruncatedString(props));
+ log.info(toSortedTruncatedString(props));
this.schemaProvider = UtilHelpers.wrapSchemaProviderWithPostProcessor(
UtilHelpers.createSchemaProvider(cfg.schemaProviderClassName, props,
hoodieSparkContext.jsc()),
@@ -817,7 +819,7 @@ public class HoodieStreamer implements Serializable {
boolean error = false;
if (cfg.isAsyncCompactionEnabled()) {
// set Scheduler Pool.
- LOG.info("Setting Spark Pool name for delta-sync to " +
STREAMSYNC_POOL_NAME);
+ log.info("Setting Spark Pool name for delta-sync to {}",
STREAMSYNC_POOL_NAME);
hoodieSparkContext.setProperty(EngineProperty.DELTASYNC_POOL_NAME,
STREAMSYNC_POOL_NAME);
}
@@ -834,14 +836,14 @@ public class HoodieStreamer implements Serializable {
if (newProps.isPresent()) {
this.props = newProps.get();
// reinit the DeltaSync only when the props updated
- LOG.info("Re-init delta sync with new config properties:");
- LOG.info(toSortedTruncatedString(props));
+ log.info("Re-init delta sync with new config properties:");
+ log.info(toSortedTruncatedString(props));
reInitDeltaSync();
}
}
Option<Pair<Option<String>, JavaRDD<WriteStatus>>>
scheduledCompactionInstantAndRDD = Option.ofNullable(streamSync.syncOnce());
if (scheduledCompactionInstantAndRDD.isPresent() &&
scheduledCompactionInstantAndRDD.get().getLeft().isPresent()) {
- LOG.info("Enqueuing new pending compaction instant (" +
scheduledCompactionInstantAndRDD.get().getLeft() + ")");
+ log.info("Enqueuing new pending compaction instant ({})",
scheduledCompactionInstantAndRDD.get().getLeft());
asyncCompactService.get().enqueuePendingAsyncServiceInstant(scheduledCompactionInstantAndRDD.get().getLeft().get());
asyncCompactService.get().waitTillPendingAsyncServiceInstantsReducesTo(cfg.maxPendingCompactions);
if (asyncCompactService.get().hasError()) {
@@ -852,7 +854,7 @@ public class HoodieStreamer implements Serializable {
if (clusteringConfig.isAsyncClusteringEnabled()) {
Option<String> clusteringInstant =
streamSync.getClusteringInstantOpt();
if (clusteringInstant.isPresent()) {
- LOG.info("Scheduled async clustering for instant: " +
clusteringInstant.get());
+ log.info("Scheduled async clustering for instant: {}",
clusteringInstant.get());
asyncClusteringService.get().enqueuePendingAsyncServiceInstant(clusteringInstant.get());
asyncClusteringService.get().waitTillPendingAsyncServiceInstantsReducesTo(cfg.maxPendingClustering);
if (asyncClusteringService.get().hasError()) {
@@ -865,7 +867,7 @@ public class HoodieStreamer implements Serializable {
Option<HoodieData<WriteStatus>> lastWriteStatuses =
Option.ofNullable(
scheduledCompactionInstantAndRDD.isPresent() ?
HoodieJavaRDD.of(scheduledCompactionInstantAndRDD.get().getRight()) : null);
if (requestShutdownIfNeeded(lastWriteStatuses)) {
- LOG.info("Closing and shutting down ingestion service");
+ log.info("Closing and shutting down ingestion service");
error = true;
onIngestionCompletes(false);
shutdown(true);
@@ -875,7 +877,7 @@ public class HoodieStreamer implements Serializable {
} catch (HoodieUpsertException ue) {
handleUpsertException(ue);
} catch (Exception e) {
- LOG.error("Shutting down delta-sync due to exception", e);
+ log.error("Shutting down delta-sync due to exception", e);
error = true;
throw new HoodieException(e.getMessage(), e);
}
@@ -890,7 +892,7 @@ public class HoodieStreamer implements Serializable {
private void handleUpsertException(HoodieUpsertException ue) {
if (ue.getCause() instanceof HoodieClusteringUpdateException) {
- LOG.warn("Write rejected due to conflicts with pending clustering
operation. Going to retry after 1 min with the hope "
+ log.warn("Write rejected due to conflicts with pending clustering
operation. Going to retry after 1 min with the hope "
+ "that clustering will complete by then.", ue);
try {
Thread.sleep(60000); // Intentionally not using
cfg.minSyncIntervalSeconds, since it could be too high or it could be 0.
@@ -907,13 +909,13 @@ public class HoodieStreamer implements Serializable {
* Shutdown async services like compaction/clustering as DeltaSync is
shutdown.
*/
private void shutdownAsyncServices(boolean error) {
- LOG.info("Delta Sync shutdown. Error ?{}", error);
+ log.info("Delta Sync shutdown. Error ?{}", error);
if (asyncCompactService.isPresent()) {
- LOG.info("Gracefully shutting down compactor");
+ log.info("Gracefully shutting down compactor");
asyncCompactService.get().shutdown(false);
}
if (asyncClusteringService.isPresent()) {
- LOG.info("Gracefully shutting down clustering service");
+ log.info("Gracefully shutting down clustering service");
asyncClusteringService.get().shutdown(false);
}
}
@@ -979,7 +981,7 @@ public class HoodieStreamer implements Serializable {
.setBasePath(cfg.targetBasePath)
.setLoadActiveTimelineOnLoad(true).build();
List<HoodieInstant> pending =
ClusteringUtils.getPendingClusteringInstantTimes(meta);
- LOG.info(format("Found %d pending clustering instants ",
pending.size()));
+ log.info("Found {} pending clustering instants ", pending.size());
pending.forEach(hoodieInstant ->
asyncClusteringService.get().enqueuePendingAsyncServiceInstant(hoodieInstant.requestedTime()));
asyncClusteringService.get().start(error -> true);
try {
@@ -997,7 +999,7 @@ public class HoodieStreamer implements Serializable {
@Override
protected boolean onIngestionCompletes(boolean hasError) {
- LOG.info("Ingestion completed. Has error: " + hasError);
+ log.info("Ingestion completed. Has error: {}", hasError);
close();
return true;
}
@@ -1014,18 +1016,6 @@ public class HoodieStreamer implements Serializable {
}
}
- public SchemaProvider getSchemaProvider() {
- return schemaProvider;
- }
-
- public SparkSession getSparkSession() {
- return sparkSession;
- }
-
- public TypedProperties getProps() {
- return props;
- }
-
@VisibleForTesting
public HoodieSparkEngineContext getHoodieSparkContext() {
return hoodieSparkContext;
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java
index 12e620156c49..9e455795e592 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java
@@ -54,6 +54,7 @@ import
org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
import org.apache.hudi.util.SparkKeyGenUtils;
import org.apache.hudi.utilities.schema.SchemaProvider;
+import lombok.extern.slf4j.Slf4j;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
@@ -64,8 +65,6 @@ import org.apache.spark.sql.HoodieInternalRowUtils;
import org.apache.spark.sql.avro.HoodieAvroDeserializer;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.StructType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.Iterator;
@@ -79,10 +78,9 @@ import static
org.apache.hudi.config.HoodieErrorTableConfig.ERROR_ENABLE_VALIDAT
/**
* Util class for HoodieStreamer.
*/
+@Slf4j
public class HoodieStreamerUtils {
- private static final Logger LOG =
LoggerFactory.getLogger(HoodieStreamerUtils.class);
-
/**
* Generates HoodieRecords for the avro data read from source.
* Takes care of dropping columns, precombine, auto key generation.
@@ -111,7 +109,7 @@ public class HoodieStreamerUtils {
records = avroRDD.mapPartitions(
(FlatMapFunction<Iterator<GenericRecord>,
Either<HoodieRecord,String>>) genericRecordIterator -> {
TaskContext taskContext = TaskContext.get();
- LOG.info("Creating HoodieRecords with stageId : {}, stage
attempt no: {}, taskId : {}, task attempt no : {}, task attempt id : {} ",
+ log.info("Creating HoodieRecords with stageId : {}, stage
attempt no: {}, taskId : {}, task attempt no : {}, task attempt id : {} ",
taskContext.stageId(), taskContext.stageAttemptNumber(),
taskContext.partitionId(), taskContext.attemptNumber(),
taskContext.taskAttemptId());
if (autoGenerateRecordKeys) {
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/NoNewDataTerminationStrategy.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/NoNewDataTerminationStrategy.java
index 686bdb52e3c7..08e4cf25dcbc 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/NoNewDataTerminationStrategy.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/NoNewDataTerminationStrategy.java
@@ -23,17 +23,15 @@ import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
+import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaRDD;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* Post writer termination strategy for deltastreamer in continuous mode. This
strategy is based on no new data for consecutive number of times.
*/
+@Slf4j
public class NoNewDataTerminationStrategy implements
PostWriteTerminationStrategy {
- private static final Logger LOG =
LoggerFactory.getLogger(NoNewDataTerminationStrategy.class);
-
public static final String MAX_ROUNDS_WITHOUT_NEW_DATA_TO_SHUTDOWN =
"max.rounds.without.new.data.to.shutdown";
public static final int DEFAULT_MAX_ROUNDS_WITHOUT_NEW_DATA_TO_SHUTDOWN = 3;
@@ -48,7 +46,7 @@ public class NoNewDataTerminationStrategy implements
PostWriteTerminationStrateg
public boolean shouldShutdown(Option<JavaRDD<WriteStatus>> writeStatuses) {
numTimesNoNewData = writeStatuses.isPresent() ? 0 : numTimesNoNewData + 1;
if (numTimesNoNewData >= numTimesNoNewDataToShutdown) {
- LOG.info("Shutting down on continuous mode as there is no new data for "
+ numTimesNoNewData);
+ log.info("Shutting down on continuous mode as there is no new data for
{}", numTimesNoNewData);
return true;
}
return false;
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SchedulerConfGenerator.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SchedulerConfGenerator.java
index 19df192aad8e..5e1c6ff2692e 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SchedulerConfGenerator.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SchedulerConfGenerator.java
@@ -24,9 +24,8 @@ import org.apache.hudi.async.AsyncCompactService;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.util.Option;
+import lombok.extern.slf4j.Slf4j;
import org.apache.spark.SparkConf;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.BufferedWriter;
import java.io.File;
@@ -42,10 +41,9 @@ import static
org.apache.hudi.async.AsyncClusteringService.CLUSTERING_POOL_NAME;
* Utility Class to generate Spark Scheduling allocation file. This kicks in
only when user sets
* spark.scheduler.mode=FAIR at spark-submit time
*/
+@Slf4j
public class SchedulerConfGenerator {
- private static final Logger LOG =
LoggerFactory.getLogger(SchedulerConfGenerator.class);
-
public static final String DELTASYNC_POOL_NAME =
HoodieStreamer.STREAMSYNC_POOL_NAME;
public static final String COMPACT_POOL_NAME =
AsyncCompactService.COMPACT_POOL_NAME;
public static final String SPARK_SCHEDULER_MODE_KEY = "spark.scheduler.mode";
@@ -106,10 +104,10 @@ public class SchedulerConfGenerator {
String sparkSchedulingConfFile =
generateAndStoreConfig(cfg.deltaSyncSchedulingWeight,
cfg.compactSchedulingWeight, cfg.deltaSyncSchedulingMinShare,
cfg.compactSchedulingMinShare,
cfg.clusterSchedulingWeight, cfg.clusterSchedulingMinShare);
- LOG.info("Spark scheduling config file {}", sparkSchedulingConfFile);
+ log.info("Spark scheduling config file {}", sparkSchedulingConfFile);
additionalSparkConfigs.put(SparkConfigs.SPARK_SCHEDULER_ALLOCATION_FILE_KEY(),
sparkSchedulingConfFile);
} else {
- LOG.warn("Job Scheduling Configs will not be in effect as
spark.scheduler.mode "
+ log.warn("Job Scheduling Configs will not be in effect as
spark.scheduler.mode "
+ "is not set to FAIR at instantiation time. Continuing without
scheduling configs");
}
return additionalSparkConfigs;
@@ -135,7 +133,7 @@ public class SchedulerConfGenerator {
}
// SPARK-35083 introduces remote scheduler pool files, so the file must
include scheme since Spark 3.2
String path = tempConfigFile.toURI().toString();
- LOG.info("Configs written to file " + path);
+ log.info("Configs written to file {}", path);
return path;
}
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java
index 763caf8f54fe..0d63ffd6c0f4 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java
@@ -43,6 +43,8 @@ import org.apache.hudi.utilities.sources.helpers.RowConverter;
import org.apache.hudi.utilities.sources.helpers.SanitizationUtils;
import com.google.protobuf.Message;
+import lombok.AccessLevel;
+import lombok.Getter;
import org.apache.avro.generic.GenericRecord;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Column;
@@ -73,12 +75,15 @@ import static
org.apache.hudi.utilities.streamer.BaseErrorTableWriter.ERROR_TABL
*/
public class SourceFormatAdapter implements Closeable {
+ @Getter
private final Source source;
private boolean shouldSanitize = SANITIZE_SCHEMA_FIELD_NAMES.defaultValue();
private boolean wrapWithException =
ROW_THROW_EXPLICIT_EXCEPTIONS.defaultValue();
+ @Getter(AccessLevel.PRIVATE)
private String invalidCharMask =
SCHEMA_FIELD_NAME_INVALID_CHAR_MASK.defaultValue();
+ @Getter(AccessLevel.PRIVATE)
private boolean useJava8api = (boolean)
SQLConf.DATETIME_JAVA8API_ENABLED().defaultValue().get();
@@ -110,18 +115,6 @@ public class SourceFormatAdapter implements Closeable {
return shouldSanitize;
}
- /**
- * Replacement mask for invalid characters encountered in avro names.
- * @return sanitized value.
- */
- private String getInvalidCharMask() {
- return invalidCharMask;
- }
-
- private boolean getUseJava8api() {
- return useJava8api;
- }
-
/**
* transform input rdd of json string to generic records with support for
adding error events to error table
* @param inputBatch
@@ -144,7 +137,7 @@ public class SourceFormatAdapter implements Closeable {
private JavaRDD<Row> transformJsonToRowRdd(InputBatch<JavaRDD<String>>
inputBatch) {
MercifulJsonConverter.clearCache(inputBatch.getSchemaProvider().getSourceHoodieSchema().getFullName());
- RowConverter convertor = new
RowConverter(inputBatch.getSchemaProvider().getSourceHoodieSchema(),
isFieldNameSanitizingEnabled(), getInvalidCharMask(), getUseJava8api());
+ RowConverter convertor = new
RowConverter(inputBatch.getSchemaProvider().getSourceHoodieSchema(),
isFieldNameSanitizingEnabled(), getInvalidCharMask(), isUseJava8api());
return inputBatch.getBatch().map(rdd -> {
if (errorTableWriter.isPresent()) {
JavaRDD<Either<Row, String>> javaRDD =
rdd.map(convertor::fromJsonToRowWithError);
@@ -323,10 +316,6 @@ public class SourceFormatAdapter implements Closeable {
}
}
- public Source getSource() {
- return source;
- }
-
@Override
public void close() {
source.releaseResources();
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SparkSampleWritesUtils.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SparkSampleWritesUtils.java
index 05aeca28067b..c50ff3484cad 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SparkSampleWritesUtils.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SparkSampleWritesUtils.java
@@ -36,11 +36,10 @@ import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
+import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.fs.FileSystem;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
@@ -58,31 +57,30 @@ import static
org.apache.hudi.utilities.config.HoodieStreamerConfig.SAMPLE_WRITE
* <p>
* TODO handle sample_writes sub-path clean-up w.r.t. rollback and insert
overwrite. (HUDI-6044)
*/
+@Slf4j
public class SparkSampleWritesUtils {
- private static final Logger LOG =
LoggerFactory.getLogger(SparkSampleWritesUtils.class);
-
public static Option<HoodieWriteConfig>
getWriteConfigWithRecordSizeEstimate(JavaSparkContext jsc,
Option<JavaRDD<HoodieRecord>> recordsOpt, HoodieWriteConfig writeConfig) {
if (!writeConfig.getBoolean(SAMPLE_WRITES_ENABLED)) {
- LOG.debug("Skip overwriting record size estimate as it's disabled.");
+ log.debug("Skip overwriting record size estimate as it's disabled.");
return Option.empty();
}
HoodieTableMetaClient metaClient = getMetaClient(jsc,
writeConfig.getBasePath());
if (metaClient.isTimelineNonEmpty()) {
- LOG.info("Skip overwriting record size estimate due to timeline is
non-empty.");
+ log.info("Skip overwriting record size estimate due to timeline is
non-empty.");
return Option.empty();
}
try {
Pair<Boolean, String> result = doSampleWrites(jsc, recordsOpt,
writeConfig);
if (result.getLeft()) {
long avgSize = getAvgSizeFromSampleWrites(jsc, result.getRight());
- LOG.info("Overwriting record size estimate to {}", avgSize);
+ log.info("Overwriting record size estimate to {}", avgSize);
TypedProperties props = writeConfig.getProps();
props.put(COPY_ON_WRITE_RECORD_SIZE_ESTIMATE.key(),
String.valueOf(avgSize));
return
Option.of(HoodieWriteConfig.newBuilder().withProperties(props).build());
}
} catch (IOException e) {
- LOG.error(String.format("Not overwriting record size estimate for table
%s due to error when doing sample writes.", writeConfig.getTableName()), e);
+ log.error("Not overwriting record size estimate for table {} due to
error when doing sample writes.", writeConfig.getTableName(), e);
}
return Option.empty();
}
@@ -117,13 +115,13 @@ public class SparkSampleWritesUtils {
String instantTime = sampleWriteClient.startCommit();
JavaRDD<WriteStatus> writeStatusRDD =
sampleWriteClient.bulkInsert(jsc.parallelize(samples, 1), instantTime);
if (writeStatusRDD.filter(WriteStatus::hasErrors).count() > 0) {
- LOG.error("sample writes for table {} failed with errors.",
writeConfig.getTableName());
- if (LOG.isTraceEnabled()) {
- LOG.trace("Printing out the top 100 errors");
+ log.error("sample writes for table {} failed with errors.",
writeConfig.getTableName());
+ if (log.isTraceEnabled()) {
+ log.trace("Printing out the top 100 errors");
writeStatusRDD.filter(WriteStatus::hasErrors).take(100).forEach(ws
-> {
- LOG.trace("Global error :", ws.getGlobalError());
+ log.trace("Global error :", ws.getGlobalError());
ws.getErrors().forEach((key, throwable) ->
- LOG.trace(String.format("Error for key: %s", key),
throwable));
+ log.trace("Error for key: {}", key, throwable));
});
}
return emptyRes;
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
index 9811fe9b0178..1af9c503b613 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
@@ -112,11 +112,13 @@ import org.apache.hudi.utilities.schema.SchemaSet;
import org.apache.hudi.utilities.schema.SimpleSchemaProvider;
import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.sources.Source;
-import org.apache.hudi.utilities.streamer.HoodieStreamer.Config;
import
org.apache.hudi.utilities.streamer.validator.SparkStreamerValidatorUtils;
import org.apache.hudi.utilities.transform.Transformer;
import com.codahale.metrics.Timer;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -128,8 +130,6 @@ import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.storage.StorageLevel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
@@ -177,16 +177,17 @@ import static
org.apache.hudi.utilities.streamer.StreamerCheckpointUtils.getLate
/**
* Sync's one batch of data to hoodie table.
*/
+@Slf4j
public class StreamSync implements Serializable, Closeable {
private static final long serialVersionUID = 1L;
- private static final Logger LOG = LoggerFactory.getLogger(StreamSync.class);
private static final String NULL_PLACEHOLDER = "[null]";
public static final String CHECKPOINT_IGNORE_KEY =
"deltastreamer.checkpoint.ignore_key";
/**
* Delta Sync Config.
*/
+ @Getter
private final HoodieStreamer.Config cfg;
/**
@@ -214,6 +215,7 @@ public class StreamSync implements Serializable, Closeable {
/**
* Filesystem used.
*/
+ @Getter
private transient HoodieStorage storage;
/**
@@ -236,6 +238,7 @@ public class StreamSync implements Serializable, Closeable {
*
* NOTE: These properties are already consolidated w/ CLI provided
config-overrides
*/
+ @Getter
private final TypedProperties props;
/**
@@ -246,6 +249,7 @@ public class StreamSync implements Serializable, Closeable {
/**
* Timeline with completed commits, including both .commit and .deltacommit.
*/
+ @Getter
private transient Option<HoodieTimeline> commitsTimelineOpt;
// all commits timeline, including all (commits, delta commits, compaction,
clean, savepoint, rollback, replace commits, index)
@@ -270,6 +274,7 @@ public class StreamSync implements Serializable, Closeable {
private Option<BaseErrorTableWriter> errorTableWriter = Option.empty();
private HoodieErrorTableConfig.ErrorWriteFailureStrategy
errorWriteFailureStrategy;
+ @Getter
private transient HoodieIngestionMetrics metrics;
private transient HoodieMetrics hoodieMetrics;
@@ -394,7 +399,7 @@ public class StreamSync implements Serializable, Closeable {
}
return metaClient;
} catch (HoodieIOException e) {
- LOG.warn("Full exception msg {}", e.getMessage());
+ log.warn("Full exception msg {}", e.getMessage());
if (e.getMessage().contains("Could not load Hoodie properties") &&
e.getMessage().contains(HoodieTableConfig.HOODIE_PROPERTIES_FILE)) {
String basePathWithForwardSlash = cfg.targetBasePath.endsWith("/") ?
cfg.targetBasePath :
String.format("%s/", cfg.targetBasePath);
@@ -409,7 +414,7 @@ public class StreamSync implements Serializable, Closeable {
&& storage.exists(new StoragePath(pathToHoodiePropsBackup));
if (!hoodiePropertiesExists) {
- LOG.warn("Base path exists, but table is not fully initialized.
Re-initializing again");
+ log.warn("Base path exists, but table is not fully initialized.
Re-initializing again");
HoodieTableMetaClient metaClientToValidate =
initializeEmptyTable();
// reload the timeline from metaClient and validate that its empty
table. If there are any instants found, then we should fail the pipeline, bcoz
hoodie.properties got deleted by mistake.
if (metaClientToValidate.reloadActiveTimeline().countInstants() >
0) {
@@ -552,7 +557,7 @@ public class StreamSync implements Serializable, Closeable {
|| (newTargetSchema != null &&
!processedSchema.isSchemaPresent(newTargetSchema))) {
String sourceStr = newSourceSchema == null ? NULL_PLACEHOLDER :
newSourceSchema.toString(true);
String targetStr = newTargetSchema == null ? NULL_PLACEHOLDER :
newTargetSchema.toString(true);
- LOG.info("Seeing new schema. Source: {}, Target: {}", sourceStr,
targetStr);
+ log.info("Seeing new schema. Source: {}, Target: {}", sourceStr,
targetStr);
// We need to recreate write client with new schema and register them.
reInitWriteClient(newSourceSchema, newTargetSchema,
inputBatch.getBatch(), metaClient);
if (newSourceSchema != null) {
@@ -607,7 +612,7 @@ public class StreamSync implements Serializable, Closeable {
public Pair<InputBatch, Boolean> readFromSource(HoodieTableMetaClient
metaClient) throws IOException {
// Retrieve the previous round checkpoints, if any
Option<Checkpoint> checkpointToResume =
StreamerCheckpointUtils.resolveCheckpointToResumeFrom(commitsTimelineOpt, cfg,
props, metaClient);
- LOG.info("Checkpoint to resume from : {}", checkpointToResume);
+ log.info("Checkpoint to resume from : {}", checkpointToResume);
int maxRetryCount = cfg.retryOnSourceFailures ? cfg.maxRetryCount : 1;
int curRetryCount = 0;
@@ -620,11 +625,11 @@ public class StreamSync implements Serializable,
Closeable {
throw e;
}
try {
- LOG.error("Exception thrown while fetching data from source. Msg : "
+ e.getMessage() + ", class : " + e.getClass() + ", cause : " + e.getCause());
- LOG.error("Sleeping for " + (cfg.retryIntervalSecs) + " before
retrying again. Current retry count " + curRetryCount + ", max retry count " +
cfg.maxRetryCount);
+ log.error("Exception thrown while fetching data from source. Msg :
{}, class : {}, cause : {}", e.getMessage(), e.getClass(), e.getCause());
+ log.error("Sleeping for {} before retrying again. Current retry
count {}, max retry count {}", cfg.retryIntervalSecs, curRetryCount,
cfg.maxRetryCount);
Thread.sleep(cfg.retryIntervalSecs * 1000);
} catch (InterruptedException ex) {
- LOG.error("Ignoring InterruptedException while waiting to retry on
source failure " + e.getMessage());
+ log.error("Ignoring InterruptedException while waiting to retry on
source failure {}", e.getMessage());
}
}
}
@@ -648,8 +653,8 @@ public class StreamSync implements Serializable, Closeable {
// handle no new data and no change in checkpoint
if (!cfg.allowCommitOnNoCheckpointChange &&
checkpoint.equals(resumeCheckpoint.orElse(null))) {
- LOG.info("No new data, source checkpoint has not changed. Nothing to
commit. Old checkpoint=("
- + resumeCheckpoint + "). New Checkpoint=(" + checkpoint + ")");
+ log.info("No new data, source checkpoint has not changed. Nothing to
commit. Old checkpoint=({}). New Checkpoint=({})",
+ resumeCheckpoint, checkpoint);
String commitActionType = CommitUtils.getCommitActionType(cfg.operation,
HoodieTableType.valueOf(cfg.tableType));
hoodieMetrics.updateMetricsForEmptyData(commitActionType);
return null;
@@ -756,7 +761,7 @@ public class StreamSync implements Serializable, Closeable {
inputBatchForWriter = new
InputBatch<>(inputBatchNeedsDeduceSchema.getBatch(),
inputBatchNeedsDeduceSchema.getCheckpointForNextBatch(),
getDeducedSchemaProvider(inputBatchNeedsDeduceSchema.getSchemaProvider().getTargetHoodieSchema(),
inputBatchNeedsDeduceSchema.getSchemaProvider(), metaClient));
} else {
- LOG.warn("Row-writer is enabled but cannot be used due to the target
schema");
+ log.warn("Row-writer is enabled but cannot be used due to the target
schema");
}
}
// if row writer was enabled but the target schema prevents us from
using it, do not use the row writer
@@ -913,7 +918,7 @@ public class StreamSync implements Serializable, Closeable {
writeClient.rollback(instantTime);
throw new HoodieStreamerWriteException("Error table commit
failed for instant " + instantTime);
case LOG_ERROR:
- LOG.error("Error table write failed for instant {}",
instantTime);
+ log.error("Error table write failed for instant {}",
instantTime);
break;
default:
throw new HoodieStreamerWriteException("Write failure strategy
not implemented for " + errorWriteFailureStrategy);
@@ -933,7 +938,7 @@ public class StreamSync implements Serializable, Closeable {
SparkStreamerValidatorUtils.runValidators(props, instantTime,
writeStatuses,
checkpointCommitMetadata, metaClient);
} catch (HoodieValidationException e) {
- LOG.error("Pre-commit validators failed for instant {}",
instantTime, e);
+ log.error("Pre-commit validators failed for instant {}",
instantTime, e);
writeClient.rollback(instantTime);
throw new HoodieStreamerWriteException("Pre-commit validators
failed for instant " + instantTime, e);
}
@@ -943,11 +948,11 @@ public class StreamSync implements Serializable,
Closeable {
SuccessfulRecordCounter.Counts counts =
SuccessfulRecordCounter.compute(
writeStatuses, errorTableWriteStatusRDDOpt,
isErrorTableWriteUnificationEnabled);
totalSuccessfulRecords.set(counts.getTotalSuccessfulRecords());
- LOG.info("instantTime={}, totalRecords={}, totalErrorRecords={},
totalSuccessfulRecords={}",
+ log.info("instantTime={}, totalRecords={}, totalErrorRecords={},
totalSuccessfulRecords={}",
instantTime, counts.getTotalRecords(),
counts.getTotalErrorRecords(),
counts.getTotalSuccessfulRecords());
if (counts.getTotalRecords() == 0) {
- LOG.info("No new data, perform empty commit.");
+ log.info("No new data, perform empty commit.");
}
// Step 4: Apply the legacy HSWSV write-error gate.
@@ -960,10 +965,10 @@ public class StreamSync implements Serializable,
Closeable {
// failure-policy story.
if (counts.hasErrors()) {
if (cfg.commitOnErrors) {
- LOG.warn("Some records failed to be merged but forcing commit
since commitOnErrors set. Errors/Total={}/{}",
+ log.warn("Some records failed to be merged but forcing commit
since commitOnErrors set. Errors/Total={}/{}",
counts.getTotalErrorRecords(), counts.getTotalRecords());
} else {
- LOG.error("Delta Sync found errors when writing.
Errors/Total={}/{}",
+ log.error("Delta Sync found errors when writing.
Errors/Total={}/{}",
counts.getTotalErrorRecords(), counts.getTotalRecords());
WriteErrorReporter.logTopErrors(writeStatuses);
// Roll back the inflight data-table instant so it doesn't leak
under LAZY
@@ -983,7 +988,7 @@ public class StreamSync implements Serializable, Closeable {
}
releaseResourcesInvoked = true;
if (success) {
- LOG.info("Commit " + instantTime + " successful!");
+ log.info("Commit {} successful!", instantTime);
this.formatAdapter.getSource().onCommit(inputBatch.getCheckpointForNextBatch()
!= null
? inputBatch.getCheckpointForNextBatch().getCheckpointKey() :
null);
// Schedule compaction if needed
@@ -994,10 +999,10 @@ public class StreamSync implements Serializable,
Closeable {
if ((totalSuccessfulRecords.get() > 0) || cfg.forceEmptyMetaSync) {
runMetaSync();
} else {
- LOG.info(String.format("Not running metaSync
totalSuccessfulRecords=%d", totalSuccessfulRecords.get()));
+ log.info("Not running metaSync totalSuccessfulRecords={}",
totalSuccessfulRecords.get());
}
} else {
- LOG.info("Commit " + instantTime + " failed!");
+ log.info("Commit {} failed!", instantTime);
throw new HoodieStreamerWriteException("Commit " + instantTime + "
failed!");
}
@@ -1051,7 +1056,7 @@ public class StreamSync implements Serializable,
Closeable {
if (!retryEnabled) {
throw ie;
}
- LOG.error("Got error trying to start a new commit. Retrying after
sleeping for a sec", ie);
+ log.error("Got error trying to start a new commit. Retrying after
sleeping for a sec", ie);
retryNum++;
try {
Thread.sleep(1000);
@@ -1131,10 +1136,10 @@ public class StreamSync implements Serializable,
Closeable {
if (cfg.enableHiveSync) {
cfg.enableMetaSync = true;
syncClientToolClasses.add(HiveSyncTool.class.getName());
- LOG.info("When set --enable-hive-sync will use HiveSyncTool for backward
compatibility");
+ log.info("When set --enable-hive-sync will use HiveSyncTool for backward
compatibility");
}
if (cfg.enableMetaSync && !syncClientToolClasses.isEmpty()) {
- LOG.debug("[MetaSync] Starting sync");
+ log.debug("[MetaSync] Starting sync");
HoodieTableMetaClient metaClient;
try {
metaClient = initializeMetaClient();
@@ -1158,7 +1163,7 @@ public class StreamSync implements Serializable,
Closeable {
Map<String, HoodieException> failedMetaSyncs = new HashMap<>();
for (String impl : syncClientToolClasses) {
if (impl.trim().isEmpty()) {
- LOG.warn("Cannot run MetaSync with empty class name");
+ log.warn("Cannot run MetaSync with empty class name");
continue;
}
@@ -1183,10 +1188,10 @@ public class StreamSync implements Serializable,
Closeable {
long timeMs = metaSyncTimeNanos / 1000000L;
String timeString = String.format("and took %d s %d ms ", timeMs / 1000L,
timeMs % 1000L);
if (metaSyncException.isPresent()) {
- LOG.error("[MetaSync] SyncTool class {} failed with exception {} {}",
impl.trim(), metaSyncException.get(), timeString);
+ log.error("[MetaSync] SyncTool class {} failed with exception {} {}",
impl.trim(), metaSyncException.get(), timeString);
failedMetaSyncs.put(impl, metaSyncException.get());
} else {
- LOG.info("[MetaSync] SyncTool class {} completed successfully {}",
impl.trim(), timeString);
+ log.info("[MetaSync] SyncTool class {} completed successfully {}",
impl.trim(), timeString);
}
}
@@ -1204,7 +1209,7 @@ public class StreamSync implements Serializable,
Closeable {
}
private void reInitWriteClient(HoodieSchema sourceSchema, HoodieSchema
targetSchema, Option<JavaRDD<HoodieRecord>> recordsOpt, HoodieTableMetaClient
metaClient) throws IOException {
- LOG.info("Setting up new Hoodie Write Client");
+ log.info("Setting up new Hoodie Write Client");
if (HoodieStreamerUtils.isDropPartitionColumns(props)) {
targetSchema =
org.apache.hudi.common.schema.HoodieSchemaUtils.removeFields(targetSchema,
HoodieStreamerUtils.getPartitionColumns(props));
}
@@ -1350,7 +1355,7 @@ public class StreamSync implements Serializable,
Closeable {
if (tableSchema.isPresent()) {
newWriteSchema = tableSchema.get();
} else {
- LOG.warn("Could not fetch schema from table. Falling back to using
target schema from schema provider");
+ log.warn("Could not fetch schema from table. Falling back to using
target schema from schema provider");
}
}
}
@@ -1376,7 +1381,7 @@ public class StreamSync implements Serializable,
Closeable {
schemas.add(targetSchema);
}
if (!schemas.isEmpty()) {
- LOG.debug("Registering Schema: {}", schemas);
+ log.debug("Registering Schema: {}", schemas);
// Use the underlying spark context in case the java context is changed
during runtime
hoodieSparkContext.getJavaSparkContext().sc().getConf()
.registerAvroSchemas(JavaScalaConverters.convertJavaListToScalaList(schemas.stream().map(HoodieSchema::toAvroSchema).collect(Collectors.toList())).toList());
@@ -1397,7 +1402,7 @@ public class StreamSync implements Serializable,
Closeable {
formatAdapter.close();
}
- LOG.info("Shutting down embedded timeline server");
+ log.info("Shutting down embedded timeline server");
if (embeddedTimelineService.isPresent()) {
embeddedTimelineService.get().stopForBasePath(cfg.targetBasePath);
}
@@ -1408,26 +1413,6 @@ public class StreamSync implements Serializable,
Closeable {
}
- public HoodieStorage getStorage() {
- return storage;
- }
-
- public TypedProperties getProps() {
- return props;
- }
-
- public Config getCfg() {
- return cfg;
- }
-
- public Option<HoodieTimeline> getCommitsTimelineOpt() {
- return commitsTimelineOpt;
- }
-
- public HoodieIngestionMetrics getMetrics() {
- return metrics;
- }
-
/**
* Schedule clustering.
* Called from {@link HoodieStreamer} when async clustering is enabled.
@@ -1454,25 +1439,16 @@ public class StreamSync implements Serializable,
Closeable {
}
}
+ @Getter
class WriteClientWriteResult {
+
+ @Setter
private Map<String, List<String>> partitionToReplacedFileIds =
Collections.emptyMap();
private final JavaRDD<WriteStatus> writeStatusRDD;
public WriteClientWriteResult(JavaRDD<WriteStatus> writeStatusRDD) {
this.writeStatusRDD = writeStatusRDD;
}
-
- public Map<String, List<String>> getPartitionToReplacedFileIds() {
- return partitionToReplacedFileIds;
- }
-
- public void setPartitionToReplacedFileIds(Map<String, List<String>>
partitionToReplacedFileIds) {
- this.partitionToReplacedFileIds = partitionToReplacedFileIds;
- }
-
- public JavaRDD<WriteStatus> getWriteStatusRDD() {
- return writeStatusRDD;
- }
}
/**
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamerCheckpointUtils.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamerCheckpointUtils.java
index 560d9fd2172a..323a2b95149d 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamerCheckpointUtils.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamerCheckpointUtils.java
@@ -40,8 +40,7 @@ import
org.apache.hudi.exception.HoodieUpgradeDowngradeException;
import org.apache.hudi.utilities.config.KafkaSourceConfig;
import org.apache.hudi.utilities.exception.HoodieStreamerException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
@@ -54,8 +53,8 @@ import static
org.apache.hudi.common.table.timeline.InstantComparison.compareTim
import static org.apache.hudi.common.util.ConfigUtils.removeConfigFromProps;
import static
org.apache.hudi.table.upgrade.UpgradeDowngrade.needsUpgradeOrDowngrade;
+@Slf4j
public class StreamerCheckpointUtils {
- private static final Logger LOG =
LoggerFactory.getLogger(StreamerCheckpointUtils.class);
/**
* The first phase of checkpoint resolution - read the checkpoint configs
from 2 sources and resolve
@@ -116,7 +115,7 @@ public class StreamerCheckpointUtils {
private static Option<Checkpoint> useCkpFromOverrideConfigIfAny(
HoodieStreamer.Config streamerConfig, TypedProperties props,
Option<Checkpoint> checkpoint) {
- LOG.debug("Checkpoint from config: {}", streamerConfig.checkpoint);
+ log.debug("Checkpoint from config: {}", streamerConfig.checkpoint);
if (!checkpoint.isPresent() && streamerConfig.checkpoint != null) {
int writeTableVersion = ConfigUtils.getIntWithAltKeys(props,
HoodieWriteConfig.WRITE_TABLE_VERSION);
checkpoint =
Option.of(buildCheckpointFromConfigOverride(streamerConfig.sourceClassName,
writeTableVersion, streamerConfig.checkpoint));
@@ -157,7 +156,7 @@ public class StreamerCheckpointUtils {
if (commitMetadataOption.isPresent()) {
HoodieCommitMetadata commitMetadata = commitMetadataOption.get();
Checkpoint checkpointFromCommit =
CheckpointUtils.getCheckpoint(commitMetadata);
- LOG.debug("Checkpoint reset from metadata: {}",
checkpointFromCommit.getCheckpointResetKey());
+ log.debug("Checkpoint reset from metadata: {}",
checkpointFromCommit.getCheckpointResetKey());
if (ignoreCkpCfgPrevailsOverCkpFromPrevCommit(streamerConfig,
checkpointFromCommit)) {
// we ignore any existing checkpoint and start ingesting afresh
resumeCheckpoint = Option.empty();
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/TableExecutionContext.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/TableExecutionContext.java
index f5f523a5c213..e24269a6afa4 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/TableExecutionContext.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/TableExecutionContext.java
@@ -21,66 +21,22 @@ package org.apache.hudi.utilities.streamer;
import org.apache.hudi.common.config.TypedProperties;
-import java.util.Objects;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
/**
* Wrapper over TableConfig objects.
* Useful for incrementally syncing multiple tables one by one via
HoodieMultiTableStreamer.java class.
*/
+@Getter
+@Setter
+@EqualsAndHashCode
public class TableExecutionContext {
private TypedProperties properties;
+ @EqualsAndHashCode.Exclude
private HoodieStreamer.Config config;
private String database;
private String tableName;
-
- public HoodieStreamer.Config getConfig() {
- return config;
- }
-
- public void setConfig(HoodieStreamer.Config config) {
- this.config = config;
- }
-
- public String getDatabase() {
- return database;
- }
-
- public void setDatabase(String database) {
- this.database = database;
- }
-
- public String getTableName() {
- return tableName;
- }
-
- public void setTableName(String tableName) {
- this.tableName = tableName;
- }
-
- public TypedProperties getProperties() {
- return properties;
- }
-
- public void setProperties(TypedProperties properties) {
- this.properties = properties;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- TableExecutionContext that = (TableExecutionContext) o;
- return Objects.equals(properties, that.properties) &&
Objects.equals(database, that.database) && Objects.equals(tableName,
that.tableName);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(properties, database, tableName);
- }
}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
index 9aebbfd4dc3f..b9fbe74b6d02 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
@@ -68,6 +68,8 @@ import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
import org.apache.hudi.testutils.HoodieSparkClientTestBase;
import org.apache.hudi.testutils.SparkRDDValidationUtils;
+import lombok.AccessLevel;
+import lombok.Setter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -1345,6 +1347,7 @@ public class TestHoodieMetadataTableValidator extends
HoodieSparkClientTestBase
}
}
+ @Setter(AccessLevel.PACKAGE)
class MockHoodieMetadataTableValidator extends HoodieMetadataTableValidator {
private List<String> metadataPartitionsToReturn;
@@ -1355,18 +1358,6 @@ public class TestHoodieMetadataTableValidator extends
HoodieSparkClientTestBase
super(jsc, cfg);
}
- void setMetadataPartitionsToReturn(List<String>
metadataPartitionsToReturn) {
- this.metadataPartitionsToReturn = metadataPartitionsToReturn;
- }
-
- void setFsPartitionsToReturn(List<String> fsPartitionsToReturn) {
- this.fsPartitionsToReturn = fsPartitionsToReturn;
- }
-
- void setPartitionCreationTime(Option<String> partitionCreationTime) {
- this.partitionCreationTime = partitionCreationTime;
- }
-
@Override
List<String> getPartitionsFromFileSystem(HoodieEngineContext
engineContext, HoodieTableMetaClient metaClient, HoodieTimeline
completedTimeline) {
return fsPartitionsToReturn;
@@ -1449,6 +1440,7 @@ public class TestHoodieMetadataTableValidator extends
HoodieSparkClientTestBase
/**
* Class to assist with testing a false positive case with RLI validation.
*/
+ @Setter
static class MockHoodieMetadataTableValidatorForRli extends
HoodieMetadataTableValidator {
private String destFilePath;
@@ -1466,14 +1458,6 @@ public class TestHoodieMetadataTableValidator extends
HoodieSparkClientTestBase
new File(destFilePath).renameTo(new File(originalFilePath));
return super.getRecordLocationsFromRLI(sparkEngineContext, basePath,
latestCompletedCommit);
}
-
- public void setDestFilePath(String destFilePath) {
- this.destFilePath = destFilePath;
- }
-
- public void setOriginalFilePath(String originalFilePath) {
- this.originalFilePath = originalFilePath;
- }
}
private String getTempLocation() {
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieRepairTool.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieRepairTool.java
index 0541b429a27f..a2e9b0e15e01 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieRepairTool.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieRepairTool.java
@@ -33,6 +33,7 @@ import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.testutils.providers.SparkProvider;
+import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.fs.Path;
import org.apache.spark.HoodieSparkKryoRegistrar$;
import org.apache.spark.SparkConf;
@@ -45,8 +46,6 @@ import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.provider.Arguments;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.security.SecureRandom;
@@ -67,8 +66,9 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
+@Slf4j
public class TestHoodieRepairTool extends HoodieCommonTestHarness implements
SparkProvider {
- private static final Logger LOG =
LoggerFactory.getLogger(TestHoodieRepairTool.class);
+
// Instant time -> List<Pair<relativePartitionPath, fileId>>
private static final Map<String, List<Pair<String, String>>> BASE_FILE_INFO
= new HashMap<>();
private static final Map<String, List<Pair<String, String>>> LOG_FILE_INFO =
new HashMap<>();
@@ -385,7 +385,7 @@ public class TestHoodieRepairTool extends
HoodieCommonTestHarness implements Spa
storage.create(path, false);
}
} catch (IOException e) {
- LOG.error("Error creating file: " + path);
+ log.error("Error creating file: {}", path);
}
return path.toString();
})
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
index 7ea5a39af3fa..f38f8db4f48b 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
@@ -57,6 +57,7 @@ import org.apache.hudi.utilities.streamer.HoodieStreamer;
import org.apache.hudi.utilities.testutils.KafkaTestUtils;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
+import lombok.extern.slf4j.Slf4j;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.Path;
import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -67,8 +68,6 @@ import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
@@ -91,10 +90,9 @@ import static
org.apache.hudi.common.testutils.HoodieTestUtils.createMetaClient;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
+@Slf4j
public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase {
- private static final Logger LOG =
LoggerFactory.getLogger(HoodieDeltaStreamerTestBase.class);
-
static final Random RANDOM = new Random();
static final String PROPS_FILENAME_TEST_SOURCE = "test-source.properties";
static final String PROPS_FILENAME_TEST_SOURCE1 = "test-source1.properties";
@@ -706,7 +704,7 @@ public class HoodieDeltaStreamerTestBase extends
UtilitiesTestBase {
static void assertAtleastNCompactionCommits(int minExpected, String
tablePath) {
HoodieTableMetaClient meta = createMetaClient(storage, tablePath);
HoodieTimeline timeline =
meta.getActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants();
- LOG.info("Timeline Instants={}", meta.getActiveTimeline().getInstants());
+ log.info("Timeline Instants={}", meta.getActiveTimeline().getInstants());
int numCompactionCommits = timeline.countInstants();
assertTrue(minExpected <= numCompactionCommits, "Got=" +
numCompactionCommits + ", exp >=" + minExpected);
}
@@ -714,7 +712,7 @@ public class HoodieDeltaStreamerTestBase extends
UtilitiesTestBase {
static void assertAtleastNDeltaCommits(int minExpected, String tablePath) {
HoodieTableMetaClient meta = createMetaClient(storage.getConf(),
tablePath);
HoodieTimeline timeline =
meta.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants();
- LOG.info("Timeline Instants={}", meta.getActiveTimeline().getInstants());
+ log.info("Timeline Instants={}", meta.getActiveTimeline().getInstants());
int numDeltaCommits = timeline.countInstants();
assertTrue(minExpected <= numDeltaCommits, "Got=" + numDeltaCommits + ",
exp >=" + minExpected);
}
@@ -722,7 +720,7 @@ public class HoodieDeltaStreamerTestBase extends
UtilitiesTestBase {
static void assertAtleastNCompactionCommitsAfterCommit(int minExpected,
String lastSuccessfulCommit, String tablePath) {
HoodieTableMetaClient meta = createMetaClient(storage.getConf(),
tablePath);
HoodieTimeline timeline =
meta.getActiveTimeline().getCommitAndReplaceTimeline().findInstantsAfter(lastSuccessfulCommit).filterCompletedInstants();
- LOG.info("Timeline Instants={}", meta.getActiveTimeline().getInstants());
+ log.info("Timeline Instants={}", meta.getActiveTimeline().getInstants());
int numCompactionCommits = timeline.countInstants();
assertTrue(minExpected <= numCompactionCommits, "Got=" +
numCompactionCommits + ", exp >=" + minExpected);
}
@@ -730,7 +728,7 @@ public class HoodieDeltaStreamerTestBase extends
UtilitiesTestBase {
static void assertAtleastNDeltaCommitsAfterCommit(int minExpected, String
lastSuccessfulCommit, String tablePath) {
HoodieTableMetaClient meta = createMetaClient(storage.getConf(),
tablePath);
HoodieTimeline timeline =
meta.reloadActiveTimeline().getDeltaCommitTimeline().findInstantsAfter(lastSuccessfulCommit).filterCompletedInstants();
- LOG.info("Timeline Instants={}", meta.getActiveTimeline().getInstants());
+ log.info("Timeline Instants={}", meta.getActiveTimeline().getInstants());
int numDeltaCommits = timeline.countInstants();
assertTrue(minExpected <= numDeltaCommits, "Got=" + numDeltaCommits + ",
exp >=" + minExpected);
}
@@ -757,9 +755,9 @@ public class HoodieDeltaStreamerTestBase extends
UtilitiesTestBase {
try {
Thread.sleep(2000);
ret = condition.apply(true);
- LOG.info("Condition completed successfully");
+ log.info("Condition completed successfully");
} catch (Throwable error) {
- LOG.debug("Got error waiting for condition", error);
+ log.debug("Got error waiting for condition", error);
ret = false;
}
}
@@ -777,7 +775,7 @@ public class HoodieDeltaStreamerTestBase extends
UtilitiesTestBase {
try {
Thread.sleep(5);
} catch (Throwable error) {
- LOG.debug("Got error waiting for condition", error);
+ log.debug("Got error waiting for condition", error);
}
}
}
@@ -785,7 +783,7 @@ public class HoodieDeltaStreamerTestBase extends
UtilitiesTestBase {
static void assertAtLeastNCommits(int minExpected, String tablePath) {
HoodieTableMetaClient meta = createMetaClient(storage.getConf(),
tablePath);
HoodieTimeline timeline =
meta.getActiveTimeline().filterCompletedInstants();
- LOG.info("Timeline Instants={}", meta.getActiveTimeline().getInstants());
+ log.info("Timeline Instants={}", meta.getActiveTimeline().getInstants());
int numDeltaCommits = timeline.countInstants();
assertTrue(minExpected <= numDeltaCommits, "Got=" + numDeltaCommits + ",
exp >=" + minExpected);
}
@@ -793,7 +791,7 @@ public class HoodieDeltaStreamerTestBase extends
UtilitiesTestBase {
static void assertAtLeastNReplaceCommits(int minExpected, String
tablePath) {
HoodieTableMetaClient meta = createMetaClient(storage.getConf(),
tablePath);
HoodieTimeline timeline =
meta.getActiveTimeline().getCompletedReplaceTimeline();
- LOG.info("Timeline Instants={}", meta.getActiveTimeline().getInstants());
+ log.info("Timeline Instants={}", meta.getActiveTimeline().getInstants());
int numDeltaCommits = timeline.countInstants();
assertTrue(minExpected <= numDeltaCommits, "Got=" + numDeltaCommits + ",
exp >=" + minExpected);
}
@@ -801,7 +799,7 @@ public class HoodieDeltaStreamerTestBase extends
UtilitiesTestBase {
static void assertPendingIndexCommit(String tablePath) {
HoodieTableMetaClient meta = createMetaClient(storage.getConf(),
tablePath);
HoodieTimeline timeline =
meta.reloadActiveTimeline().getAllCommitsTimeline().filterPendingIndexTimeline();
- LOG.info("Timeline Instants={}", meta.getActiveTimeline().getInstants());
+ log.info("Timeline Instants={}", meta.getActiveTimeline().getInstants());
int numIndexCommits = timeline.countInstants();
assertEquals(1, numIndexCommits, "Got=" + numIndexCommits + ", exp=1");
}
@@ -809,7 +807,7 @@ public class HoodieDeltaStreamerTestBase extends
UtilitiesTestBase {
static void assertCompletedIndexCommit(String tablePath) {
HoodieTableMetaClient meta = createMetaClient(storage.getConf(),
tablePath);
HoodieTimeline timeline =
meta.reloadActiveTimeline().getAllCommitsTimeline().filterCompletedIndexTimeline();
- LOG.info("Timeline Instants={}", meta.getActiveTimeline().getInstants());
+ log.info("Timeline Instants={}", meta.getActiveTimeline().getInstants());
int numIndexCommits = timeline.countInstants();
assertEquals(1, numIndexCommits, "Got=" + numIndexCommits + ", exp=1");
}
@@ -817,7 +815,7 @@ public class HoodieDeltaStreamerTestBase extends
UtilitiesTestBase {
static void assertNoReplaceCommits(String tablePath) {
HoodieTableMetaClient meta = createMetaClient(storage.getConf(),
tablePath);
HoodieTimeline timeline =
meta.getActiveTimeline().getCompletedReplaceTimeline();
- LOG.info("Timeline Instants={}", meta.getActiveTimeline().getInstants());
+ log.info("Timeline Instants={}", meta.getActiveTimeline().getInstants());
int numDeltaCommits = timeline.countInstants();
assertEquals(0, numDeltaCommits, "Got=" + numDeltaCommits + ", exp =" +
0);
}
@@ -825,7 +823,7 @@ public class HoodieDeltaStreamerTestBase extends
UtilitiesTestBase {
static void assertAtLeastNClusterRequests(int minExpected, String
tablePath) {
HoodieTableMetaClient meta = createMetaClient(storage.getConf(),
tablePath);
HoodieTimeline timeline =
meta.getActiveTimeline().filterPendingClusteringTimeline();
- LOG.info("Timeline Instants={}", meta.getActiveTimeline().getInstants());
+ log.info("Timeline Instants={}", meta.getActiveTimeline().getInstants());
int numDeltaCommits = timeline.countInstants();
assertTrue(minExpected <= numDeltaCommits, "Got=" + numDeltaCommits + ",
exp >=" + minExpected);
}
@@ -833,7 +831,7 @@ public class HoodieDeltaStreamerTestBase extends
UtilitiesTestBase {
static void assertAtLeastNCommitsAfterRollback(int minExpectedRollback,
int minExpectedCommits, String tablePath) {
HoodieTableMetaClient meta = createMetaClient(storage.getConf(),
tablePath);
HoodieTimeline timeline =
meta.getActiveTimeline().getRollbackTimeline().filterCompletedInstants();
- LOG.info("Rollback Timeline Instants={}",
meta.getActiveTimeline().getInstants());
+ log.info("Rollback Timeline Instants={}",
meta.getActiveTimeline().getInstants());
int numRollbackCommits = timeline.countInstants();
assertTrue(minExpectedRollback <= numRollbackCommits, "Got=" +
numRollbackCommits + ", exp >=" + minExpectedRollback);
HoodieInstant firstRollback = timeline.getInstants().get(0);
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/MockConfigurationHotUpdateStrategy.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/MockConfigurationHotUpdateStrategy.java
index ac2a9a4c5604..9ef10f76ffc1 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/MockConfigurationHotUpdateStrategy.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/MockConfigurationHotUpdateStrategy.java
@@ -23,8 +23,7 @@ import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.utilities.streamer.ConfigurationHotUpdateStrategy;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
import java.io.Serializable;
@@ -33,8 +32,8 @@ import static
org.apache.hudi.config.HoodieWriteConfig.UPSERT_PARALLELISM_VALUE;
/**
* ConfigurationHotUpdateStrategy for test purpose.
*/
+@Slf4j
public class MockConfigurationHotUpdateStrategy extends
ConfigurationHotUpdateStrategy implements Serializable {
- private static final Logger LOG =
LoggerFactory.getLogger(MockConfigurationHotUpdateStrategy.class);
public MockConfigurationHotUpdateStrategy(HoodieDeltaStreamer.Config cfg,
TypedProperties properties) {
super(cfg, properties);
@@ -46,7 +45,7 @@ public class MockConfigurationHotUpdateStrategy extends
ConfigurationHotUpdateSt
long upsertShuffleParallelism =
currentProps.getLong(UPSERT_PARALLELISM_VALUE.key());
TypedProperties newProps = TypedProperties.copy(currentProps);
newProps.setProperty(UPSERT_PARALLELISM_VALUE.key(),
String.valueOf(upsertShuffleParallelism + 5));
- LOG.info("update {} from [{}] to [{}]", UPSERT_PARALLELISM_VALUE.key(),
upsertShuffleParallelism, upsertShuffleParallelism + 5);
+ log.info("update {} from [{}] to [{}]", UPSERT_PARALLELISM_VALUE.key(),
upsertShuffleParallelism, upsertShuffleParallelism + 5);
return Option.of(newProps);
}
return Option.empty();
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index 04725249ca0f..d180dc720ea5 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -127,6 +127,7 @@ import
org.apache.hudi.utilities.transform.SqlQueryBasedTransformer;
import org.apache.hudi.utilities.transform.Transformer;
import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.extern.slf4j.Slf4j;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
@@ -156,8 +157,6 @@ import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
@@ -206,10 +205,9 @@ import static
org.junit.jupiter.params.provider.Arguments.arguments;
/**
* Basic tests against {@link HoodieDeltaStreamer}, by issuing bulk_inserts,
upserts, inserts. Check counts at the end.
*/
+@Slf4j
public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
- private static final Logger LOG =
LoggerFactory.getLogger(TestHoodieDeltaStreamer.class);
-
private void addRecordMerger(HoodieRecordType type, List<String>
hoodieConfig) {
if (type == HoodieRecordType.SPARK) {
Map<String, String> opts = new HashMap<>();
@@ -433,7 +431,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
deltaStreamer.sync();
}, "Should error out when setting the key generator class property to an
invalid value");
// expected
- LOG.warn("Expected error during getting the key generator", e);
+ log.warn("Expected error during getting the key generator", e);
assertTrue(e.getMessage().contains("Unable to load class"));
}
@@ -483,7 +481,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
syncOnce(TestHelpers.makeConfig(basePath + "/not_a_table",
WriteOperationType.BULK_INSERT));
}, "Should error out when pointed out at a dir thats not a table");
// expected
- LOG.debug("Expected error during table creation", e);
+ log.debug("Expected error during table creation", e);
}
@ParameterizedTest
@@ -560,7 +558,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
cfg.targetBasePath = newDatasetBasePath;
syncOnce(cfg);
Dataset<Row> res =
sqlContext.read().format("org.apache.hudi").load(newDatasetBasePath);
- LOG.info("Schema : {}", res.schema());
+ log.info("Schema : {}", res.schema());
assertRecordCount(1950, newDatasetBasePath, sqlContext);
res.registerTempTable("bootstrapped");
@@ -1546,7 +1544,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
try {
ds.sync();
} catch (Exception ex) {
- LOG.warn("DS continuous job failed, hence not proceeding with
condition check for " + jobId);
+ log.warn("DS continuous job failed, hence not proceeding with
condition check for {}", jobId);
throw new RuntimeException(ex.getMessage(), ex);
}
});
@@ -1944,19 +1942,19 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
buildIndexerConfig(tableBasePath, ds.getConfig().targetTableName,
null, UtilHelpers.SCHEDULE, "COLUMN_STATS"));
scheduleIndexInstantTime = scheduleIndexingJob.doSchedule();
} catch (Exception e) {
- LOG.info("Schedule indexing failed", e);
+ log.info("Schedule indexing failed", e);
return false;
}
if (scheduleIndexInstantTime.isPresent()) {
TestHelpers.assertPendingIndexCommit(tableBasePath);
- LOG.info("Schedule indexing success, now build index with instant time
" + scheduleIndexInstantTime.get());
+ log.info("Schedule indexing success, now build index with instant time
{}", scheduleIndexInstantTime.get());
HoodieIndexer runIndexingJob = new HoodieIndexer(jsc,
buildIndexerConfig(tableBasePath, ds.getConfig().targetTableName,
scheduleIndexInstantTime.get(), UtilHelpers.EXECUTE, "COLUMN_STATS"));
runIndexingJob.start(0);
- LOG.info("Metadata indexing success");
+ log.info("Metadata indexing success");
TestHelpers.assertCompletedIndexCommit(tableBasePath);
} else {
- LOG.warn("Metadata indexing failed");
+ log.warn("Metadata indexing failed");
}
return true;
});
@@ -1984,7 +1982,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
Arrays.asList(HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_ENABLE_PROP.key()
+ "=true", HoodieWriteConfig.MARKERS_TYPE.key() + "=DIRECT")));
scheduleIndexInstantTime = scheduleIndexingJob.doSchedule();
TestHelpers.assertPendingIndexCommit(tableBasePath);
- LOG.info("Schedule indexing success, now build index with instant time
" + scheduleIndexInstantTime.get());
+ log.info("Schedule indexing success, now build index with instant time
{}", scheduleIndexInstantTime.get());
// Wait for a pending commit before starting execution phase for the
executor. This ensures that indexer waits for the commit to complete.
TestHelpers.waitFor(() -> {
HoodieTableMetaClient metaClient =
HoodieTestUtils.createMetaClient(storage.getConf(), tableBasePath);
@@ -1995,7 +1993,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
buildIndexerConfig(tableBasePath, ds.getConfig().targetTableName,
scheduleIndexInstantTime.get(), UtilHelpers.EXECUTE, "RECORD_INDEX",
Arrays.asList(HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_ENABLE_PROP.key()
+ "=true", HoodieWriteConfig.MARKERS_TYPE.key() + "=DIRECT")));
runIndexingJob.start(0);
- LOG.info("Metadata indexing success");
+ log.info("Metadata indexing success");
TestHelpers.assertCompletedIndexCommit(tableBasePath);
// Assert no pending commits before indexing instant
HoodieTableMetaClient metaClient =
HoodieTestUtils.createMetaClient(storage.getConf(), tableBasePath);
@@ -2044,7 +2042,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
buildIndexerConfig(tableBasePath, ds.getConfig().targetTableName,
null, UtilHelpers.SCHEDULE, "RECORD_INDEX"));
scheduleIndexInstantTime = scheduleIndexingJob.doSchedule();
TestHelpers.assertPendingIndexCommit(tableBasePath);
- LOG.info("Schedule indexing success, now build index with instant time
" + scheduleIndexInstantTime.get());
+ log.info("Schedule indexing success, now build index with instant time
{}", scheduleIndexInstantTime.get());
// Wait for clustering instant to be scheduled before starting
execution phase of the executor
TestHelpers.waitFor(() -> {
HoodieTableMetaClient metaClient =
HoodieTestUtils.createMetaClient(storage, tableBasePath);
@@ -2061,7 +2059,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
boolean res = JavaTestUtils.checkNestedExceptionContains(t, "Index
catchup failed");
assertTrue(res, "Indexing catchup task should have timed out");
}
- LOG.info("Metadata indexing timed out");
+ log.info("Metadata indexing timed out");
} catch (Exception e) {
fail("Indexing job should not have failed", e);
}
@@ -2091,19 +2089,19 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
initialHoodieClusteringJob(tableBasePath, null, true, null);
scheduleClusteringInstantTime = scheduleClusteringJob.doSchedule();
} catch (Exception e) {
- LOG.warn("Schedule clustering failed", e);
+ log.warn("Schedule clustering failed", e);
Assertions.fail("Schedule clustering failed", e);
}
if (scheduleClusteringInstantTime.isPresent()) {
- LOG.info("Schedule clustering success, now cluster with instant time "
+ scheduleClusteringInstantTime.get());
+ log.info("Schedule clustering success, now cluster with instant time
{}", scheduleClusteringInstantTime.get());
HoodieClusteringJob.Config clusterClusteringConfig =
buildHoodieClusteringUtilConfig(tableBasePath,
shouldPassInClusteringInstantTime ?
scheduleClusteringInstantTime.get() : null, false);
HoodieClusteringJob clusterClusteringJob = new
HoodieClusteringJob(jsc, clusterClusteringConfig);
clusterClusteringJob.cluster(clusterClusteringConfig.retry);
TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath);
- LOG.info("Cluster success");
+ log.info("Cluster success");
} else {
- LOG.warn("Clustering execution failed");
+ log.warn("Clustering execution failed");
Assertions.fail("Clustering execution failed");
}
} else {
@@ -2273,15 +2271,15 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
try {
int result = scheduleClusteringJob.cluster(0);
if (result == 0) {
- LOG.info("Cluster success");
+ log.info("Cluster success");
} else {
- LOG.warn("Cluster failed");
+ log.warn("Cluster failed");
if (!runningMode.toLowerCase().equals(UtilHelpers.EXECUTE)) {
return false;
}
}
} catch (Exception e) {
- LOG.warn("ScheduleAndExecute clustering failed", e);
+ log.warn("ScheduleAndExecute clustering failed", e);
exception = e;
if (!runningMode.equalsIgnoreCase(UtilHelpers.EXECUTE)) {
return false;
@@ -2441,13 +2439,13 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
try {
int counter = 2;
while (counter < 100) { // lets keep going. if the test times out, we
will cancel the future within finally. So, safe to generate 100 batches.
- LOG.info("Generating data for batch {}", counter);
+ log.info("Generating data for batch {}", counter);
prepareParquetDFSFiles(100, PARQUET_SOURCE_ROOT,
Integer.toString(counter) + ".parquet", false, null, null, makeDatesAmbiguous);
counter++;
Thread.sleep(2000);
}
} catch (Exception ex) {
- LOG.warn("Input data generation failed", ex);
+ log.warn("Input data generation failed", ex);
throw new RuntimeException(ex.getMessage(), ex);
}
});
@@ -2591,7 +2589,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
Exception e = assertThrows(HoodieException.class, () -> {
syncOnce(new HoodieDeltaStreamer(cfg, jsc, fs,
hiveServer.getHiveConf()));
}, "Should error out when schema provider is not provided");
- LOG.debug("Expected error during reading data from source ", e);
+ log.debug("Expected error during reading data from source ", e);
assertTrue(e.getMessage().contains("Schema provider is required for this
operation and for the source of interest. "
+ "Please set '--schemaprovider-class' in the top level HoodieStreamer
config for the source of interest. "
+ "Based on the schema provider class chosen, additional configs might
be required. "
@@ -2823,7 +2821,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
// Ensure it is empty
HoodieCommitMetadata commitMetadata =
mClient.getActiveTimeline().readCommitMetadata(newLastFinished);
- LOG.info("New Commit Metadata={}", commitMetadata);
+ log.info("New Commit Metadata={}", commitMetadata);
assertTrue(commitMetadata.getPartitionToWriteStats().isEmpty());
// Try UPSERT with filterDupes true. Expect exception
@@ -3339,7 +3337,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
try {
fs.delete(entry.getPath());
} catch (IOException e) {
- LOG.warn("Failed to delete " + entry.getPath().toString(), e);
+ log.warn("Failed to delete: {}", entry.getPath().toString(), e);
}
});
}
@@ -3515,7 +3513,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
Exception e = assertThrows(AnalysisException.class, () -> {
testCsvDFSSource(false, '\t', false,
Collections.singletonList(TripsWithDistanceTransformer.class.getName()));
}, "Should error out when doing the transformation.");
- LOG.debug("Expected error during transformation", e);
+ log.debug("Expected error during transformation", e);
// First message for Spark 3.4 and above, second message for Spark 3.3,
third message for Spark 3.2 and below
assertTrue(
e.getMessage().contains("[UNRESOLVED_COLUMN.WITH_SUGGESTION] A column
or function parameter "
@@ -3882,9 +3880,9 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
.build();
Properties hoodieProps = new Properties();
hoodieProps.load(fs.open(new Path(cfg.targetBasePath +
"/.hoodie/hoodie.properties")));
- LOG.info("old props: {}", hoodieProps);
+ log.info("old props: {}", hoodieProps);
hoodieProps.put("hoodie.table.type", HoodieTableType.MERGE_ON_READ.name());
- LOG.info("new props: {}", hoodieProps);
+ log.info("new props: {}", hoodieProps);
StoragePath metaPathDir = new StoragePath(metaClient.getBasePath(),
HoodieTableMetaClient.METAFOLDER_NAME);
HoodieTableConfig.create(metaClient.getStorage(), metaPathDir,
hoodieProps);
@@ -3953,9 +3951,9 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
.build();
Properties hoodieProps = new Properties();
hoodieProps.load(fs.open(new Path(cfg.targetBasePath +
"/.hoodie/hoodie.properties")));
- LOG.info("old props: " + hoodieProps);
+ log.info("Old props: {}", hoodieProps);
hoodieProps.put("hoodie.table.type", HoodieTableType.COPY_ON_WRITE.name());
- LOG.info("new props: " + hoodieProps);
+ log.info("New props: {}", hoodieProps);
StoragePath metaPathDir = new StoragePath(metaClient.getBasePath(),
".hoodie");
HoodieTableConfig.create(metaClient.getStorage(), metaPathDir,
hoodieProps);
@@ -4249,13 +4247,13 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
/**
* Return empty table.
*/
+ @Slf4j
public static class DropAllTransformer implements Transformer {
- private static final Logger LOG =
LoggerFactory.getLogger(DropAllTransformer.class);
@Override
public Dataset apply(JavaSparkContext jsc, SparkSession sparkSession,
Dataset<Row> rowDataset,
TypedProperties properties) {
- LOG.info("DropAllTransformer called !!");
+ log.info("DropAllTransformer called !!");
return sparkSession.createDataFrame(jsc.emptyRDD(), rowDataset.schema());
}
}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerWithMultiWriter.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerWithMultiWriter.java
index 0fce13068676..08c355556ff7 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerWithMultiWriter.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerWithMultiWriter.java
@@ -38,13 +38,12 @@ import org.apache.hudi.utilities.config.SourceTestConfig;
import org.apache.hudi.utilities.sources.TestDataSource;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
+import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
@@ -68,10 +67,9 @@ import static
org.apache.hudi.config.HoodieWriteConfig.UPSERT_PARALLELISM_VALUE;
import static
org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_KEY;
import static
org.apache.hudi.utilities.deltastreamer.TestHoodieDeltaStreamer.deltaStreamerTestRunner;
+@Slf4j
public class TestHoodieDeltaStreamerWithMultiWriter extends
HoodieDeltaStreamerTestBase {
- private static final Logger LOG =
LoggerFactory.getLogger(TestHoodieDeltaStreamerWithMultiWriter.class);
-
String basePath;
String propsFilePath;
String tableBasePath;
@@ -413,7 +411,7 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends
HoodieDeltaStreamerT
deltaStreamerTestRunner(ingestionJob, cfgIngestionJob,
conditionForRegularIngestion, jobId);
} catch (Throwable ex) {
continuousFailed.set(true);
- LOG.error("Continuous job failed " + ex.getMessage());
+ log.error("Continuous job failed {}", ex.getMessage());
throw new RuntimeException(ex);
}
});
@@ -424,7 +422,7 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends
HoodieDeltaStreamerT
awaitCondition(new GetCommitsAfterInstant(tableBasePath,
lastSuccessfulCommit));
backfillJob.sync();
} catch (Throwable ex) {
- LOG.error("Backfilling job failed " + ex.getMessage());
+ log.error("Backfilling job failed {}", ex.getMessage());
backfillFailed.set(true);
throw new RuntimeException(ex);
}
@@ -443,7 +441,7 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends
HoodieDeltaStreamerT
// expected ConcurrentModificationException since ingestion & backfill
will have overlapping writes
if (!continuousFailed.get()) {
// if backfill job failed, shutdown the continuous job.
- LOG.warn("Calling shutdown on ingestion job since the backfill job
has failed for " + jobId);
+ log.warn("Calling shutdown on ingestion job since the backfill job
has failed for {}", jobId);
ingestionJob.shutdownGracefully();
} else {
// both backfill and ingestion job cannot fail.
@@ -452,14 +450,14 @@ public class TestHoodieDeltaStreamerWithMultiWriter
extends HoodieDeltaStreamerT
} else if (expectConflict && continuousFailed.get() &&
e.getCause().getMessage().contains("Ingestion service was shut down with
exception")) {
// incase of regular ingestion job failing,
ConcurrentModificationException is not throw all the way.
if (!backfillFailed.get()) {
- LOG.warn("Calling shutdown on backfill job since the
ingstion/continuous job has failed for " + jobId);
+ log.warn("Calling shutdown on backfill job since the
ingstion/continuous job has failed for {}", jobId);
backfillJob.shutdownGracefully();
} else {
// both backfill and ingestion job cannot fail.
throw new HoodieException("Both backfilling and ingestion job failed
", e);
}
} else {
- LOG.error("Conflict happened, but not expected " +
e.getCause().getMessage());
+ log.error("Conflict happened, but not expected {}",
e.getCause().getMessage());
throw e;
}
} finally {
@@ -495,7 +493,7 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends
HoodieDeltaStreamerT
soFar += 500;
}
}
- LOG.warn("Awaiting completed in " + (System.currentTimeMillis() -
startTime));
+ log.warn("Awaiting completed in {}", System.currentTimeMillis() -
startTime);
}
}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieMultiTableDeltaStreamer.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieMultiTableDeltaStreamer.java
index fc75e76275af..b1e15b494c7e 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieMultiTableDeltaStreamer.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieMultiTableDeltaStreamer.java
@@ -34,9 +34,8 @@ import org.apache.hudi.utilities.sources.TestDataSource;
import org.apache.hudi.utilities.streamer.TableExecutionContext;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
+import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Arrays;
@@ -49,10 +48,9 @@ import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
+@Slf4j
public class TestHoodieMultiTableDeltaStreamer extends
HoodieDeltaStreamerTestBase {
- private static final Logger LOG =
LoggerFactory.getLogger(TestHoodieMultiTableDeltaStreamer.class);
-
static class TestHelpers {
static HoodieMultiTableDeltaStreamer.Config getConfig(String fileName,
String configFolder, String sourceClassName, boolean enableHiveSync, boolean
enableMetaSync,
@@ -104,7 +102,7 @@ public class TestHoodieMultiTableDeltaStreamer extends
HoodieDeltaStreamerTestBa
Exception e = assertThrows(HoodieException.class, () -> {
new HoodieMultiTableDeltaStreamer(cfg, jsc);
}, "Should fail when hive sync table not provided with enableHiveSync
flag");
- LOG.debug("Expected error when creating table execution objects", e);
+ log.debug("Expected error when creating table execution objects", e);
assertTrue(e.getMessage().contains("Meta sync table field not provided!"));
}
@@ -114,7 +112,7 @@ public class TestHoodieMultiTableDeltaStreamer extends
HoodieDeltaStreamerTestBa
Exception e = assertThrows(IllegalArgumentException.class, () -> {
new HoodieMultiTableDeltaStreamer(cfg, jsc);
}, "Should fail when invalid props file is provided");
- LOG.debug("Expected error when creating table execution objects", e);
+ log.debug("Expected error when creating table execution objects", e);
assertTrue(e.getMessage().contains("Please provide valid common config
file path!"));
}
@@ -124,7 +122,7 @@ public class TestHoodieMultiTableDeltaStreamer extends
HoodieDeltaStreamerTestBa
Exception e = assertThrows(IllegalArgumentException.class, () -> {
new HoodieMultiTableDeltaStreamer(cfg, jsc);
}, "Should fail when invalid table config props file path is provided");
- LOG.debug("Expected error when creating table execution objects", e);
+ log.debug("Expected error when creating table execution objects", e);
assertTrue(e.getMessage().contains("Please provide valid table config file
path!"));
}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHiveSchemaProvider.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHiveSchemaProvider.java
index bbd9eae152dc..028e67c291b7 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHiveSchemaProvider.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHiveSchemaProvider.java
@@ -29,6 +29,7 @@ import org.apache.hudi.utilities.schema.HiveSchemaProvider;
import
org.apache.hudi.utilities.testutils.SparkClientFunctionalTestHarnessWithHiveSupport;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
+import lombok.extern.slf4j.Slf4j;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.junit.jupiter.api.Assertions;
@@ -36,8 +37,6 @@ import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.IOException;
@@ -46,9 +45,10 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
/**
* Basic tests against {@link HiveSchemaProvider}.
*/
+@Slf4j
@Tag("functional")
public class TestHiveSchemaProvider extends
SparkClientFunctionalTestHarnessWithHiveSupport {
- private static final Logger LOG =
LoggerFactory.getLogger(TestHiveSchemaProvider.class);
+
private static final TypedProperties PROPS = new TypedProperties();
private static final String SOURCE_SCHEMA_TABLE_NAME =
"schema_registry.source_schema_tab";
private static final String TARGET_SCHEMA_TABLE_NAME =
"schema_registry.target_schema_tab";
@@ -75,7 +75,7 @@ public class TestHiveSchemaProvider extends
SparkClientFunctionalTestHarnessWith
assertNotNull(originalField);
}
} catch (HoodieException e) {
- LOG.error("Failed to get source schema. ", e);
+ log.error("Failed to get source schema. ", e);
throw e;
}
}
@@ -97,7 +97,7 @@ public class TestHiveSchemaProvider extends
SparkClientFunctionalTestHarnessWith
assertNotNull(originalField);
}
} catch (HoodieException e) {
- LOG.error("Failed to get source/target schema. ", e);
+ log.error("Failed to get source/target schema. ", e);
throw e;
}
}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestJdbcbasedSchemaProvider.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestJdbcbasedSchemaProvider.java
index a13a270b4c02..fc8755ff878c 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestJdbcbasedSchemaProvider.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestJdbcbasedSchemaProvider.java
@@ -26,11 +26,10 @@ import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.schema.JdbcbasedSchemaProvider;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
+import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.DriverManager;
@@ -43,10 +42,10 @@ import static
org.apache.hudi.utilities.testutils.JdbcTestUtils.JDBC_URL;
import static org.apache.hudi.utilities.testutils.JdbcTestUtils.JDBC_USER;
import static org.junit.jupiter.api.Assertions.assertEquals;
+@Slf4j
@Tag("functional")
public class TestJdbcbasedSchemaProvider extends
SparkClientFunctionalTestHarness {
- private static final Logger LOG =
LoggerFactory.getLogger(TestJdbcbasedSchemaProvider.class);
private static final TypedProperties PROPS = new TypedProperties();
@BeforeAll
@@ -67,7 +66,7 @@ public class TestJdbcbasedSchemaProvider extends
SparkClientFunctionalTestHarnes
HoodieSchema sourceSchema =
UtilHelpers.createSchemaProvider(JdbcbasedSchemaProvider.class.getName(),
PROPS, jsc()).getSourceHoodieSchema();
assertEquals(sourceSchema.toString().toUpperCase(),
HoodieSchema.parse(UtilitiesTestBase.Helpers.readFile("streamer-config/source-jdbc.avsc")).toString().toUpperCase());
} catch (HoodieException e) {
- LOG.error("Failed to get connection through jdbc. ", e);
+ log.error("Failed to get connection through jdbc. ", e);
}
}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/multitable/TestHoodieMultiTableServicesMain.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/multitable/TestHoodieMultiTableServicesMain.java
index b93b70342b1c..39e30475cbb0 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/multitable/TestHoodieMultiTableServicesMain.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/multitable/TestHoodieMultiTableServicesMain.java
@@ -50,6 +50,7 @@ import org.apache.hudi.table.storage.HoodieStorageLayout;
import org.apache.hudi.testutils.providers.SparkProvider;
import org.apache.hudi.utilities.HoodieCompactor;
+import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.fs.Path;
import org.apache.spark.HoodieSparkKryoRegistrar$;
import org.apache.spark.SparkConf;
@@ -61,8 +62,6 @@ import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
@@ -79,10 +78,9 @@ import static
org.apache.hudi.utilities.multitable.MultiTableServiceUtils.Consta
* Tests for HoodieMultiTableServicesMain
* @see HoodieMultiTableServicesMain
*/
+@Slf4j
class TestHoodieMultiTableServicesMain extends HoodieCommonTestHarness
implements SparkProvider {
- private static final Logger LOG =
LoggerFactory.getLogger(TestHoodieMultiTableServicesMain.class);
-
protected boolean initialized = false;
private static SparkSession spark;
@@ -155,10 +153,10 @@ class TestHoodieMultiTableServicesMain extends
HoodieCommonTestHarness implement
new Thread(() -> {
try {
Thread.sleep(10000);
- LOG.info("Shutdown the table services");
+ log.info("Shutdown the table services");
main.cancel();
} catch (InterruptedException e) {
- LOG.warn("InterruptedException: ", e);
+ log.warn("InterruptedException: ", e);
}
}).start();
main.startServices();
@@ -193,10 +191,10 @@ class TestHoodieMultiTableServicesMain extends
HoodieCommonTestHarness implement
new Thread(() -> {
try {
Thread.sleep(10000);
- LOG.info("Shutdown the table services");
+ log.info("Shutdown the table services");
main.cancel();
} catch (InterruptedException e) {
- LOG.warn("InterruptedException: ", e);
+ log.warn("InterruptedException: ", e);
}
}).start();
try {
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java
index 243ad570ad75..5c836a95c8f4 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java
@@ -33,6 +33,9 @@ import org.apache.hudi.utilities.streamer.SourceProfile;
import org.apache.hudi.utilities.streamer.SourceProfileSupplier;
import org.apache.hudi.utilities.testutils.KafkaTestUtils;
+import lombok.AccessLevel;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -323,28 +326,15 @@ public abstract class BaseTestKafkaSource extends
SparkClientFunctionalTestHarne
verify(metrics,
times(2)).updateStreamerSourceBytesToBeIngestedInSyncRound(Long.MAX_VALUE);
}
+ @AllArgsConstructor
+ @Getter
static class TestSourceProfile implements SourceProfile<Long> {
private final long maxSourceBytes;
private final int sourcePartitions;
+ @Getter(AccessLevel.NONE)
private final long numEvents;
- public TestSourceProfile(long maxSourceBytes, int sourcePartitions, long
numEvents) {
- this.maxSourceBytes = maxSourceBytes;
- this.sourcePartitions = sourcePartitions;
- this.numEvents = numEvents;
- }
-
- @Override
- public long getMaxSourceBytes() {
- return maxSourceBytes;
- }
-
- @Override
- public int getSourcePartitions() {
- return sourcePartitions;
- }
-
@Override
public Long getSourceSpecificContext() {
return numEvents;
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSourceHarness.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSourceHarness.java
index 5e43daaa2a9c..ecf34920218d 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSourceHarness.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSourceHarness.java
@@ -56,6 +56,9 @@ import
org.apache.hudi.utilities.streamer.SourceProfileSupplier;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.AccessLevel;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.spark.api.java.JavaRDD;
@@ -279,27 +282,15 @@ public class S3EventsHoodieIncrSourceHarness extends
SparkClientFunctionalTestHa
readAndAssert(missingCheckpointStrategy, checkpointToPull, sourceLimit,
expectedCheckpoint, typedProperties);
}
+ @AllArgsConstructor
+ @Getter
static class TestSourceProfile implements SourceProfile<Long> {
+
private final long maxSourceBytes;
private final int sourcePartitions;
+ @Getter(AccessLevel.NONE)
private final long bytesPerPartition;
- public TestSourceProfile(long maxSourceBytes, int sourcePartitions, long
bytesPerPartition) {
- this.maxSourceBytes = maxSourceBytes;
- this.sourcePartitions = sourcePartitions;
- this.bytesPerPartition = bytesPerPartition;
- }
-
- @Override
- public long getMaxSourceBytes() {
- return maxSourceBytes;
- }
-
- @Override
- public int getSourcePartitions() {
- return sourcePartitions;
- }
-
@Override
public Long getSourceSpecificContext() {
return bytesPerPartition;
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestDataSource.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestDataSource.java
index 923aba49f5e9..1ec7842fe4e1 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestDataSource.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestDataSource.java
@@ -24,12 +24,11 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.testutils.sources.AbstractBaseTestSource;
+import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.GenericRecord;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.stream.Collectors;
@@ -37,9 +36,9 @@ import java.util.stream.Collectors;
/**
* An implementation of {@link Source}, that emits test upserts.
*/
+@Slf4j
public class TestDataSource extends AbstractBaseTestSource {
- private static final Logger LOG =
LoggerFactory.getLogger(TestDataSource.class);
public static boolean returnEmptyBatch = false;
public static Option<String> recordInstantTime = Option.empty();
private static int counter = 0;
@@ -56,14 +55,14 @@ public class TestDataSource extends AbstractBaseTestSource {
int nextCommitNum = lastCheckpoint.map(s ->
Integer.parseInt(s.getCheckpointKey()) + 1).orElse(0);
String instantTime = String.format("%05d", nextCommitNum);
- LOG.info("Source Limit is set to " + sourceLimit);
+ log.info("Source Limit is set to {}", sourceLimit);
// No new data.
if (sourceLimit <= 0 || returnEmptyBatch) {
- LOG.warn("Return no new data from Test Data source " + counter + ",
source limit " + sourceLimit);
+ log.warn("Return no new data from Test Data source {}, source limit {}",
counter, sourceLimit);
return new InputBatch<>(Option.empty(), lastCheckpoint.orElse(null));
} else {
- LOG.warn("Returning valid data from Test Data source " + counter + ",
source limit " + sourceLimit);
+ log.warn("Returning valid data from Test Data source {}, source limit
{}", counter, sourceLimit);
}
counter++;
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
index 9e95d3e6d4f1..746760a86307 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
@@ -57,6 +57,7 @@ import
org.apache.hudi.utilities.streamer.SourceProfileSupplier;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.spark.api.java.JavaRDD;
@@ -73,8 +74,6 @@ import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
@@ -96,6 +95,7 @@ import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+@Slf4j
public class TestGcsEventsHoodieIncrSource extends
SparkClientFunctionalTestHarness {
private static final HoodieSchema GCS_METADATA_SCHEMA =
SchemaTestUtil.getSchemaFromResource(
@@ -122,8 +122,6 @@ public class TestGcsEventsHoodieIncrSource extends
SparkClientFunctionalTestHarn
private HoodieTableMetaClient metaClient;
private JavaSparkContext jsc;
- private static final Logger LOG =
LoggerFactory.getLogger(TestGcsEventsHoodieIncrSource.class);
-
@BeforeEach
public void setUp() throws IOException {
metaClient = getHoodieMetaClient(storageConf(), basePath());
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
index bdcf678146b2..997cb299496d 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
@@ -67,6 +67,9 @@ import org.apache.hudi.utilities.streamer.SourceProfile;
import org.apache.hudi.utilities.streamer.SourceProfileSupplier;
import com.codahale.metrics.MetricRegistry;
+import lombok.AccessLevel;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
@@ -1092,47 +1095,28 @@ public class TestHoodieIncrSource extends
SparkClientFunctionalTestHarness {
);
}
+ @AllArgsConstructor
+ @Getter
static class TestSourceProfile implements SourceProfile<Integer> {
private final long maxSourceBytes;
private final int sourcePartitions;
+ @Getter(AccessLevel.NONE)
private final int numInstantsPerFetch;
- public TestSourceProfile(long maxSourceBytes, int sourcePartitions, int
numInstantsPerFetch) {
- this.maxSourceBytes = maxSourceBytes;
- this.sourcePartitions = sourcePartitions;
- this.numInstantsPerFetch = numInstantsPerFetch;
- }
-
- @Override
- public long getMaxSourceBytes() {
- return maxSourceBytes;
- }
-
- @Override
- public int getSourcePartitions() {
- return sourcePartitions;
- }
-
@Override
public Integer getSourceSpecificContext() {
return numInstantsPerFetch;
}
}
+ @AllArgsConstructor
+ @Getter
static class WriteResult {
+
private HoodieInstant instant;
private List<HoodieRecord> records;
- WriteResult(HoodieInstant instant, List<HoodieRecord> records) {
- this.instant = instant;
- this.records = records;
- }
-
- public HoodieInstant getInstant() {
- return instant;
- }
-
public String getInstantTime() {
return instant.requestedTime();
}
@@ -1140,9 +1124,5 @@ public class TestHoodieIncrSource extends
SparkClientFunctionalTestHarness {
public String getCompletionTime() {
return instant.getCompletionTime();
}
-
- public List<HoodieRecord> getRecords() {
- return records;
- }
}
}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/JdbcTestUtils.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/JdbcTestUtils.java
index 7f4f264b69c2..4cd96d24301c 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/JdbcTestUtils.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/JdbcTestUtils.java
@@ -23,9 +23,8 @@ import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.GenericRecord;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.sql.Connection;
@@ -39,10 +38,9 @@ import java.util.Objects;
/**
* Helper class used in testing {@link
org.apache.hudi.utilities.sources.JdbcSource}.
*/
+@Slf4j
public class JdbcTestUtils {
- private static final Logger LOG =
LoggerFactory.getLogger(JdbcTestUtils.class);
-
public static final String JDBC_URL = "jdbc:h2:mem:test_mem";
public static final String JDBC_DRIVER = "org.h2.Driver";
public static final String JDBC_USER = "test";
@@ -99,7 +97,7 @@ public class JdbcTestUtils {
insertStatement.setDouble(9, Double.parseDouble(((GenericRecord)
record.get("fare")).get("amount").toString()));
insertStatement.addBatch();
} catch (SQLException e) {
- LOG.warn(e.getMessage());
+ log.warn(e.getMessage());
}
});
insertStatement.executeBatch();
@@ -137,7 +135,7 @@ public class JdbcTestUtils {
updateStatement.setString(10, r.get("_row_key").toString());
updateStatement.addBatch();
} catch (SQLException e) {
- LOG.warn(e.getMessage());
+ log.warn(e.getMessage());
}
});
updateStatement.executeBatch();
@@ -149,7 +147,7 @@ public class JdbcTestUtils {
try (Statement statement = connection.createStatement()) {
statement.executeUpdate(query);
} catch (SQLException e) {
- LOG.error(message);
+ log.error(message);
}
}
@@ -159,7 +157,7 @@ public class JdbcTestUtils {
statement.close();
}
} catch (SQLException e) {
- LOG.error("Error while closing statement. " + e.getMessage());
+ log.error("Error while closing statement. {}", e.getMessage());
}
}
@@ -169,7 +167,7 @@ public class JdbcTestUtils {
connection.close();
}
} catch (SQLException e) {
- LOG.error("Error while closing connection. " + e.getMessage());
+ log.error("Error while closing connection. {}", e.getMessage());
}
}
@@ -179,7 +177,7 @@ public class JdbcTestUtils {
rs.next();
return rs.getInt(1);
} catch (SQLException e) {
- LOG.warn("Error while counting records. " + e.getMessage());
+ log.warn("Error while counting records. {}", e.getMessage());
return 0;
}
}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
index 8969737d38e9..eed8a9c42d2b 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
@@ -53,6 +53,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.dataformat.csv.CsvMapper;
import com.fasterxml.jackson.dataformat.csv.CsvSchema;
import com.fasterxml.jackson.dataformat.csv.CsvSchema.Builder;
+import lombok.extern.slf4j.Slf4j;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumWriter;
@@ -79,8 +80,6 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.io.TempDir;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.FileInputStream;
@@ -110,8 +109,8 @@ import static
org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_TABLE_NAME;
* Abstract test that provides a dfs & spark contexts.
*
*/
+@Slf4j
public class UtilitiesTestBase {
- private static final Logger LOG =
LoggerFactory.getLogger(UtilitiesTestBase.class);
@TempDir
protected static java.nio.file.Path sharedTempDir;
protected static FileSystem fs;
@@ -256,7 +255,7 @@ public class UtilitiesTestBase {
}
if (!failedReleases.isEmpty()) {
- LOG.error("Exception happened during releasing: " + String.join(",",
failedReleases));
+ log.error("Exception happened during releasing: {}", String.join(",",
failedReleases));
}
}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractBaseTestSource.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractBaseTestSource.java
index e943545ddbc0..3036810d8475 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractBaseTestSource.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractBaseTestSource.java
@@ -30,13 +30,12 @@ import org.apache.hudi.utilities.config.SourceTestConfig;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.AvroSource;
+import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.GenericRecord;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
@@ -48,12 +47,11 @@ import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
+@Slf4j
public abstract class AbstractBaseTestSource extends AvroSource {
public static String schemaStr = HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
- private static final Logger LOG =
LoggerFactory.getLogger(AbstractBaseTestSource.class);
-
public static final int DEFAULT_PARTITION_NUM = 0;
// Static instance, helps with reuse across a test.
@@ -69,7 +67,7 @@ public abstract class AbstractBaseTestSource extends
AvroSource {
boolean useRocksForTestDataGenKeys =
ConfigUtils.getBooleanWithAltKeys(props,
SourceTestConfig.USE_ROCKSDB_FOR_TEST_DATAGEN_KEYS);
String baseStoreDir = ConfigUtils.getStringWithAltKeys(props,
SourceTestConfig.ROCKSDB_BASE_DIR_FOR_TEST_DATAGEN_KEYS,
File.createTempFile("test_data_gen", ".keys").getParent()) + "/" +
partition;
- LOG.info("useRocksForTestDataGenKeys={}, BaseStoreDir={}",
useRocksForTestDataGenKeys, baseStoreDir);
+ log.info("useRocksForTestDataGenKeys={}, BaseStoreDir={}",
useRocksForTestDataGenKeys, baseStoreDir);
dataGeneratorMap.put(partition, new
HoodieTestDataGenerator(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS,
useRocksForTestDataGenKeys ? new RocksDBBasedMap<>(baseStoreDir) :
new HashMap<>()));
} catch (IOException e) {
@@ -114,11 +112,11 @@ public abstract class AbstractBaseTestSource extends
AvroSource {
// generate `sourceLimit` number of upserts each time.
int numExistingKeys = dataGenerator.getNumExistingKeys(schemaStr);
- LOG.info("NumExistingKeys={}", numExistingKeys);
+ log.info("NumExistingKeys={}", numExistingKeys);
int numUpdates = Math.min(numExistingKeys, sourceLimit / 2);
int numInserts = sourceLimit - numUpdates;
- LOG.info("Before adjustments => numInserts={}, numUpdates={}", numInserts,
numUpdates);
+ log.info("Before adjustments => numInserts={}, numUpdates={}", numInserts,
numUpdates);
boolean reachedMax = false;
if (numInserts + numExistingKeys > maxUniqueKeys) {
@@ -135,16 +133,16 @@ public abstract class AbstractBaseTestSource extends
AvroSource {
Stream<GenericRecord> deleteStream = Stream.empty();
Stream<GenericRecord> updateStream;
long memoryUsage1 = Runtime.getRuntime().totalMemory() -
Runtime.getRuntime().freeMemory();
- LOG.info("Before DataGen. Memory Usage={}, Total Memory={}, Free
Memory={}", memoryUsage1, Runtime.getRuntime().totalMemory(),
+ log.info("Before DataGen. Memory Usage={}, Total Memory={}, Free
Memory={}", memoryUsage1, Runtime.getRuntime().totalMemory(),
Runtime.getRuntime().freeMemory());
if (!reachedMax && numUpdates >= 50) {
- LOG.info("After adjustments => NumInserts={}, NumUpdates={},
NumDeletes=50, maxUniqueRecords={}", numInserts, (numUpdates - 50),
maxUniqueKeys);
+ log.info("After adjustments => NumInserts={}, NumUpdates={},
NumDeletes=50, maxUniqueRecords={}", numInserts, (numUpdates - 50),
maxUniqueKeys);
// if we generate update followed by deletes -> some keys in update
batch might be picked up for deletes. Hence generating delete batch followed by
updates
deleteStream =
dataGenerator.generateUniqueDeleteRecordStream(instantTime, 50, false,
schemaStr, 0L).map(AbstractBaseTestSource::toGenericRecord);
updateStream = dataGenerator.generateUniqueUpdatesStream(instantTime,
numUpdates - 50, schemaStr, 0L)
.map(AbstractBaseTestSource::toGenericRecord);
} else {
- LOG.info("After adjustments => NumInserts={}, NumUpdates={},
maxUniqueRecords={}", numInserts, numUpdates, maxUniqueKeys);
+ log.info("After adjustments => NumInserts={}, NumUpdates={},
maxUniqueRecords={}", numInserts, numUpdates, maxUniqueKeys);
updateStream = dataGenerator.generateUniqueUpdatesStream(instantTime,
numUpdates, schemaStr, 0L)
.map(AbstractBaseTestSource::toGenericRecord);
}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/DistributedTestDataSource.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/DistributedTestDataSource.java
index 224acd6016a9..017507f2f9ec 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/DistributedTestDataSource.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/DistributedTestDataSource.java
@@ -26,12 +26,11 @@ import org.apache.hudi.utilities.config.SourceTestConfig;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.InputBatch;
+import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.GenericRecord;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -39,10 +38,9 @@ import java.util.stream.IntStream;
/**
* A Test DataSource which scales test-data generation by using spark
parallelism.
*/
+@Slf4j
public class DistributedTestDataSource extends AbstractBaseTestSource {
- private static final Logger LOG =
LoggerFactory.getLogger(DistributedTestDataSource.class);
-
private final int numTestSourcePartitions;
public DistributedTestDataSource(TypedProperties props, JavaSparkContext
sparkContext, SparkSession sparkSession,
@@ -55,7 +53,7 @@ public class DistributedTestDataSource extends
AbstractBaseTestSource {
protected InputBatch<JavaRDD<GenericRecord>>
readFromCheckpoint(Option<Checkpoint> lastCheckpoint, long sourceLimit) {
int nextCommitNum = lastCheckpoint.map(s ->
Integer.parseInt(s.getCheckpointKey()) + 1).orElse(0);
String instantTime = String.format("%05d", nextCommitNum);
- LOG.info("Source Limit is set to {}", sourceLimit);
+ log.info("Source Limit is set to {}", sourceLimit);
// No new data.
if (sourceLimit <= 0) {
@@ -73,7 +71,7 @@ public class DistributedTestDataSource extends
AbstractBaseTestSource {
JavaRDD<GenericRecord> avroRDD =
sparkContext.parallelize(IntStream.range(0,
numTestSourcePartitions).boxed().collect(Collectors.toList()),
numTestSourcePartitions).mapPartitionsWithIndex((p, idx) -> {
- LOG.info("Initializing source with newProps={}", newProps);
+ log.info("Initializing source with newProps={}", newProps);
if (!dataGeneratorMap.containsKey(p)) {
initDataGen(newProps, p);
}