[ https://issues.apache.org/jira/browse/FLINK-19864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17230315#comment-17230315 ]

Kezhu Wang commented on FLINK-19864:
------------------------------------

I think this is probably caused by the misuse of {{Thread.getState}} as a synchronization tool in {{StreamTaskTestHarness.waitForInputProcessing}}.

{code:java}
        public void waitForInputProcessing() throws Exception {
                while (true) {
                        checkForErrorInTaskThread();
                        if (allInputConsumed()) {
                                break;
                        }
                }

                // then wait for the Task Thread to be in a blocked state
                // Check whether the state is blocked, this should be the case if it cannot
                // notifyNonEmpty more input, i.e. all currently available input has been processed.
                while (true) {
                        Thread.State state = taskThread.getState();
                        if (state == Thread.State.BLOCKED || state == Thread.State.TERMINATED ||
                                        state == Thread.State.WAITING || state == Thread.State.TIMED_WAITING) {
                                break;
                        }

                        try {
                                Thread.sleep(1);
                        } catch (InterruptedException ignored) {}
                }
        }
{code}

Here is what the Javadoc says about {{Thread.getState}}:

{quote}
Returns the state of this thread. This method is designed for use in monitoring 
of the system state, not for synchronization control.
{quote}
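Because {{Thread.getState}} is meant for monitoring, a harness that needs to know "all input has been processed" would need an explicit handshake instead. A minimal sketch, assuming a hypothetical harness ({{ProgressCountingHarness}}, {{processElement}} and {{currentWatermark}} are illustrative names, not Flink's API): the task thread increments a counter only after it has fully handled an element, and the test waits on that counter rather than on the thread state. This only illustrates the monitoring-vs-synchronization distinction; it is not how Flink's harness is written.

{code:java}
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Hypothetical harness: the task thread publishes how many elements it has fully
 * processed, and the test waits on that counter instead of on Thread.getState().
 */
final class ProgressCountingHarness {
    private final ConcurrentLinkedQueue<Long> input = new ConcurrentLinkedQueue<>();
    private final AtomicLong submitted = new AtomicLong();
    private final AtomicLong processed = new AtomicLong();
    private final AtomicLong currentWatermark = new AtomicLong(Long.MIN_VALUE);
    private volatile boolean running = true;
    private final Thread taskThread = new Thread(this::runTask, "task-thread");

    void start() {
        taskThread.setDaemon(true); // never keep the JVM alive if stop() is not called
        taskThread.start();
    }

    /** Called from the test thread only. */
    void processElement(long watermark) {
        submitted.incrementAndGet();
        input.add(watermark);
    }

    private void runTask() {
        while (running) {
            Long watermark = input.poll();
            if (watermark == null) {
                Thread.yield(); // nothing to process yet
                continue;
            }
            currentWatermark.set(watermark); // the actual work, e.g. a gauge update
            processed.incrementAndGet();     // signalled only after the work is done
        }
    }

    /** Waits until every element submitted so far has been fully processed. */
    void waitForInputProcessing() throws InterruptedException {
        long target = submitted.get();
        while (processed.get() < target) {
            Thread.sleep(1);
        }
    }

    long currentWatermark() {
        return currentWatermark.get();
    }

    void stop() throws InterruptedException {
        running = false;
        taskThread.join();
    }

    public static void main(String[] args) throws InterruptedException {
        ProgressCountingHarness harness = new ProgressCountingHarness();
        harness.start();
        harness.processElement(1L);
        harness.waitForInputProcessing();
        System.out.println(harness.currentWatermark()); // 1
        harness.stop();
    }
}
{code}

Since the counter is incremented only after the gauge update, observing {{processed >= submitted}} guarantees (via the atomic's happens-before edge) that the update is visible, no matter which state the task thread happens to be in.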

Even though {{Thread.threadStatus}} is declared {{volatile}} in the JDK, it is not written as one on the JVM side: HotSpot updates it with a plain {{int_field_put}}.
{code:c++}
// Write the thread status value to threadStatus field in java.lang.Thread java class.
void java_lang_Thread::set_thread_status(oop java_thread,
                                         java_lang_Thread::ThreadStatus status) {
  // The threadStatus is only present starting in 1.5
  if (_thread_status_offset > 0) {
    java_thread->int_field_put(_thread_status_offset, status);
  }
}
{code}

I can't give a reliable example that demonstrates this JVM-side behaviour without the help of an additional synchronization tool; as far as I know, it is a chicken-and-egg problem. It is also not what we encounter here, as this test case already has explicit synchronization: {{ConcurrentLinkedQueue.size}} and {{ConcurrentLinkedQueue.poll}}.
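The guarantee relied on here is the memory-consistency rule of {{java.util.concurrent}} collections: actions in a thread prior to placing an object into a concurrent collection happen-before actions subsequent to the access or removal of that element from the collection in another thread. A small sketch of that pattern, with a hypothetical {{Event}} type and {{QueueHandoff.handOff}} helper (illustrative names only): the consumer is guaranteed to see the plain write made before {{offer}}. A passing run does not prove the guarantee; the sketch only shows where the happens-before edge sits.

{code:java}
import java.util.concurrent.ConcurrentLinkedQueue;

final class QueueHandoff {
    // Hypothetical payload with a plain (non-volatile) field.
    static final class Event {
        long watermark;
    }

    /** Publishes an Event through the queue and returns what the consumer thread read from it. */
    static long handOff(long watermark) throws InterruptedException {
        ConcurrentLinkedQueue<Event> queue = new ConcurrentLinkedQueue<>();
        long[] seenByConsumer = new long[1];

        Thread consumer = new Thread(() -> {
            Event received;
            while ((received = queue.poll()) == null) {
                Thread.yield(); // spin until the producer has published an element
            }
            // Visible by the happens-before rule: the producer's plain write precedes
            // offer(), and this read follows the poll() that returned the element.
            seenByConsumer[0] = received.watermark;
        });
        consumer.start();

        Event event = new Event();
        event.watermark = watermark; // plain write ...
        queue.offer(event);          // ... published through the concurrent queue
        consumer.join();             // join() makes seenByConsumer[0] visible here
        return seenByConsumer[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(handOff(1L)); // 1
    }
}
{code}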

Also, I didn't find any explicit blocking statement after {{ConcurrentLinkedQueue.poll}} and before {{inputWatermarkGauge.setCurrentWatermark}}. But *there are implicit blocking entry points: concurrent class loading.* I wrote the following code to verify this:

{code:java}
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class Main {
    private static final List<String> unloadedClassNames = Arrays.asList(
        "java.sql.DriverManager",
        "java.io.Console",
        "java.io.FileInputStream",
        "java.io.FilePermission"
    );

    public static void main(String[] args) throws Exception {
        final CountDownLatch readyLatch = new CountDownLatch(1);
        final CountDownLatch classLoadingLatch = new CountDownLatch(1);
        final CountDownLatch doneLatch = new CountDownLatch(1);
        Thread pollingThread = new Thread(() -> {
            try {
                readyLatch.countDown();
                while (classLoadingLatch.getCount() != 0) {
                    Thread.yield();
                }
                unloadedClassNames.forEach(className -> {
                    try {
                        Class.forName(className);
                        Thread.yield();
                    } catch (Exception ex) {
                        ex.printStackTrace();
                        System.exit(2);
                    }
                });
                while (doneLatch.getCount() != 0) {
                    Thread.yield();
                }
            } catch (Exception ex) {
                ex.printStackTrace();
                System.exit(2);
            }
        });
        pollingThread.start();

        readyLatch.await();

        classLoadingLatch.countDown();

        unloadedClassNames.forEach(className -> {
            try {
                Class.forName(className);
            } catch (Exception ex) {
                ex.printStackTrace();
                System.exit(2);
            }
        });

        Thread.State pollingThreadState = pollingThread.getState();
        if (pollingThreadState != Thread.State.RUNNABLE) {
            System.err.format("polling thread state: %s\n", pollingThreadState);
            System.exit(1);
        }

        doneLatch.countDown();
        pollingThread.join();
    }
}
{code}

Here, I chose four classes, all of which have static initialization blocks. The above code fails quite often, at a rate of roughly 10%, in my local environment. This is probably why the JDK makes that statement: *class loading is everywhere in Java*.
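The same failure mode can be reproduced deterministically if the unpredictable class-loading lock is replaced by a monitor that the test thread holds on purpose. This is a sketch under that assumption ({{GetStateRaceDemo}} and the simulated lock are hypothetical, not Flink code): the task thread consumes its only input and then blocks before updating the gauge, so the two-phase check from {{waitForInputProcessing}} reports "done" while the gauge still holds {{Long.MIN_VALUE}}, the same {{-9223372036854775808}} reported in the failed assertion.

{code:java}
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;

final class GetStateRaceDemo {
    // Hypothetical stand-in for a lock the task thread may hit while loading or
    // initializing a class after it has already polled its input.
    private static final Object SIMULATED_CLASS_LOADING_LOCK = new Object();

    /** Returns {gauge value when the getState() check said "done", gauge value after join()}. */
    static long[] run() throws InterruptedException {
        ConcurrentLinkedQueue<Long> input = new ConcurrentLinkedQueue<>();
        AtomicLong watermarkGauge = new AtomicLong(Long.MIN_VALUE); // initial "no watermark" value
        input.add(1L);

        Thread taskThread = new Thread(() -> {
            long watermark = input.poll();                // input is now consumed ...
            synchronized (SIMULATED_CLASS_LOADING_LOCK) { // ... but the thread blocks here
                watermarkGauge.set(watermark);            // gauge update happens only afterwards
            }
        });

        long seenByStateCheck;
        synchronized (SIMULATED_CLASS_LOADING_LOCK) {
            taskThread.start();
            // Phase 1 of the harness heuristic: wait until all input has been consumed.
            while (!input.isEmpty()) {
                Thread.yield();
            }
            // Phase 2: wait until the task thread is no longer RUNNABLE.
            while (true) {
                Thread.State state = taskThread.getState();
                if (state == Thread.State.BLOCKED || state == Thread.State.TERMINATED
                        || state == Thread.State.WAITING || state == Thread.State.TIMED_WAITING) {
                    break;
                }
                Thread.sleep(1);
            }
            // The heuristic now claims all input was processed; read the gauge as the test would.
            seenByStateCheck = watermarkGauge.get();
        } // releasing the monitor lets the task thread finish
        taskThread.join();
        return new long[] {seenByStateCheck, watermarkGauge.get()};
    }

    public static void main(String[] args) throws InterruptedException {
        long[] result = run();
        System.out.println("after getState() check: " + result[0]); // -9223372036854775808
        System.out.println("after join(): " + result[1]);           // 1
    }
}
{code}

The first value is always {{Long.MIN_VALUE}} because the gauge can only be written while holding the monitor that the checking thread owns; only after the monitor is released does the gauge read 1.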

> TwoInputStreamTaskTest.testWatermarkMetrics failed with "expected:<1> but was:<-9223372036854775808>"
> -----------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-19864
>                 URL: https://issues.apache.org/jira/browse/FLINK-19864
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Metrics, Runtime / Task
>    Affects Versions: 1.12.0
>            Reporter: Dian Fu
>            Priority: Critical
>              Labels: test-stability
>             Fix For: 1.12.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=8541&view=logs&j=77a9d8e1-d610-59b3-fc2a-4766541e0e33&t=7c61167f-30b3-5893-cc38-a9e3d057e392
> {code}
> 2020-10-28T22:40:44.2528420Z [ERROR] testWatermarkMetrics(org.apache.flink.streaming.runtime.tasks.TwoInputStreamTaskTest)  Time elapsed: 1.528 s  <<< FAILURE!
> 2020-10-28T22:40:44.2529225Z java.lang.AssertionError: expected:<1> but was:<-9223372036854775808>
> 2020-10-28T22:40:44.2541228Z 	at org.junit.Assert.fail(Assert.java:88)
> 2020-10-28T22:40:44.2542157Z 	at org.junit.Assert.failNotEquals(Assert.java:834)
> 2020-10-28T22:40:44.2542954Z 	at org.junit.Assert.assertEquals(Assert.java:645)
> 2020-10-28T22:40:44.2543456Z 	at org.junit.Assert.assertEquals(Assert.java:631)
> 2020-10-28T22:40:44.2544002Z 	at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTaskTest.testWatermarkMetrics(TwoInputStreamTaskTest.java:540)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
