codope commented on a change in pull request #5138: URL: https://github.com/apache/hudi/pull/5138#discussion_r838204510
########## File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java ########## @@ -404,6 +423,79 @@ public void testLoadArchiveTimelineWithDamagedPlanFile(boolean enableArchiveMerg assertThrows(HoodieException.class, () -> metaClient.getArchivedTimeline().reload()); } + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testArchivalWithMultiWriters(boolean enableMetadata) throws Exception { + HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(enableMetadata, 2, 4, 5, 2, + HoodieTableType.COPY_ON_WRITE, false, 10, 209715200, + HoodieFailedWritesCleaningPolicy.LAZY, WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL); + + final ExecutorService executors = Executors.newFixedThreadPool(2); + List<CompletableFuture<Boolean>> completableFutureList = new ArrayList<>(); + CountDownLatch countDownLatch = new CountDownLatch(1); + IntStream.range(0, 2).forEach(index -> { + completableFutureList.add(CompletableFuture.supplyAsync(() -> { + HoodieTable table = HoodieSparkTable.create(writeConfig, context, metaClient); + try { + countDownLatch.await(30, TimeUnit.SECONDS); + } catch (InterruptedException e) { + e.printStackTrace(); + } + metaClient.reloadActiveTimeline(); + while (!metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant().get().getTimestamp().endsWith("29") + || metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants() > 4) { + try { + //System.out.println("Archiving " + index + ", total completed instants " + metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants()); Review comment: let's remove these ########## File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java ########## @@ -468,14 +469,16 @@ protected void preWrite(String instantTime, WriteOperationType writeOperationTyp * @param metadata Commit Metadata corresponding 
to committed instant * @param instantTime Instant Time * @param extraMetadata Additional Metadata passed by user + * @param acquireLockForArchival true if lock has to be acquired for archival. false otherwise. */ - protected void postCommit(HoodieTable table, HoodieCommitMetadata metadata, String instantTime, Option<Map<String, String>> extraMetadata) { + protected void postCommit(HoodieTable table, HoodieCommitMetadata metadata, String instantTime, Option<Map<String, String>> extraMetadata, + boolean acquireLockForArchival) { Review comment: is there a need to pass this boolean argument? can't we deduce it based on the config? typically, for multi-writer scenario users would have already enabled OCC mode. so why not take a lock whenever this mode is on? ########## File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java ########## @@ -167,6 +178,9 @@ public boolean archiveIfRequired(HoodieEngineContext context) throws IOException return success; } finally { close(); + if (acquireLock) { + txnManager.close(); Review comment: should we have a finally block and end the transaction? ########## File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java ########## @@ -143,11 +147,18 @@ private void close() { } } + public boolean archiveIfRequired(HoodieEngineContext context) throws IOException { + return archiveIfRequired(context, false); + } + /** * Check if commits need to be archived. If yes, archive commits. */ - public boolean archiveIfRequired(HoodieEngineContext context) throws IOException { + public boolean archiveIfRequired(HoodieEngineContext context, boolean acquireLock) throws IOException { try { + if (acquireLock) { + txnManager.beginTransaction(); Review comment: we can just call beginTransaction() in any case. if there is no lock then beginTransaction() is a no-op anyway, right? 
########## File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java ########## @@ -404,6 +423,79 @@ public void testLoadArchiveTimelineWithDamagedPlanFile(boolean enableArchiveMerg assertThrows(HoodieException.class, () -> metaClient.getArchivedTimeline().reload()); } + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testArchivalWithMultiWriters(boolean enableMetadata) throws Exception { + HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(enableMetadata, 2, 4, 5, 2, + HoodieTableType.COPY_ON_WRITE, false, 10, 209715200, + HoodieFailedWritesCleaningPolicy.LAZY, WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL); + + final ExecutorService executors = Executors.newFixedThreadPool(2); + List<CompletableFuture<Boolean>> completableFutureList = new ArrayList<>(); + CountDownLatch countDownLatch = new CountDownLatch(1); + IntStream.range(0, 2).forEach(index -> { + completableFutureList.add(CompletableFuture.supplyAsync(() -> { + HoodieTable table = HoodieSparkTable.create(writeConfig, context, metaClient); + try { + countDownLatch.await(30, TimeUnit.SECONDS); + } catch (InterruptedException e) { + e.printStackTrace(); + } + metaClient.reloadActiveTimeline(); + while (!metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant().get().getTimestamp().endsWith("29") + || metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants() > 4) { + try { + //System.out.println("Archiving " + index + ", total completed instants " + metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants()); + //System.out.println("Last active instant " + metaClient.reloadActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant().get().toString()); + HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(writeConfig, table); + archiver.archiveIfRequired(context, true); + // if not for below 
sleep, both archiving threads acquires lock in quick succession and does not give space for main thread + // to complete the write operation when metadata table is enabled. + if (enableMetadata) { + Thread.sleep(2); Review comment: why is this delay required only when metadata is enabled? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org