yihua commented on code in PR #5269:
URL: https://github.com/apache/hudi/pull/5269#discussion_r980459872
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java:
##########
@@ -732,31 +732,31 @@ public void setupWriteClient() throws IOException {
if ((null != schemaProvider)) {
Schema sourceSchema = schemaProvider.getSourceSchema();
Schema targetSchema = schemaProvider.getTargetSchema();
- reInitWriteClient(sourceSchema, targetSchema);
+ createNewWriteClientWithSchema(sourceSchema, targetSchema);
}
}
- private void reInitWriteClient(Schema sourceSchema, Schema targetSchema) throws IOException {
+ private void createNewWriteClientWithSchema(Schema sourceSchema, Schema targetSchema) throws IOException {
LOG.info("Setting up new Hoodie Write Client");
if (isDropPartitionColumns()) {
targetSchema = HoodieAvroUtils.removeFields(targetSchema,
getPartitionColumns(keyGenerator, props));
}
registerAvroSchemas(sourceSchema, targetSchema);
- HoodieWriteConfig hoodieCfg = getHoodieClientConfig(targetSchema);
- if (hoodieCfg.isEmbeddedTimelineServerEnabled()) {
+ HoodieWriteConfig writeConfig = getHoodieClientConfig(targetSchema);
+ if (writeConfig.isEmbeddedTimelineServerEnabled()) {
if (!embeddedTimelineService.isPresent()) {
- embeddedTimelineService = EmbeddedTimelineServerHelper.createEmbeddedTimelineService(new HoodieSparkEngineContext(jssc), hoodieCfg);
+ embeddedTimelineService = EmbeddedTimelineServerHelper.createEmbeddedTimelineService(new HoodieSparkEngineContext(jssc), writeConfig);
Review Comment:
We should keep one long-running timeline service in DeltaSync, instead of
shutting it down (by closing the write client) and restarting the timeline
server. In the write client, we should change the shutdown behavior for the
timeline server: if the timeline service instance is passed into the
constructor externally (`public SparkRDDWriteClient(HoodieEngineContext
context, HoodieWriteConfig writeConfig, Option<EmbeddedTimelineService>
timelineService)`), the write client should not shut it down and should let
the caller handle its lifecycle.
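The ownership rule suggested above can be sketched as follows. This is a
minimal, self-contained illustration (the `TimelineService` and `WriteClient`
classes here are hypothetical stand-ins, not the actual Hudi types): the
client tracks whether it created the service itself, and only shuts down a
service it owns.

```java
import java.util.Optional;

// Hypothetical stand-in for an embedded timeline service.
class TimelineService {
    private boolean running = true;
    void shutdown() { running = false; }
    boolean isRunning() { return running; }
}

// Sketch of the proposed lifecycle rule: the write client shuts down the
// timeline service only if it created the instance itself; an externally
// supplied instance is left running for the long-lived caller (DeltaSync).
class WriteClient implements AutoCloseable {
    private final TimelineService timelineService;
    private final boolean ownsTimelineService;

    WriteClient(Optional<TimelineService> externalService) {
        // If the caller passed a service in, the caller owns its lifecycle.
        this.ownsTimelineService = !externalService.isPresent();
        this.timelineService = externalService.orElseGet(TimelineService::new);
    }

    TimelineService getTimelineService() { return timelineService; }

    @Override
    public void close() {
        if (ownsTimelineService) {
            timelineService.shutdown(); // internally created: safe to stop
        }
        // Externally provided: do nothing, caller handles shutdown.
    }
}
```

With this split, DeltaSync can close and recreate write clients on schema
changes while the single timeline service it passed in keeps running.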
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncCompactService.java:
##########
@@ -81,8 +82,13 @@ protected Pair<CompletableFuture, ExecutorService> startService() {
if (null != instant) {
LOG.info("Starting Compaction for instant " + instant);
+ synchronized (writeConfigUpdateLock) {
+ compactor = createCompactor();
Review Comment:
We should only reinitialize the async compactor/clusterer when the write
config is updated, not for every execution.
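One way to express this is a dirty flag that DeltaSync sets when it swaps in
a new write config, checked under the same lock the service already uses.
The sketch below is a hypothetical, self-contained illustration (the
`Compactor` class and method names are illustrative, not the real Hudi API):
the compactor is rebuilt lazily, only after a config update.

```java
// Sketch of guarded re-initialization: rebuild the compactor only when the
// write config changed, not on every compaction round.
class AsyncCompactServiceSketch {
    static class Compactor {
        final int configVersion;
        Compactor(int configVersion) { this.configVersion = configVersion; }
    }

    private final Object writeConfigUpdateLock = new Object();
    private boolean configUpdated = true; // force creation on first use
    private int configVersion = 0;
    private Compactor compactor;

    // Called by the driver (e.g. DeltaSync) when the write config is replaced.
    void onWriteConfigUpdated() {
        synchronized (writeConfigUpdateLock) {
            configVersion++;
            configUpdated = true;
        }
    }

    // Called once per instant; reuses the existing compactor unless the
    // config was updated since the last call.
    Compactor compactorForNextInstant() {
        synchronized (writeConfigUpdateLock) {
            if (configUpdated) {
                compactor = new Compactor(configVersion);
                configUpdated = false;
            }
            return compactor;
        }
    }
}
```

Successive instants then share one compactor instance until the next config
update, avoiding per-execution reinitialization cost.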
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]