GJL commented on a change in pull request #12350:
URL: https://github.com/apache/flink/pull/12350#discussion_r431301866
##########
File path: flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh
##########
@@ -43,82 +43,106 @@ fi
source "$(dirname "$0")"/common.sh
+TEST_TIMEOUT_SECONDS=900
+
ORIGINAL_DOP=$1
NEW_DOP=$2
STATE_BACKEND_TYPE=${3:-file}
STATE_BACKEND_FILE_ASYNC=${4:-true}
STATE_BACKEND_ROCKS_TIMER_SERVICE_TYPE=${5:-rocks}
-if (( $ORIGINAL_DOP >= $NEW_DOP )); then
- NUM_SLOTS=$ORIGINAL_DOP
-else
- NUM_SLOTS=$NEW_DOP
-fi
-
-set_config_key "taskmanager.numberOfTaskSlots" "${NUM_SLOTS}"
-
-if [ $STATE_BACKEND_ROCKS_TIMER_SERVICE_TYPE == 'heap' ]; then
- set_config_key "state.backend.rocksdb.timer-service.factory" "heap"
-fi
-set_config_key "metrics.fetcher.update-interval" "2000"
-
-setup_flink_slf4j_metric_reporter
-
-start_cluster
-
-CHECKPOINT_DIR="file://$TEST_DATA_DIR/savepoint-e2e-test-chckpt-dir"
-
-# run the DataStream allround job
-TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-datastream-allround-test/target/DataStreamAllroundTestProgram.jar
-DATASTREAM_JOB=$($FLINK_DIR/bin/flink run -d -p $ORIGINAL_DOP
$TEST_PROGRAM_JAR \
- --test.semantics exactly-once \
- --environment.parallelism $ORIGINAL_DOP \
- --state_backend $STATE_BACKEND_TYPE \
- --state_backend.checkpoint_directory $CHECKPOINT_DIR \
- --state_backend.file.async $STATE_BACKEND_FILE_ASYNC \
- --sequence_generator_source.sleep_time 15 \
- --sequence_generator_source.sleep_after_elements 1 \
- | grep "Job has been submitted with JobID" | sed 's/.* //g')
-
-wait_job_running $DATASTREAM_JOB
-
-wait_oper_metric_num_in_records SemanticsCheckMapper.0 200
-
-# take a savepoint of the state machine job
-SAVEPOINT_PATH=$(stop_with_savepoint $DATASTREAM_JOB $TEST_DATA_DIR \
- | grep "Savepoint completed. Path:" | sed 's/.* //g')
-
-wait_job_terminal_state "${DATASTREAM_JOB}" "FINISHED"
-
-# isolate the path without the scheme ("file:") and do the necessary checks
-SAVEPOINT_DIR=${SAVEPOINT_PATH#"file:"}
-
-if [ -z "$SAVEPOINT_DIR" ]; then
- echo "Savepoint location was empty. This may mean that the
stop-with-savepoint failed."
- exit 1
-elif [ ! -d "$SAVEPOINT_DIR" ]; then
- echo "Savepoint $SAVEPOINT_PATH does not exist."
- exit 1
-fi
-
-# Since it is not possible to differentiate reporter output between the first
and second execution,
-# we remember the number of metrics sampled in the first execution so that
they can be ignored in the following monitorings
-OLD_NUM_METRICS=$(get_num_metric_samples)
-
-# resume state machine job with savepoint
-DATASTREAM_JOB=$($FLINK_DIR/bin/flink run -s $SAVEPOINT_PATH -p $NEW_DOP -d
$TEST_PROGRAM_JAR \
- --test.semantics exactly-once \
- --environment.parallelism $NEW_DOP \
- --state_backend $STATE_BACKEND_TYPE \
- --state_backend.checkpoint_directory $CHECKPOINT_DIR \
- --state_backend.file.async $STATE_BACKEND_FILE_ASYNC \
- --sequence_generator_source.sleep_time 15 \
- --sequence_generator_source.sleep_after_elements 1 \
- | grep "Job has been submitted with JobID" | sed 's/.* //g')
-
-wait_job_running $DATASTREAM_JOB
-
-wait_oper_metric_num_in_records SemanticsCheckMapper.0 200
-# if state is erroneous and the state machine job produces alerting state
transitions,
-# output would be non-empty and the test will not pass
+function run_resume_savepoint_test() {
+ if (( $ORIGINAL_DOP >= $NEW_DOP )); then
+ NUM_SLOTS=$ORIGINAL_DOP
+ else
+ NUM_SLOTS=$NEW_DOP
+ fi
+
+ set_config_key "taskmanager.numberOfTaskSlots" "${NUM_SLOTS}"
+
+ if [ $STATE_BACKEND_ROCKS_TIMER_SERVICE_TYPE == 'heap' ]; then
+ set_config_key "state.backend.rocksdb.timer-service.factory" "heap"
+ fi
+ set_config_key "metrics.fetcher.update-interval" "2000"
+
+ setup_flink_slf4j_metric_reporter
+
+ start_cluster
+
+ CHECKPOINT_DIR="file://$TEST_DATA_DIR/savepoint-e2e-test-chckpt-dir"
+
+  # run the DataStream allround job
+
TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-datastream-allround-test/target/DataStreamAllroundTestProgram.jar
+ DATASTREAM_JOB=$($FLINK_DIR/bin/flink run -d -p $ORIGINAL_DOP
$TEST_PROGRAM_JAR \
+ --test.semantics exactly-once \
+ --environment.parallelism $ORIGINAL_DOP \
+ --state_backend $STATE_BACKEND_TYPE \
+ --state_backend.checkpoint_directory $CHECKPOINT_DIR \
+ --state_backend.file.async $STATE_BACKEND_FILE_ASYNC \
+ --sequence_generator_source.sleep_time 15 \
+ --sequence_generator_source.sleep_after_elements 1 \
+ | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+ wait_job_running $DATASTREAM_JOB
+
+ wait_oper_metric_num_in_records SemanticsCheckMapper.0 200
+
+ # take a savepoint of the state machine job
+ SAVEPOINT_PATH=$(stop_with_savepoint $DATASTREAM_JOB $TEST_DATA_DIR \
+ | grep "Savepoint completed. Path:" | sed 's/.* //g')
+
+ wait_job_terminal_state "${DATASTREAM_JOB}" "FINISHED"
+
+ # isolate the path without the scheme ("file:") and do the necessary checks
+ SAVEPOINT_DIR=${SAVEPOINT_PATH#"file:"}
+
+ if [ -z "$SAVEPOINT_DIR" ]; then
+ echo "Savepoint location was empty. This may mean that the
stop-with-savepoint failed."
+ exit 1
+ elif [ ! -d "$SAVEPOINT_DIR" ]; then
+ echo "Savepoint $SAVEPOINT_PATH does not exist."
+ exit 1
+ fi
+
+ # Since it is not possible to differentiate reporter output between the
first and second execution,
+ # we remember the number of metrics sampled in the first execution so that
they can be ignored in the following monitorings
+ OLD_NUM_METRICS=$(get_num_metric_samples)
+
+ # resume state machine job with savepoint
+ DATASTREAM_JOB=$($FLINK_DIR/bin/flink run -s $SAVEPOINT_PATH -p $NEW_DOP -d
$TEST_PROGRAM_JAR \
+ --test.semantics exactly-once \
+ --environment.parallelism $NEW_DOP \
+ --state_backend $STATE_BACKEND_TYPE \
+ --state_backend.checkpoint_directory $CHECKPOINT_DIR \
+ --state_backend.file.async $STATE_BACKEND_FILE_ASYNC \
+ --sequence_generator_source.sleep_time 15 \
+ --sequence_generator_source.sleep_after_elements 1 \
+ | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+ wait_job_running $DATASTREAM_JOB
+
+ wait_oper_metric_num_in_records SemanticsCheckMapper.0 200
+
+  # if state is erroneous and the state machine job produces alerting state
transitions,
+ # output would be non-empty and the test will not pass
+}
+
+function kill_test_watchdog() {
+ local watchdog_pid=`cat $TEST_DATA_DIR/job_watchdog.pid`
+ echo "Stopping job timeout watchdog (with pid=$watchdog_pid)"
+ kill $watchdog_pid
+}
+on_exit kill_test_watchdog
+
+(
+ cmdpid=$BASHPID;
+ (sleep $TEST_TIMEOUT_SECONDS; # set a timeout for this test
Review comment:
What does the `;` at the end of this line do? The semicolon permits
putting two or more commands on the same line. Since we have only one command
here it should not be needed. Same in line 139.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]