[ https://issues.apache.org/jira/browse/FLINK-19864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17230315#comment-17230315 ]
Kezhu Wang commented on FLINK-19864:
------------------------------------

I think this is probably caused by the misuse of {{Thread.getState}} as a synchronization tool in {{StreamTaskTestHarness.waitForInputProcessing}}:

{code:java}
public void waitForInputProcessing() throws Exception {
    while (true) {
        checkForErrorInTaskThread();
        if (allInputConsumed()) {
            break;
        }
    }

    // then wait for the Task Thread to be in a blocked state
    // Check whether the state is blocked, this should be the case if it cannot
    // notifyNonEmpty more input, i.e. all currently available input has been processed.
    while (true) {
        Thread.State state = taskThread.getState();
        if (state == Thread.State.BLOCKED || state == Thread.State.TERMINATED ||
                state == Thread.State.WAITING || state == Thread.State.TIMED_WAITING) {
            break;
        }

        try {
            Thread.sleep(1);
        } catch (InterruptedException ignored) {}
    }
}
{code}

Here is what the javadoc says about {{Thread.getState}}:
{quote}
Returns the state of this thread. This method is designed for use in monitoring of the system state, not for synchronization control.
{quote}

Even though {{Thread.threadStatus}} is declared volatile on the JDK side, it is not written as a volatile field on the JVM side:

{code:c++}
// Write the thread status value to threadStatus field in java.lang.Thread java class.
void java_lang_Thread::set_thread_status(oop java_thread,
                                         java_lang_Thread::ThreadStatus status) {
  // The threadStatus is only present starting in 1.5
  if (_thread_status_offset > 0) {
    java_thread->int_field_put(_thread_status_offset, status);
  }
}
{code}

I can't give a reliable example that demonstrates this JVM-side behavior without the help of an additional synchronization tool; as far as I know, it is a chicken-and-egg problem. It is also not the case we encounter here, since this test case already has explicit synchronization tools: {{ConcurrentLinkedList.size}} and {{ConcurrentLinkedList.poll}}. I also didn't find any explicit blocking statement after {{ConcurrentLinkedList.poll}} and before {{inputWatermarkGauge.setCurrentWatermark}}.
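To make the javadoc's point concrete, here is a minimal sketch of waiting on explicit synchronization instead of inferring idleness from {{Thread.getState}}. It is not Flink's harness code and not an actual fix for this issue: the worker loop, the {{processed}} counter and the {{gauge}} (a stand-in for {{inputWatermarkGauge}}) are all hypothetical. The worker increments an {{AtomicLong}} only after applying each element's side effect, so once the waiting thread reads the final count, the volatile write/read pair guarantees it also sees the gauge update, whatever state the worker thread happens to report at that moment.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

final class ExplicitProgressWait {

    // Feeds 1..count to a worker thread and returns the gauge value read once the
    // worker has acknowledged every element through the processed counter.
    static long runAndAwait(int count) {
        ConcurrentLinkedQueue<Long> input = new ConcurrentLinkedQueue<>();
        AtomicLong processed = new AtomicLong();            // hypothetical progress counter
        AtomicLong gauge = new AtomicLong(Long.MIN_VALUE);  // hypothetical stand-in for a watermark gauge

        Thread worker = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                Long element = input.poll();
                if (element == null) {
                    Thread.yield(); // nothing to do yet; keep polling
                    continue;
                }
                gauge.set(element);          // the side effect the test wants to observe
                processed.incrementAndGet(); // published only AFTER the side effect
            }
        });
        worker.setDaemon(true);
        worker.start();

        for (long i = 1; i <= count; i++) {
            input.add(i);
        }

        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(10);
        try {
            // Wait for the worker's own acknowledgement rather than for the queue to be
            // empty and the thread to look BLOCKED/WAITING.
            while (processed.get() < count) {
                if (System.nanoTime() - deadline > 0) {
                    throw new IllegalStateException("worker did not finish in time");
                }
                Thread.sleep(1);
            }
            return gauge.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            worker.interrupt(); // worker checks its interrupt flag on every loop iteration
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(runAndAwait(1000));
    }
}
```

With a single worker consuming in FIFO order, {{ExplicitProgressWait.runAndAwait(1000)}} returns 1000: the last element applied is 1000, and its counter increment is published after the gauge write.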
But even without an explicit blocking statement, *there are implicit blocking entry points: concurrent class loading.* I wrote the following code to verify this:

{code:java}
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class Main {
    private static final List<String> unloadedClassNames = Arrays.asList(
            "java.sql.DriverManager",
            "java.io.Console",
            "java.io.FileInputStream",
            "java.io.FilePermission"
    );

    public static void main(String[] args) throws Exception {
        final CountDownLatch readyLatch = new CountDownLatch(1);
        final CountDownLatch classLoadingLatch = new CountDownLatch(1);
        final CountDownLatch doneLatch = new CountDownLatch(1);

        Thread pollingThread = new Thread(() -> {
            try {
                readyLatch.countDown();
                while (classLoadingLatch.getCount() != 0) {
                    Thread.yield();
                }
                unloadedClassNames.forEach(className -> {
                    try {
                        Class.forName(className);
                        Thread.yield();
                    } catch (Exception ex) {
                        ex.printStackTrace();
                        System.exit(2);
                    }
                });
                while (doneLatch.getCount() != 0) {
                    Thread.yield();
                }
            } catch (Exception ex) {
                ex.printStackTrace();
                System.exit(2);
            }
        });
        pollingThread.start();

        readyLatch.await();
        classLoadingLatch.countDown();

        unloadedClassNames.forEach(className -> {
            try {
                Class.forName(className);
            } catch (Exception ex) {
                ex.printStackTrace();
                System.exit(2);
            }
        });

        Thread.State pollingThreadState = pollingThread.getState();
        if (pollingThreadState != Thread.State.RUNNABLE) {
            System.err.format("polling thread state: %s\n", pollingThreadState);
            System.exit(1);
        }

        doneLatch.countDown();
        pollingThread.join();
    }
}
{code}

Here, I chose four classes, each of which has a static initialization block. The above code fails quite often, at a rate of roughly 10% in my local environment. This is probably why the JDK javadoc makes that statement: *class loading is everywhere in Java*.
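The blocking point that concurrent {{Class.forName}} calls can hit may be made deterministic with a sketch like the following. It assumes a hypothetical {{SlowInit}} class whose static initializer parks on a latch; none of these names come from Flink or from the test above. While one thread is inside {{SlowInit}}'s static initializer, any other thread that touches {{SlowInit.VALUE}} has to wait for that initializer to finish (JLS 12.4.2), even though no lock appears in the source. The sketch checks ordering only, not {{Thread.getState}}, because the {{Thread.State}} javadoc does not spell out which state a thread waiting for class initialization reports.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

final class ClassInitBlocking {
    static final CountDownLatch initStarted = new CountDownLatch(1);
    static final CountDownLatch releaseInit = new CountDownLatch(1);
    static final AtomicBoolean released = new AtomicBoolean(false);

    // Hypothetical class whose static initializer parks until the test releases it.
    static final class SlowInit {
        static final int VALUE;

        static {
            initStarted.countDown();
            try {
                releaseInit.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            VALUE = 42;
        }
    }

    // Returns true if the thread that touched SlowInit.VALUE could only continue after the
    // initializer was released. Meant to be called once: SlowInit is initialized at most once.
    static boolean userWaitedForInitializer() {
        try {
            Thread initializer = new Thread(() -> {
                int value = SlowInit.VALUE; // triggers SlowInit's static block and parks inside it
            });
            initializer.start();
            initStarted.await(); // `initializer` is now inside SlowInit's static block

            AtomicBoolean sawRelease = new AtomicBoolean(false);
            Thread user = new Thread(() -> {
                int value = SlowInit.VALUE; // no lock in the source, yet this waits for initialization
                sawRelease.set(released.get());
            });
            user.start();
            Thread.sleep(100); // make it likely that `user` reaches SlowInit.VALUE while init runs

            released.set(true);
            releaseInit.countDown();
            initializer.join();
            user.join();
            return sawRelease.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(userWaitedForInitializer());
    }
}
```

Because {{released}} is set before the initializer is let go, and class initialization completes before {{user}} can read {{SlowInit.VALUE}}, {{userWaitedForInitializer()}} returns {{true}} regardless of scheduling; the {{Thread.sleep(100)}} only makes it likely that {{user}} actually arrives while the static block is still running.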
> TwoInputStreamTaskTest.testWatermarkMetrics failed with "expected:<1> but was:<-9223372036854775808>"
> -----------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-19864
>                 URL: https://issues.apache.org/jira/browse/FLINK-19864
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Metrics, Runtime / Task
>    Affects Versions: 1.12.0
>            Reporter: Dian Fu
>            Priority: Critical
>              Labels: test-stability
>             Fix For: 1.12.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=8541&view=logs&j=77a9d8e1-d610-59b3-fc2a-4766541e0e33&t=7c61167f-30b3-5893-cc38-a9e3d057e392
> {code}
> 2020-10-28T22:40:44.2528420Z [ERROR] testWatermarkMetrics(org.apache.flink.streaming.runtime.tasks.TwoInputStreamTaskTest)  Time elapsed: 1.528 s  <<< FAILURE!
> 2020-10-28T22:40:44.2529225Z java.lang.AssertionError: expected:<1> but was:<-9223372036854775808>
> 2020-10-28T22:40:44.2541228Z    at org.junit.Assert.fail(Assert.java:88)
> 2020-10-28T22:40:44.2542157Z    at org.junit.Assert.failNotEquals(Assert.java:834)
> 2020-10-28T22:40:44.2542954Z    at org.junit.Assert.assertEquals(Assert.java:645)
> 2020-10-28T22:40:44.2543456Z    at org.junit.Assert.assertEquals(Assert.java:631)
> 2020-10-28T22:40:44.2544002Z    at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTaskTest.testWatermarkMetrics(TwoInputStreamTaskTest.java:540)
> {code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)