guanziyue commented on a change in pull request #4264:
URL: https://github.com/apache/hudi/pull/4264#discussion_r830594176
##########
File path:
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java
##########
@@ -152,6 +165,144 @@ protected Integer getResult() {
} finally {
if (executor != null) {
executor.shutdownNow();
+ executor.awaitTermination();
+ }
+ }
+ }
+
+ @Test
+ public void testExecutorTermination() throws ExecutionException,
InterruptedException {
+ // HUDI-2875: the sleep times in this UT are deliberate. They model the case where the
+ // consumer is slower than the producer, so the queue connecting them is non-empty.
+ // First, test a non-thread-safe usage.
+ ExecutorService executionThread = Executors.newSingleThreadExecutor();
+ Future<Boolean> testResult = executionThread.submit(new
ExecutorConcurrentUsageTask(false));
+ // let executor run some time
+ sleepUninterruptibly(2 * 1000);
+ executionThread.shutdownNow();
+ boolean concurrentSafe = !testResult.get();
+ assertFalse(concurrentSafe, "Should find concurrent issue");
+ // test a thread safe usage
+ executionThread = Executors.newSingleThreadExecutor();
+ testResult = executionThread.submit(new ExecutorConcurrentUsageTask(true));
+ sleepUninterruptibly(2 * 1000);
+ executionThread.shutdownNow();
+ concurrentSafe = !testResult.get();
+ assertTrue(concurrentSafe, "Should not find concurrent issue");
+ }
+
+ private static void sleepUninterruptibly(int milliseconds) {
+ long remainingNanos = TimeUnit.MILLISECONDS.toNanos(milliseconds);
+ long end = System.nanoTime() + remainingNanos;
+ while (true) {
+ try {
+ TimeUnit.NANOSECONDS.sleep(remainingNanos);
+ return;
+ } catch (InterruptedException interruptedException) {
+ remainingNanos = end - System.nanoTime();
+ }
+ }
+ }
+
+ private class ExecutorConcurrentUsageTask implements Callable<Boolean> {
+ private final boolean correct;
+
+ private ExecutorConcurrentUsageTask(boolean correct) {
+ this.correct = correct;
+ }
+
+ @Override
+ public Boolean call() throws Exception {
+ HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class);
+ when(hoodieWriteConfig.getWriteBufferLimitBytes()).thenReturn(1024 *
1024);
+
+ Iterator<GenericRecord> unboundedRecordIter = new
Iterator<GenericRecord>() {
+ private final Random random = new Random();
Review comment:
Done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]