zhangyue19921010 commented on code in PR #13285:
URL: https://github.com/apache/hudi/pull/13285#discussion_r2086284161


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java:
##########
@@ -350,6 +350,7 @@ private boolean flushBucket(RowDataBucket bucket) {
     final List<WriteStatus> writeStatus = writeRecords(instant, bucket);
     final WriteMetadataEvent event = WriteMetadataEvent.builder()
         .taskID(taskID)
+        .checkpointId(this.checkpointId)

Review Comment:
   `this.checkpointId` -> `checkpointId`, to unify the coding style.



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java:
##########
@@ -317,6 +310,28 @@ public void subtaskReady(int i, SubtaskGateway 
subtaskGateway) {
     this.gateways[i] = subtaskGateway;
   }
 
+  @Override
+  public CompletableFuture<CoordinationResponse> 
handleCoordinationRequest(CoordinationRequest request) {
+    Correspondent.InstantTimeRequest instantTimeRequest = 
(Correspondent.InstantTimeRequest) request;
+    long checkpointId = instantTimeRequest.getCheckpointId();
+    Pair<String, WriteMetadataEvent[]> instantTimeAndEventBuffer = 
this.eventBuffers.get(checkpointId);
+    final String instantTime;
+    if (instantTimeAndEventBuffer == null) {
+      synchronized (this.eventBuffers) {

Review Comment:
   Synchronizing on the non-final field `this.eventBuffers` is risky: if the 
reference held by the variable is ever reassigned, the `synchronized` blocks 
would no longer lock on the same monitor and mutual exclusion would silently 
be lost.
   
   Although the `eventBuffers` reference does not change in the current 
implementation, it is still recommended to declare the field `final` to avoid 
the risk of this being missed in subsequent feature iterations.



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java:
##########
@@ -352,42 +367,38 @@ private static void 
initMetadataTable(HoodieFlinkWriteClient<?> writeClient) {
     writeClient.initMetadataTable();
   }
 
-  private static CkpMetadata initCkpMetadata(HoodieWriteConfig writeConfig, 
Configuration conf) throws IOException {
-    CkpMetadata ckpMetadata = CkpMetadataFactory.getCkpMetadata(writeConfig, 
conf);
-    ckpMetadata.bootstrap();
-    return ckpMetadata;
-  }
-
   private void initClientIds(Configuration conf) {
     this.clientIds = ClientIds.builder().conf(conf).build();
     this.clientIds.start();
   }
 
-  private void reset() {
-    this.eventBuffer = new WriteMetadataEvent[this.parallelism];
+  private void reset(long checkpointId) {
+    this.eventBuffers.remove(checkpointId);
   }
 
   /**
    * Checks the buffer is ready to commit.
    */
-  private boolean allEventsReceived() {
+  private boolean allEventsReceived(WriteMetadataEvent[] eventBuffer) {
     return Arrays.stream(eventBuffer)
         // we do not use event.isReady to check the instant
         // because the write task may send an event eagerly for empty
         // data set, the even may have a timestamp of last committed instant.
         .allMatch(event -> event != null && event.isLastBatch());
   }
 
-  private void addEventToBuffer(WriteMetadataEvent event) {
-    if (this.eventBuffer[event.getTaskID()] != null
-        && 
this.eventBuffer[event.getTaskID()].getInstantTime().equals(event.getInstantTime()))
 {
-      this.eventBuffer[event.getTaskID()].mergeWith(event);
+  private WriteMetadataEvent[] addEventToBuffer(WriteMetadataEvent event) {
+    WriteMetadataEvent[] eventBuffer = 
this.eventBuffers.get(event.getCheckpointId()).getRight();
+    if (eventBuffer[event.getTaskID()] != null

Review Comment:
   Do we need to lock `eventBuffers` here as well?



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java:
##########
@@ -397,75 +408,66 @@ private void startInstant() {
     this.instant = this.writeClient.startCommit(tableState.commitAction, 
this.metaClient);
     
this.metaClient.getActiveTimeline().transitionRequestedToInflight(tableState.commitAction,
 this.instant);
     this.writeClient.setWriteTimer(tableState.commitAction);
-    this.ckpMetadata.startInstant(this.instant);
     LOG.info("Create instant [{}] for table [{}] with type [{}]", this.instant,
         this.conf.getString(FlinkOptions.TABLE_NAME), 
conf.getString(FlinkOptions.TABLE_TYPE));
+    return this.instant;
   }
 
   /**
-   * Initializes the instant.
-   *
-   * <p>Recommits the last inflight instant if the write metadata checkpoint 
successfully
+   * Recommits the last inflight instant if the write metadata checkpoint 
successfully
    * but was not committed due to some rare cases.
-   *
-   * <p>Starts a new instant, a writer can not flush data buffer
-   * until it finds a new inflight instant on the timeline.
    */
-  private void initInstant(String instant) {
+  private void recommitInstant(long checkpointId, String instant, 
WriteMetadataEvent[] bootstrapBuffer) {
     HoodieTimeline completedTimeline = 
this.metaClient.getActiveTimeline().filterCompletedInstants();
     if (instant.equals(WriteMetadataEvent.BOOTSTRAP_INSTANT) || 
completedTimeline.containsInstant(instant)) {
       // the last instant committed successfully
-      reset();
+      // no-op
     } else {
       LOG.info("Recommit instant {}", instant);
-      // Recommit should start heartbeat for lazy failed writes clean policy 
to avoid aborting for heartbeat expired.
+      // Recommit should start heartbeat for lazy failed writes clean policy 
to avoid aborting for heartbeat expired;
+      // The following up checkpoints would recommit the instant.
       if (writeClient.getConfig().getFailedWritesCleanPolicy().isLazy()) {
         writeClient.getHeartbeatClient().start(instant);
       }
-      commitInstant(instant);
-    }
-    // stop the heartbeat for old instant
-    if (writeClient.getConfig().getFailedWritesCleanPolicy().isLazy() && 
!WriteMetadataEvent.BOOTSTRAP_INSTANT.equals(this.instant)) {
-      writeClient.getHeartbeatClient().stop(this.instant);
+      commitInstant(checkpointId, instant, bootstrapBuffer);
     }
-    // starts a new instant
-    startInstant();
     // upgrade downgrade
     this.writeClient.upgradeDowngrade(this.instant, this.metaClient);
   }
 
   private void handleBootstrapEvent(WriteMetadataEvent event) {
-    this.eventBuffer[event.getTaskID()] = event;
+    cleanLegacyEvent(event);
+    WriteMetadataEvent[] eventBuffer = getOrCreateBootstrapBuffer(event);
+    eventBuffer[event.getTaskID()] = event;
     if (Arrays.stream(eventBuffer).allMatch(evt -> evt != null && 
evt.isBootstrap())) {
-      // start to initialize the instant.
-      final String instant = Arrays.stream(eventBuffer)
-          .filter(evt -> evt.getWriteStatuses().size() > 0)
-          .findFirst().map(WriteMetadataEvent::getInstantTime)
-          .orElse(WriteMetadataEvent.BOOTSTRAP_INSTANT);
-
-      // if currentInstant is pending && bootstrap event instant is empty
-      // reuse currentInstant, reject bootstrap
-      if 
(this.metaClient.reloadActiveTimeline().filterInflightsAndRequested().containsInstant(this.instant)
-              && instant.equals(WriteMetadataEvent.BOOTSTRAP_INSTANT)
-              && this.tableState.operationType == WriteOperationType.INSERT) {
-        LOG.warn("Reuse current pending Instant {} with {} operationType, "
-                + "ignoring empty bootstrap event.", this.instant, 
WriteOperationType.INSERT.value());
-        reset();
-
-        // send commit act event to unblock write tasks
-        sendCommitAckEvents(-1L);
-        return;
-      }
+      // start to recommit the instant.
+      recommitInstant(event.getCheckpointId(), event.getInstantTime(), 
eventBuffer);
+    }
+  }
+
+  private void cleanLegacyEvent(WriteMetadataEvent event) {
+    this.eventBuffers.entrySet().stream()
+        .filter(entry -> entry.getKey().compareTo(event.getCheckpointId()) > 0)

Review Comment:
   `entry.getKey().compareTo(event.getCheckpointId()) > 0` ? I understand the 
intent is to clean up all the leftover buffers from before the current event. 
Shouldn't this clean up the entries with a *smaller* checkpoint ID instead?



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java:
##########
@@ -397,75 +408,66 @@ private void startInstant() {
     this.instant = this.writeClient.startCommit(tableState.commitAction, 
this.metaClient);
     
this.metaClient.getActiveTimeline().transitionRequestedToInflight(tableState.commitAction,
 this.instant);
     this.writeClient.setWriteTimer(tableState.commitAction);
-    this.ckpMetadata.startInstant(this.instant);
     LOG.info("Create instant [{}] for table [{}] with type [{}]", this.instant,
         this.conf.getString(FlinkOptions.TABLE_NAME), 
conf.getString(FlinkOptions.TABLE_TYPE));
+    return this.instant;
   }
 
   /**
-   * Initializes the instant.
-   *
-   * <p>Recommits the last inflight instant if the write metadata checkpoint 
successfully
+   * Recommits the last inflight instant if the write metadata checkpoint 
successfully
    * but was not committed due to some rare cases.
-   *
-   * <p>Starts a new instant, a writer can not flush data buffer
-   * until it finds a new inflight instant on the timeline.
    */
-  private void initInstant(String instant) {
+  private void recommitInstant(long checkpointId, String instant, 
WriteMetadataEvent[] bootstrapBuffer) {
     HoodieTimeline completedTimeline = 
this.metaClient.getActiveTimeline().filterCompletedInstants();
     if (instant.equals(WriteMetadataEvent.BOOTSTRAP_INSTANT) || 
completedTimeline.containsInstant(instant)) {
       // the last instant committed successfully
-      reset();
+      // no-op
     } else {
       LOG.info("Recommit instant {}", instant);
-      // Recommit should start heartbeat for lazy failed writes clean policy 
to avoid aborting for heartbeat expired.
+      // Recommit should start heartbeat for lazy failed writes clean policy 
to avoid aborting for heartbeat expired;
+      // The following up checkpoints would recommit the instant.

Review Comment:
   `writeClient.getHeartbeatClient().start(instant);` cannot refresh the 
heartbeat directly, because `HoodieHeartbeatClient` keeps an 
`instantToHeartbeatMap` cache and may return early instead of updating the 
heartbeat time.
   
   So we need to do:
   ```
         if (writeClient.getConfig().getFailedWritesCleanPolicy().isLazy()) {
           if (writeClient.getHeartbeatClient().getHeartbeat(instant) != null) {
             LOG.info("Removing heartbeat for instant " + instant);
             writeClient.getHeartbeatClient().stop(instant);
             writeClient.getHeartbeatClient().removeHeartbeat(instant);
           }
           writeClient.getHeartbeatClient().start(instant);
         }
   ```



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/event/WriteMetadataEvent.java:
##########
@@ -38,6 +38,7 @@ public class WriteMetadataEvent implements OperatorEvent {
 
   private List<WriteStatus> writeStatuses;
   private int taskID;
+  private long checkpointId;

Review Comment:
   Upgrading an existing Flink ingestion job to the current PR may cause 
compatibility issues, such as a state-restore failure, since the serialized 
`WriteMetadataEvent` now carries a new `checkpointId` field.



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java:
##########
@@ -501,44 +503,22 @@ private void handleWriteMetaEvent(WriteMetadataEvent 
event) {
     addEventToBuffer(event);
   }
 
-  /**
-   * The coordinator reuses the instant if there is no data for this round of 
checkpoint,
-   * sends the commit ack events to unblock the flushing.
-   */
-  private void sendCommitAckEvents(long checkpointId) {
-    CompletableFuture<?>[] futures = 
Arrays.stream(this.gateways).filter(Objects::nonNull)
-        .map(gw -> gw.sendEvent(CommitAckEvent.getInstance(checkpointId)))
-        .toArray(CompletableFuture<?>[]::new);
-    CompletableFuture.allOf(futures).whenComplete((resp, error) -> {
-      if (!sendToFinishedTasks(error)) {
-        throw new HoodieException("Error while waiting for the commit ack 
events to finish sending", error);
-      }
-    });
-  }
-
-  /**
-   * Decides whether the given exception is caused by sending events to 
FINISHED tasks.
-   *
-   * <p>Ugly impl: the exception may change in the future.
-   */
-  private static boolean sendToFinishedTasks(Throwable throwable) {
-    return throwable.getCause() instanceof TaskNotRunningException
-        || throwable.getCause().getMessage().contains("running");
-  }
-
   /**
    * Commits the instant.
    */
-  private boolean commitInstant(String instant) {
-    return commitInstant(instant, -1);
+  private boolean commitInstants(long checkpointId) {
+    // use < instead of <= because the write metadata event sends the last 
known checkpoint id which is smaller than the current one.
+    List<Boolean> result = this.eventBuffers.entrySet().stream().filter(entry 
-> entry.getKey() < checkpointId)
+        .map(entry -> commitInstant(entry.getKey(), 
entry.getValue().getLeft(), 
entry.getValue().getRight())).collect(Collectors.toList());

Review Comment:
   It looks like this commits all the preceding instants in one pass. We need 
to ensure here that every one of those historical commits is complete and 
intact.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to