This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new b31d5f7a4409 refactor: rewrite executors tests to avoid code 
duplication (#18005)
b31d5f7a4409 is described below

commit b31d5f7a44093afeb79142dcff373af857ac0c6d
Author: yaojiejia <[email protected]>
AuthorDate: Tue Mar 10 23:39:34 2026 -0400

    refactor: rewrite executors tests to avoid code duplication (#18005)
---
 .../hudi/execution/BaseExecutorTestHarness.java    | 324 +++++++++++++++++++++
 .../TestBoundedInMemoryExecutorInSpark.java        | 136 +++------
 .../execution/TestDisruptorExecutionInSpark.java   | 137 ++-------
 .../hudi/execution/TestSimpleExecutionInSpark.java | 196 +------------
 4 files changed, 395 insertions(+), 398 deletions(-)

diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/BaseExecutorTestHarness.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/BaseExecutorTestHarness.java
new file mode 100644
index 000000000000..70f1207d9e89
--- /dev/null
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/BaseExecutorTestHarness.java
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.testutils.InProcessTimeGenerator;
+import org.apache.hudi.common.util.queue.BaseHoodieQueueBasedExecutor;
+import org.apache.hudi.common.util.queue.HoodieConsumer;
+import org.apache.hudi.common.util.queue.HoodieExecutor;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.testutils.HoodieSparkClientTestHarness;
+
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.spark.TaskContext;
+import org.apache.spark.TaskContext$;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Base test harness for write executor tests.
+ * Provides common test logic to avoid duplication across different executor 
implementations.
+ */
+public abstract class BaseExecutorTestHarness extends 
HoodieSparkClientTestHarness {
+
+  protected final String instantTime = 
InProcessTimeGenerator.createNewInstantTime();
+
+  @BeforeEach
+  public void setUp() throws Exception {
+    initTestDataGenerator();
+    if (requiresExecutorService()) {
+      initExecutorServiceWithFixedThreadPool(2);
+    }
+  }
+
+  @AfterEach
+  public void tearDown() throws Exception {
+    cleanupResources();
+  }
+
+  /**
+   * Create an executor instance for testing.
+   *
+   * @param records iterator of records to process
+   * @param consumer consumer to process records
+   * @return executor instance
+   */
+  protected abstract HoodieExecutor<Integer> createExecutor(
+      Iterator<HoodieRecord> records, HoodieConsumer<HoodieRecord, Integer> 
consumer);
+
+  /**
+   * Whether the executor requires setting up an executor service.
+   * @return true if executor service is needed
+   */
+  protected boolean requiresExecutorService() {
+    return true;
+  }
+
+  /**
+   * Whether the executor supports interrupt testing.
+   * @return true if interrupt test should be run
+   */
+  protected boolean supportsInterruptTest() {
+    return false;
+  }
+
+  /**
+   * Whether the executor supports running status check.
+   * @return true if running status check is supported
+   */
+  protected boolean supportsRunningStatusCheck() {
+    return false;
+  }
+
+  /**
+   * Get pre-execute runnable for Spark task context propagation.
+   * @return runnable to execute before consuming records
+   */
+  protected Runnable getPreExecuteRunnable() {
+    final TaskContext taskContext = TaskContext.get();
+    return () -> TaskContext$.MODULE$.setTaskContext(taskContext);
+  }
+
+  /**
+   * Create a simple counting consumer.
+   *
+   * @param consumedRecords list to store consumed records
+   * @param <T> record type
+   * @return consumer instance
+   */
+  protected <T> HoodieConsumer<T, Integer> createCountingConsumer(final 
List<T> consumedRecords) {
+    return new HoodieConsumer<T, Integer>() {
+      private int count = 0;
+
+      @Override
+      public void consume(T record) throws Exception {
+        consumedRecords.add(record);
+        count++;
+      }
+
+      @Override
+      public Integer finish() {
+        return count;
+      }
+    };
+  }
+
+  /**
+   * Create a consumer that waits indefinitely (for interrupt testing).
+   *
+   * @param <T> record type
+   * @return consumer instance
+   */
+  protected <T> HoodieConsumer<T, Integer> createWaitingConsumer() {
+    return new HoodieConsumer<T, Integer>() {
+      @Override
+      public void consume(T record) {
+        try {
+          synchronized (this) {
+            wait();
+          }
+        } catch (InterruptedException ie) {
+          // Expected for interrupt tests
+        }
+      }
+
+      @Override
+      public Integer finish() {
+        return 0;
+      }
+    };
+  }
+
+  @Test
+  public void testExecutor() {
+    final int numRecords = 128;
+    final List<HoodieRecord> hoodieRecords = 
dataGen.generateInserts(instantTime, numRecords);
+    final List<HoodieRecord> consumedRecords = new ArrayList<>();
+
+    HoodieConsumer<HoodieRecord, Integer> consumer = 
createCountingConsumer(consumedRecords);
+    HoodieExecutor<Integer> exec = null;
+
+    try {
+      exec = createExecutor(hoodieRecords.iterator(), consumer);
+      int result = exec.execute();
+
+      // Verify all records were processed
+      assertEquals(numRecords, result);
+
+      // Verify consumed records are identical to produced ones and ordering 
is preserved
+      assertEquals(hoodieRecords, consumedRecords);
+
+      // Check running status if supported
+      if (supportsRunningStatusCheck()) {
+        BaseHoodieQueueBasedExecutor queueExec = 
(BaseHoodieQueueBasedExecutor) exec;
+        assertFalse(queueExec.isRunning());
+      }
+
+    } finally {
+      if (exec != null) {
+        exec.shutdownNow();
+      }
+    }
+  }
+
+  @Test
+  @Timeout(value = 60)
+  public void testRecordReading() {
+    final int numRecords = 100;
+    final List<HoodieRecord> hoodieRecords = 
dataGen.generateInserts(instantTime, numRecords);
+    ArrayList<HoodieRecord> beforeRecord = new ArrayList<>();
+    ArrayList<IndexedRecord> beforeIndexedRecord = new ArrayList<>();
+    ArrayList<HoodieRecord> afterRecord = new ArrayList<>();
+    ArrayList<IndexedRecord> afterIndexedRecord = new ArrayList<>();
+
+    hoodieRecords.forEach(record -> {
+      beforeRecord.add(record);
+      beforeIndexedRecord.add((IndexedRecord) record.getData());
+    });
+
+    HoodieConsumer<HoodieRecord, Integer> consumer =
+        new HoodieConsumer<HoodieRecord, Integer>() {
+          private int count = 0;
+
+          @Override
+          public void consume(HoodieRecord record) {
+            count++;
+            afterRecord.add(record);
+            afterIndexedRecord.add((IndexedRecord) record.getData());
+          }
+
+          @Override
+          public Integer finish() {
+            return count;
+          }
+        };
+
+    HoodieExecutor<Integer> exec = null;
+
+    try {
+      exec = createExecutor(hoodieRecords.iterator(), consumer);
+      int result = exec.execute();
+      assertEquals(numRecords, result);
+
+      assertEquals(beforeRecord, afterRecord);
+      assertEquals(beforeIndexedRecord, afterIndexedRecord);
+
+    } finally {
+      if (exec != null) {
+        exec.shutdownNow();
+      }
+    }
+  }
+
+  @Test
+  @Timeout(value = 60)
+  public void testException() {
+    final int numRecords = 1000;
+    final String errorMessage = "Exception when iterating records!!!";
+
+    List<HoodieRecord> pRecs = dataGen.generateInserts(instantTime, 
numRecords);
+    Iterator<HoodieRecord> iterator = new ThrowingIterator(pRecs.iterator(), 
errorMessage, numRecords / 10);
+
+    HoodieConsumer<HoodieRecord, Integer> consumer =
+        new HoodieConsumer<HoodieRecord, Integer>() {
+          int count = 0;
+
+          @Override
+          public void consume(HoodieRecord payload) throws Exception {
+            count++;
+          }
+
+          @Override
+          public Integer finish() {
+            return count;
+          }
+        };
+
+    HoodieExecutor<Integer> exec = createExecutor(iterator, consumer);
+
+    final Throwable thrown = assertThrows(HoodieException.class, exec::execute,
+        "exception is expected");
+    assertTrue(thrown.getMessage().contains(errorMessage));
+  }
+
+  @Test
+  @Timeout(value = 60)
+  public void testInterruptExecutor() {
+    if (!supportsInterruptTest()) {
+      return;
+    }
+
+    final List<HoodieRecord> hoodieRecords = 
dataGen.generateInserts(instantTime, 100);
+    HoodieConsumer<HoodieRecord, Integer> consumer = createWaitingConsumer();
+
+    HoodieExecutor<Integer> executor = 
createExecutor(hoodieRecords.iterator(), consumer);
+
+    try {
+      Thread.currentThread().interrupt();
+      assertThrows(HoodieException.class, executor::execute);
+      assertTrue(Thread.interrupted());
+    } catch (Exception e) {
+      // Expected
+    }
+  }
+
+  /**
+   * Iterator that throws an exception after a certain number of elements.
+   */
+  protected static class ThrowingIterator implements Iterator<HoodieRecord> {
+    private final Iterator<HoodieRecord> iterator;
+    private final AtomicInteger count = new AtomicInteger(0);
+    private final String errorMessage;
+    private final int errorMessageCount;
+
+    public ThrowingIterator(Iterator<HoodieRecord> iterator, String 
errorMessage, int errorMessageCount) {
+      this.iterator = iterator;
+      this.errorMessage = errorMessage;
+      this.errorMessageCount = errorMessageCount;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return iterator.hasNext();
+    }
+
+    @Override
+    public HoodieRecord next() {
+      if (count.get() == errorMessageCount) {
+        throw new HoodieException(errorMessage);
+      }
+      count.incrementAndGet();
+      return iterator.next();
+    }
+  }
+}
+
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java
index 0ff12fe19e89..26b21661dbf0 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java
@@ -19,71 +19,44 @@
 package org.apache.hudi.execution;
 
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.testutils.InProcessTimeGenerator;
-import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
 import org.apache.hudi.common.util.queue.ExecutorType;
 import org.apache.hudi.common.util.queue.HoodieConsumer;
+import org.apache.hudi.common.util.queue.HoodieExecutor;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.testutils.HoodieSparkClientTestHarness;
-
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.spark.TaskContext;
-import org.apache.spark.TaskContext$;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
+
 import org.junit.jupiter.api.Test;
 
 import java.util.Iterator;
-import java.util.List;
 
-import scala.Tuple2;
 
 import static 
org.apache.hudi.common.testutils.HoodieTestDataGenerator.HOODIE_SCHEMA;
 import static 
org.apache.hudi.execution.HoodieLazyInsertIterable.getTransformerInternal;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-public class TestBoundedInMemoryExecutorInSpark extends 
HoodieSparkClientTestHarness {
-
-  private final String instantTime = 
InProcessTimeGenerator.createNewInstantTime();
+/**
+ * Tests for {@link BoundedInMemoryExecutor}.
+ */
+public class TestBoundedInMemoryExecutorInSpark extends 
BaseExecutorTestHarness {
 
   private final HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
       .withExecutorType(ExecutorType.BOUNDED_IN_MEMORY.name())
       .withWriteBufferLimitBytes(1024)
       .build(false);
 
-  @BeforeEach
-  public void setUp() throws Exception {
-    initTestDataGenerator();
-  }
-
-  @AfterEach
-  public void tearDown() throws Exception {
-    cleanupResources();
-  }
-
-  private Runnable getPreExecuteRunnable() {
-    final TaskContext taskContext = TaskContext.get();
-    return () -> TaskContext$.MODULE$.setTaskContext(taskContext);
-  }
-
-  @Test
-  public void testExecutor() {
-
-    final int recordNumber = 100;
-    final List<HoodieRecord> hoodieRecords = 
dataGen.generateInserts(instantTime, recordNumber);
-
-    
HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>,
 Integer> consumer =
+  @Override
+  protected HoodieExecutor<Integer> createExecutor(
+      Iterator<HoodieRecord> records, HoodieConsumer<HoodieRecord, Integer> 
consumer) {
+    // BoundedInMemoryExecutor with getTransformerInternal produces 
HoodieInsertValueGenResult
+    @SuppressWarnings("rawtypes")
+    
HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>,
 Integer> wrappedConsumer =
         new 
HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>,
 Integer>() {
-
           private int count = 0;
 
           @Override
-          public void 
consume(HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord> 
record) {
+          public void 
consume(HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord> 
result) throws Exception {
+            // Extract the HoodieRecord from the result and pass to original 
consumer
+            consumer.consume(result.getResult());
             count++;
           }
 
@@ -93,62 +66,30 @@ public class TestBoundedInMemoryExecutorInSpark extends 
HoodieSparkClientTestHar
           }
         };
 
-    BoundedInMemoryExecutor<HoodieRecord, Tuple2<HoodieRecord, 
Option<IndexedRecord>>, Integer> executor = null;
-    try {
-      executor = new 
BoundedInMemoryExecutor(writeConfig.getWriteBufferLimitBytes(), 
hoodieRecords.iterator(), consumer,
-          getTransformerInternal(HOODIE_SCHEMA, writeConfig), 
getPreExecuteRunnable());
-      int result = executor.execute();
-
-      assertEquals(100, result);
-      // There should be no remaining records in the buffer
-      assertFalse(executor.isRunning());
-    } finally {
-      if (executor != null) {
-        executor.shutdownNow();
-        executor.awaitTermination();
-      }
-    }
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    BoundedInMemoryExecutor executor = new BoundedInMemoryExecutor(
+        writeConfig.getWriteBufferLimitBytes(),
+        records,
+        wrappedConsumer,
+        getTransformerInternal(HOODIE_SCHEMA, writeConfig),
+        getPreExecuteRunnable());
+    
+    return executor;
   }
 
-  @Test
-  public void testInterruptExecutor() {
-    final List<HoodieRecord> hoodieRecords = 
dataGen.generateInserts(instantTime, 100);
-
-    
HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>,
 Integer> consumer =
-        new 
HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>,
 Integer>() {
-
-          @Override
-          public void 
consume(HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord> 
record) {
-            try {
-              while (true) {
-                Thread.sleep(1000);
-              }
-            } catch (InterruptedException ie) {
-              return;
-            }
-          }
-
-          @Override
-          public Integer finish() {
-            return 0;
-          }
-        };
-
-    BoundedInMemoryExecutor<HoodieRecord, Tuple2<HoodieRecord, 
Option<IndexedRecord>>, Integer> executor =
-        new BoundedInMemoryExecutor(writeConfig.getWriteBufferLimitBytes(), 
hoodieRecords.iterator(), consumer,
-            getTransformerInternal(HOODIE_SCHEMA, writeConfig), 
getPreExecuteRunnable());
-
-    // Interrupt the current thread (therefore triggering executor to throw as 
soon as it
-    // invokes [[get]] on the [[CompletableFuture]])
-    Thread.currentThread().interrupt();
-
-    assertThrows(HoodieException.class, executor::execute);
+  @Override
+  protected boolean requiresExecutorService() {
+    return false;
+  }
 
-    // Validate that interrupted flag is reset, after [[InterruptedException]] 
is thrown
-    assertTrue(Thread.interrupted());
+  @Override
+  protected boolean supportsInterruptTest() {
+    return true;
+  }
 
-    executor.shutdownNow();
-    executor.awaitTermination();
+  @Override
+  protected boolean supportsRunningStatusCheck() {
+    return true;
   }
 
   @Test
@@ -165,6 +106,7 @@ public class TestBoundedInMemoryExecutorInSpark extends 
HoodieSparkClientTestHar
       }
     };
 
+    @SuppressWarnings("rawtypes")
     
HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>,
 Integer> consumer =
         new 
HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>,
 Integer>() {
           @Override
@@ -177,8 +119,10 @@ public class TestBoundedInMemoryExecutorInSpark extends 
HoodieSparkClientTestHar
           }
         };
 
-    BoundedInMemoryExecutor<HoodieRecord, Tuple2<HoodieRecord, 
Option<IndexedRecord>>, Integer> executor =
-        new BoundedInMemoryExecutor(writeConfig.getWriteBufferLimitBytes(), 
unboundedRecordIter,
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    BoundedInMemoryExecutor executor =
+        new BoundedInMemoryExecutor(
+            writeConfig.getWriteBufferLimitBytes(), unboundedRecordIter,
             consumer, getTransformerInternal(HOODIE_SCHEMA, writeConfig),
             getPreExecuteRunnable());
     executor.shutdownNow();
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorExecutionInSpark.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorExecutionInSpark.java
index 49e25c5db700..5cefabb8f7a8 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorExecutionInSpark.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorExecutionInSpark.java
@@ -19,138 +19,45 @@
 package org.apache.hudi.execution;
 
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.testutils.InProcessTimeGenerator;
 import org.apache.hudi.common.util.queue.DisruptorExecutor;
 import org.apache.hudi.common.util.queue.ExecutorType;
 import org.apache.hudi.common.util.queue.HoodieConsumer;
+import org.apache.hudi.common.util.queue.HoodieExecutor;
 import org.apache.hudi.common.util.queue.WaitStrategyFactory;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.testutils.HoodieSparkClientTestHarness;
 
-import org.apache.spark.TaskContext;
-import org.apache.spark.TaskContext$;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.Timeout;
-
-import java.util.ArrayList;
-import java.util.List;
+import java.util.Iterator;
 import java.util.function.Function;
 
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-public class TestDisruptorExecutionInSpark extends 
HoodieSparkClientTestHarness {
-
-  private final String instantTime = 
InProcessTimeGenerator.createNewInstantTime();
+/**
+ * Tests for {@link DisruptorExecutor}.
+ */
+public class TestDisruptorExecutionInSpark extends BaseExecutorTestHarness {
 
   private final HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
       .withExecutorType(ExecutorType.DISRUPTOR.name())
       .withWriteExecutorDisruptorWriteBufferLimitBytes(8)
       .build(false);
 
-  @BeforeEach
-  public void setUp() throws Exception {
-    initTestDataGenerator();
-    initExecutorServiceWithFixedThreadPool(2);
-  }
-
-  @AfterEach
-  public void tearDown() throws Exception {
-    cleanupResources();
+  @Override
+  protected HoodieExecutor<Integer> createExecutor(
+      Iterator<HoodieRecord> records, HoodieConsumer<HoodieRecord, Integer> 
consumer) {
+    return new DisruptorExecutor<>(
+        writeConfig.getWriteExecutorDisruptorWriteBufferLimitBytes(),
+        records,
+        consumer,
+        Function.identity(),
+        WaitStrategyFactory.DEFAULT_STRATEGY,
+        getPreExecuteRunnable());
   }
 
-  private Runnable getPreExecuteRunnable() {
-    final TaskContext taskContext = TaskContext.get();
-    return () -> TaskContext$.MODULE$.setTaskContext(taskContext);
+  @Override
+  protected boolean supportsInterruptTest() {
+    return true;
   }
 
-  @Test
-  public void testExecutor() {
-
-    final List<HoodieRecord> hoodieRecords = 
dataGen.generateInserts(instantTime, 128);
-    final List<HoodieRecord> consumedRecords = new ArrayList<>();
-
-    HoodieConsumer<HoodieRecord, Integer> consumer =
-        new HoodieConsumer<HoodieRecord, Integer>() {
-
-          private int count = 0;
-
-          @Override
-          public void consume(HoodieRecord record) {
-            consumedRecords.add(record);
-            count++;
-          }
-
-          @Override
-          public Integer finish() {
-            return count;
-          }
-        };
-    DisruptorExecutor<HoodieRecord, HoodieRecord, Integer> exec = null;
-
-    try {
-      exec = new 
DisruptorExecutor<>(writeConfig.getWriteExecutorDisruptorWriteBufferLimitBytes(),
 hoodieRecords.iterator(), consumer,
-          Function.identity(), WaitStrategyFactory.DEFAULT_STRATEGY, 
getPreExecuteRunnable());
-      int result = exec.execute();
-      // It should buffer and write 100 records
-      assertEquals(128, result);
-      // There should be no remaining records in the buffer
-      assertFalse(exec.isRunning());
-
-      // collect all records and assert that consumed records are identical to 
produced ones
-      // assert there's no tampering, and that the ordering is preserved
-      assertEquals(hoodieRecords, consumedRecords);
-      for (int i = 0; i < hoodieRecords.size(); i++) {
-        assertEquals(hoodieRecords.get(i), consumedRecords.get(i));
-      }
-
-    } finally {
-      if (exec != null) {
-        exec.shutdownNow();
-      }
-    }
-  }
-
-  @Test
-  @Timeout(value = 60)
-  public void testInterruptExecutor() {
-    final List<HoodieRecord> hoodieRecords = 
dataGen.generateInserts(instantTime, 100);
-
-    HoodieConsumer<HoodieRecord, Integer> consumer =
-        new HoodieConsumer<HoodieRecord, Integer>() {
-
-          @Override
-          public void consume(HoodieRecord record) {
-            try {
-              synchronized (this) {
-                wait();
-              }
-            } catch (InterruptedException ie) {
-              // ignore here
-            }
-          }
-
-          @Override
-          public Integer finish() {
-            return 0;
-          }
-        };
-
-    DisruptorExecutor<HoodieRecord, HoodieRecord, Integer>
-        executor = new DisruptorExecutor<>(1024, hoodieRecords.iterator(), 
consumer,
-        Function.identity(), WaitStrategyFactory.DEFAULT_STRATEGY, 
getPreExecuteRunnable());
-
-    try {
-      Thread.currentThread().interrupt();
-      assertThrows(HoodieException.class, executor::execute);
-      assertTrue(Thread.interrupted());
-    } catch (Exception e) {
-      // ignore here
-    }
+  @Override
+  protected boolean supportsRunningStatusCheck() {
+    return true;
   }
 }
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestSimpleExecutionInSpark.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestSimpleExecutionInSpark.java
index 56e56a15b50f..77b512f3aaf6 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestSimpleExecutionInSpark.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestSimpleExecutionInSpark.java
@@ -19,199 +19,21 @@
 package org.apache.hudi.execution;
 
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.testutils.InProcessTimeGenerator;
 import org.apache.hudi.common.util.queue.HoodieConsumer;
+import org.apache.hudi.common.util.queue.HoodieExecutor;
 import org.apache.hudi.common.util.queue.SimpleExecutor;
-import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.testutils.HoodieSparkClientTestHarness;
 
-import org.apache.avro.generic.IndexedRecord;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.Timeout;
-
-import java.util.ArrayList;
 import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-public class  TestSimpleExecutionInSpark extends HoodieSparkClientTestHarness {
-
-  private final String instantTime = 
InProcessTimeGenerator.createNewInstantTime();
-
-  @BeforeEach
-  public void setUp() throws Exception {
-    initTestDataGenerator();
-    initExecutorServiceWithFixedThreadPool(2);
-  }
-
-  @AfterEach
-  public void tearDown() throws Exception {
-    cleanupResources();
-  }
-
-  @Test
-  public void testExecutor() {
-
-    final List<HoodieRecord> hoodieRecords = 
dataGen.generateInserts(instantTime, 128);
-    final List<HoodieRecord> consumedRecords = new ArrayList<>();
-
-    HoodieConsumer<HoodieRecord, Integer> consumer =
-        new HoodieConsumer<HoodieRecord, Integer>() {
-          private int count = 0;
-
-          @Override
-          public void consume(HoodieRecord record) throws Exception {
-            consumedRecords.add(record);
-            count++;
-          }
-
-          @Override
-          public Integer finish() {
-            return count;
-          }
-        };
-    SimpleExecutor<HoodieRecord, HoodieRecord, Integer> exec = null;
-
-    try {
-      exec = new SimpleExecutor<>(hoodieRecords.iterator(), consumer, 
Function.identity());
-
-      int result = exec.execute();
-      // It should buffer and write 128 records
-      assertEquals(128, result);
-
-      // collect all records and assert that consumed records are identical to 
produced ones
-      // assert there's no tampering, and that the ordering is preserved
-      assertEquals(hoodieRecords, consumedRecords);
-
-    } finally {
-      if (exec != null) {
-        exec.shutdownNow();
-      }
-    }
-  }
-
-  // Test to ensure that we are reading all records from queue iterator in the 
same order
-  // without any exceptions.
-  @SuppressWarnings("unchecked")
-  @Test
-  @Timeout(value = 60)
-  public void testRecordReading() {
-
-    final List<HoodieRecord> hoodieRecords = 
dataGen.generateInserts(instantTime, 100);
-    ArrayList<HoodieRecord> beforeRecord = new ArrayList<>();
-    ArrayList<IndexedRecord> beforeIndexedRecord = new ArrayList<>();
-    ArrayList<HoodieRecord> afterRecord = new ArrayList<>();
-    ArrayList<IndexedRecord> afterIndexedRecord = new ArrayList<>();
-
-    hoodieRecords.forEach(record -> {
-      beforeRecord.add(record);
-      beforeIndexedRecord.add((IndexedRecord) record.getData());
-    });
-
-    HoodieConsumer<HoodieRecord, Integer> consumer =
-        new HoodieConsumer<HoodieRecord, Integer>() {
-          private int count = 0;
-
-          @Override
-          public void consume(HoodieRecord record) {
-            count++;
-            afterRecord.add(record);
-            afterIndexedRecord.add((IndexedRecord) record.getData());
-          }
-
-          @Override
-          public Integer finish() {
-            return count;
-          }
-        };
-
-    SimpleExecutor<HoodieRecord, HoodieRecord, Integer> exec = null;
-
-    try {
-      exec = new SimpleExecutor<>(hoodieRecords.iterator(), consumer, 
Function.identity());
-      int result = exec.execute();
-      assertEquals(100, result);
-
-      assertEquals(beforeRecord, afterRecord);
-      assertEquals(beforeIndexedRecord, afterIndexedRecord);
-
-    } finally {
-      if (exec != null) {
-        exec.shutdownNow();
-      }
-    }
-  }
-
-  /**
-   * Test to ensure exception happen in iterator then we need to stop the 
simple ingestion.
-   */
-  @SuppressWarnings("unchecked")
-  @Test
-  @Timeout(value = 60)
-  public void testException() {
-    final int numRecords = 1000;
-    final String errorMessage = "Exception when iterating records!!!";
-
-    List<HoodieRecord> pRecs = dataGen.generateInserts(instantTime, 
numRecords);
-    InnerIterator iterator = new InnerIterator(pRecs.iterator(), errorMessage, 
numRecords / 10);
-
-    HoodieConsumer<HoodieRecord, Integer> consumer =
-        new HoodieConsumer<HoodieRecord, Integer>() {
-          int count = 0;
-
-          @Override
-          public void consume(HoodieRecord payload) throws Exception {
-            // Read recs and ensure we have covered all producer recs.
-            count++;
-          }
-
-          @Override
-          public Integer finish() {
-            return count;
-          }
-        };
-
-    SimpleExecutor<HoodieRecord, HoodieRecord, Integer> exec =
-        new SimpleExecutor<>(iterator, consumer, Function.identity());
-
-    final Throwable thrown = assertThrows(HoodieException.class, exec::execute,
-        "exception is expected");
-    assertTrue(thrown.getMessage().contains(errorMessage));
-  }
-
-  class InnerIterator implements Iterator<HoodieRecord> {
-
-    private Iterator<HoodieRecord> iterator;
-    private AtomicInteger count = new AtomicInteger(0);
-    private String errorMessage;
-    private int errorMessageCount;
-
-    public InnerIterator(Iterator<HoodieRecord> iterator, String errorMessage, 
int errorMessageCount) {
-      this.iterator = iterator;
-      this.errorMessage = errorMessage;
-      this.errorMessageCount = errorMessageCount;
-    }
-
-    @Override
-    public boolean hasNext() {
-      return iterator.hasNext();
-    }
-
-    @Override
-    public HoodieRecord next() {
-      if (count.get() == errorMessageCount) {
-        throw new HoodieException(errorMessage);
-      }
+/**
+ * Tests for {@link SimpleExecutor}.
+ */
+public class TestSimpleExecutionInSpark extends BaseExecutorTestHarness {
 
-      count.incrementAndGet();
-      return iterator.next();
-    }
+  @Override
+  protected HoodieExecutor<Integer> createExecutor(
+      Iterator<HoodieRecord> records, HoodieConsumer<HoodieRecord, Integer> 
consumer) {
+    return new SimpleExecutor<>(records, consumer, Function.identity());
   }
 }

Reply via email to