[ https://issues.apache.org/jira/browse/FLINK-10842?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16707407#comment-16707407 ]
ASF GitHub Bot commented on FLINK-10842:
----------------------------------------

tillrohrmann closed pull request #7221: [FLINK-10842][E2E tests] fix broken waiting loops in common.sh
URL: https://github.com/apache/flink/pull/7221

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-end-to-end-tests/test-scripts/common.sh b/flink-end-to-end-tests/test-scripts/common.sh
index ffa7f439bbe..c9a174149dc 100644
--- a/flink-end-to-end-tests/test-scripts/common.sh
+++ b/flink-end-to-end-tests/test-scripts/common.sh
@@ -38,6 +38,8 @@ echo "Flink dist directory: $FLINK_DIR"
 
 FLINK_VERSION=$(cat ${END_TO_END_DIR}/pom.xml | sed -n 's/.*<version>\(.*\)<\/version>/\1/p')
 
+NODENAME=`hostname -f`
+
 USE_SSL=OFF # set via set_conf_ssl(), reset via revert_default_config()
 TEST_ROOT=`pwd -P`
 TEST_INFRA_DIR="$END_TO_END_DIR/test-scripts/"
@@ -176,7 +178,6 @@ function set_conf_ssl {
   fi
 
   mkdir -p "${TEST_DATA_DIR}/ssl"
-  NODENAME=`hostname -f`
   SANSTRING="dns:${NODENAME}"
   for NODEIP in $(get_node_ip) ; do
     SANSTRING="${SANSTRING},ip:${NODEIP}"
@@ -185,16 +186,19 @@ function set_conf_ssl {
   echo "Using SAN ${SANSTRING}"
 
   # create certificates
-  keytool -genkeypair -alias ca -keystore "${TEST_DATA_DIR}/ssl/ca.keystore" -dname "CN=Sample CA" -storepass password -keypass password -keyalg RSA -ext bc=ca:true
+  keytool -genkeypair -alias ca -keystore "${TEST_DATA_DIR}/ssl/ca.keystore" -dname "CN=Sample CA" -storepass password -keypass password -keyalg RSA -ext bc=ca:true -storetype PKCS12
   keytool -keystore "${TEST_DATA_DIR}/ssl/ca.keystore" -storepass password -alias ca -exportcert > "${TEST_DATA_DIR}/ssl/ca.cer"
   keytool -importcert -keystore "${TEST_DATA_DIR}/ssl/ca.truststore" -alias ca -storepass password -noprompt -file "${TEST_DATA_DIR}/ssl/ca.cer"
 
-  keytool -genkeypair -alias node -keystore "${TEST_DATA_DIR}/ssl/node.keystore" -dname "CN=${NODENAME}" -ext SAN=${SANSTRING} -storepass password -keypass password -keyalg RSA
+  keytool -genkeypair -alias node -keystore "${TEST_DATA_DIR}/ssl/node.keystore" -dname "CN=${NODENAME}" -ext SAN=${SANSTRING} -storepass password -keypass password -keyalg RSA -storetype PKCS12
   keytool -certreq -keystore "${TEST_DATA_DIR}/ssl/node.keystore" -storepass password -alias node -file "${TEST_DATA_DIR}/ssl/node.csr"
   keytool -gencert -keystore "${TEST_DATA_DIR}/ssl/ca.keystore" -storepass password -alias ca -ext SAN=${SANSTRING} -infile "${TEST_DATA_DIR}/ssl/node.csr" -outfile "${TEST_DATA_DIR}/ssl/node.cer"
   keytool -importcert -keystore "${TEST_DATA_DIR}/ssl/node.keystore" -storepass password -file "${TEST_DATA_DIR}/ssl/ca.cer" -alias ca -noprompt
   keytool -importcert -keystore "${TEST_DATA_DIR}/ssl/node.keystore" -storepass password -file "${TEST_DATA_DIR}/ssl/node.cer" -alias node -noprompt
 
+  # keystore is converted into a pem format to use it as node.pem with curl in Flink REST API queries, see also $CURL_SSL_ARGS
+  openssl pkcs12 -passin pass:password -in "${TEST_DATA_DIR}/ssl/node.keystore" -out "${TEST_DATA_DIR}/ssl/node.pem" -nodes
+
   # adapt config
   # (here we rely on security.ssl.enabled enabling SSL for all components and internal as well as
   # external communication channels)
@@ -242,26 +246,23 @@ function start_cluster {
   "$FLINK_DIR"/bin/start-cluster.sh
 
   # wait at most 10 seconds until the dispatcher is up
-  local QUERY_URL
-  if [ "x$USE_SSL" = "xON" ]; then
-    QUERY_URL="http://localhost:8081/taskmanagers"
-  else
-    QUERY_URL="https://localhost:8081/taskmanagers"
-  fi
-  for i in {1..10}; do
+  local TIMEOUT=10
+  for i in $(seq 1 ${TIMEOUT}); do
     # without the || true this would exit our script if the JobManager is not yet up
-    QUERY_RESULT=$(curl "$QUERY_URL" 2> /dev/null || true)
+    QUERY_RESULT=$(query_running_tms 2> /dev/null || true)
 
     # ensure the taskmanagers field is there at all and is not empty
     if [[ ${QUERY_RESULT} =~ \{\"taskmanagers\":\[.+\]\} ]]; then
       echo "Dispatcher REST endpoint is up."
-      break
+      return
     fi
 
     echo "Waiting for dispatcher REST endpoint to come up..."
     sleep 1
   done
+  echo "Dispatcher REST endpoint has not started within a timeout of ${TIMEOUT} sec"
+  exit 1
 }
 
 function start_taskmanagers {
@@ -274,31 +275,56 @@ function start_taskmanagers {
 }
 
 function start_and_wait_for_tm {
-
-  tm_query_result=$(curl -s "http://localhost:8081/taskmanagers")
-
+  tm_query_result=`query_running_tms`
   # we assume that the cluster is running
   if ! [[ ${tm_query_result} =~ \{\"taskmanagers\":\[.*\]\} ]]; then
     echo "Your cluster seems to be unresponsive at the moment: ${tm_query_result}" 1>&2
     exit 1
   fi
 
-  running_tms=`curl -s "http://localhost:8081/taskmanagers" | grep -o "id" | wc -l`
-
+  running_tms=`query_number_of_running_tms`
   ${FLINK_DIR}/bin/taskmanager.sh start
+  wait_for_number_of_running_tms $((running_tms+1))
+}
 
-  for i in {1..10}; do
-    local new_running_tms=`curl -s "http://localhost:8081/taskmanagers" | grep -o "id" | wc -l`
-    if [ $((new_running_tms-running_tms)) -eq 0 ]; then
-      echo "TaskManager is not yet up."
+function query_running_tms {
+  local QUERY_URL
+  local CURL_SSL_ARGS
+  if [ "x$USE_SSL" = "xON" ]; then
+    QUERY_URL="https://${NODENAME}:8081/taskmanagers"
+    CURL_SSL_ARGS="--cacert ${TEST_DATA_DIR}/ssl/node.pem"
+  else
+    QUERY_URL="http://${NODENAME}:8081/taskmanagers"
+    CURL_SSL_ARGS=""
+  fi
+  curl ${CURL_SSL_ARGS} -s "${QUERY_URL}"
+}
+
+function query_number_of_running_tms {
+  query_running_tms | grep -o "id" | wc -l
+}
+
+function wait_for_number_of_running_tms {
+  local TM_NUM_TO_WAIT=${1}
+  local TIMEOUT_COUNTER=10
+  local TIMEOUT_INC=4
+  local TIMEOUT=$(( $TIMEOUT_COUNTER * $TIMEOUT_INC ))
+  local TM_NUM_TEXT="Number of running task managers"
+  for i in $(seq 1 ${TIMEOUT_COUNTER}); do
+    local TM_NUM=`query_number_of_running_tms`
+    if [ $((TM_NUM - TM_NUM_TO_WAIT)) -eq 0 ]; then
+      echo "${TM_NUM_TEXT} has reached ${TM_NUM_TO_WAIT}."
+      return
     else
-      echo "TaskManager is up."
-      break
+      echo "${TM_NUM_TEXT} ${TM_NUM} is not yet ${TM_NUM_TO_WAIT}."
     fi
-    sleep 4
+    sleep ${TIMEOUT_INC}
   done
+  echo "${TM_NUM_TEXT} has not reached ${TM_NUM_TO_WAIT} within a timeout of ${TIMEOUT} sec"
+  exit 1
 }
 
+
 function check_logs_for_errors {
   error_count=$(grep -rv "GroupCoordinatorNotAvailableException" $FLINK_DIR/log \
       | grep -v "RetriableCommitFailedException" \
@@ -392,17 +418,20 @@ function wait_for_job_state_transition {
 }
 
 function wait_job_running {
-  for i in {1..10}; do
+  local TIMEOUT=10
+  for i in $(seq 1 ${TIMEOUT}); do
     JOB_LIST_RESULT=$("$FLINK_DIR"/bin/flink list -r | grep "$1")
 
     if [[ "$JOB_LIST_RESULT" == "" ]]; then
       echo "Job ($1) is not yet running."
     else
       echo "Job ($1) is running."
-      break
+      return
     fi
     sleep 1
   done
+  echo "Job ($1) has not started within a timeout of ${TIMEOUT} sec"
+  exit 1
 }
 
 function wait_job_terminal_state {
diff --git a/flink-end-to-end-tests/test-scripts/test_queryable_state_restart_tm.sh b/flink-end-to-end-tests/test-scripts/test_queryable_state_restart_tm.sh
index 56d811e14a1..db0174a2160 100755
--- a/flink-end-to-end-tests/test-scripts/test_queryable_state_restart_tm.sh
+++ b/flink-end-to-end-tests/test-scripts/test_queryable_state_restart_tm.sh
@@ -86,6 +86,7 @@ function run_test() {
     fi
 
     kill_random_taskmanager
+    wait_for_number_of_running_tms 0
 
     latest_snapshot_count=$(cat $FLINK_DIR/log/*out* | grep "on snapshot" | tail -n 1 | awk '{print $4}')
     echo "Latest snapshot count was ${latest_snapshot_count}"
diff --git a/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh b/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
index 35fe30b6b25..48d68c98e9d 100755
--- a/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
+++ b/flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh
@@ -100,11 +100,10 @@ fi
 
 DATASTREAM_JOB=$($JOB_CMD | grep "Job has been submitted with JobID" | sed 's/.* //g')
 
-wait_job_running $DATASTREAM_JOB
-
 if [[ $SIMULATE_FAILURE == "true" ]]; then
   wait_job_terminal_state $DATASTREAM_JOB FAILED
 else
+  wait_job_running $DATASTREAM_JOB
   wait_num_checkpoints $DATASTREAM_JOB 1
   wait_oper_metric_num_in_records SemanticsCheckMapper.0 200
 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Waiting loops are broken in e2e/common.sh
> -----------------------------------------
>
>                 Key: FLINK-10842
>                 URL: https://issues.apache.org/jira/browse/FLINK-10842
>             Project: Flink
>          Issue Type: Bug
>          Components: E2E Tests
>    Affects Versions: 1.7.0
>            Reporter: Andrey Zagrebin
>            Assignee: Andrey Zagrebin
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.7.0
>
>
> There are 3 loops in flink-end-to-end-tests/test-scripts/common.sh where the
> script waits for some event to happen (for i in {1..10}; do):
> - wait_dispatcher_running
> - start_and_wait_for_tm
> - wait_job_running
> All loops have 10 iterations, and each loop breaks if the awaited event
> happens. If a timeout occurs, the script does not fail: the function
> just continues after 10 iterations, ignoring that the awaited event did not
> happen.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
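The bug in the issue comes down to loops that only `break` on success and then fall through, so a timeout looks exactly like success to the caller. The PR fixes each loop the same way: `return` as soon as the event happens, and after the loop print a message and `exit 1`. A minimal sketch of that pattern factored into one reusable function follows, assuming bash with coreutils `seq`. `wait_for_condition` is a hypothetical name, not a function in common.sh, and unlike the PR's helpers it returns 1 instead of calling `exit 1`, so the caller decides how to fail.

```shell
#!/usr/bin/env bash
# Hypothetical helper, not part of Flink's common.sh: poll a probe command
# until it succeeds or the attempts run out. Success returns 0 immediately
# (like the PR's "return"); a timeout prints an error and returns 1 instead
# of silently falling through like the old "break"-only loops.
#
# Usage: wait_for_condition <description> <attempts> <sleep_sec> <command> [args...]
wait_for_condition() {
  local description=$1
  local attempts=$2
  local sleep_sec=$3
  shift 3

  local i
  for i in $(seq 1 "${attempts}"); do
    # Only the probe's exit status is tested; a command used as an "if"
    # condition does not trigger "set -e", so a probe that fails while the
    # awaited service is still starting cannot abort the calling script.
    if "$@"; then
      echo "${description}: condition met after ${i} attempt(s)."
      return 0
    fi
    echo "${description}: not yet met (attempt ${i}/${attempts})."
    sleep "${sleep_sec}"
  done

  # Reaching this point means every attempt failed: report it on stderr.
  echo "${description}: not met within $((attempts * sleep_sec)) sec" 1>&2
  return 1
}
```

A caller mirroring `wait_job_running` could write something like `wait_for_condition "Job ${JOB_ID}" 10 1 job_is_running "${JOB_ID}" || exit 1`, where `job_is_running` is a hypothetical probe wrapping the `flink list -r | grep` check from the diff. Keeping `|| exit 1` at the call site makes the timeout failure explicit, which is exactly what the original loops were missing.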