From 6b430e64359b7a646439ae3b9ddba0577ec5933c Mon Sep 17 00:00:00 2001
From: Arvid Heise
Date: Tue, 9 Feb 2021 14:38:35 +0100
Subject: [PATCH] [FLINK-21346][build system] Using FLINK_LOG_DIR in e2e.

This closes #15546
---
 .../flink/tests/util/AutoClosableProcess.java | 13 ++++++-
 .../tests/util/flink/FlinkDistribution.java   |  7 +++-
 flink-end-to-end-tests/run-nightly-tests.sh   | 38 ++++++++-----------
 flink-end-to-end-tests/run-single-test.sh     |  4 ++
 flink-end-to-end-tests/test-scripts/common.sh | 36 ++++++++++--------
 .../test-scripts/common_ha.sh                 |  2 +-
 .../test-scripts/common_mesos_docker.sh       |  2 +-
 .../docker-mesos-cluster/docker-compose.yml   |  3 ++
 .../test-scripts/queryable_state_base.sh      |  4 +-
 .../test-scripts/test-runner-common.sh        | 11 +-----
 .../test-scripts/test_cli.sh                  |  2 +-
 .../test-scripts/test_docker_embedded_job.sh  |  4 +-
 .../test_local_recovery_and_scheduling.sh     |  6 +--
 .../test-scripts/test_nat.sh                  |  6 +--
 .../test-scripts/test_pyflink.sh              |  4 +-
 .../test_queryable_state_restart_tm.sh        |  2 +-
 .../test_rocksdb_state_memory_control.sh      |  2 +-
 tools/azure-pipelines/debug_files_utils.sh    | 12 +-----
 tools/azure-pipelines/uploading_watchdog.sh   |  2 +
 19 files changed, 83 insertions(+), 77 deletions(-)

diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/AutoClosableProcess.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/AutoClosableProcess.java
index c0f8d4a1fe298..cc65241ff9379 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/AutoClosableProcess.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/AutoClosableProcess.java
@@ -37,6 +37,7 @@
 import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.util.Arrays;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.function.Consumer;
@@ -79,12 +80,18 @@ public static final class AutoClosableProcessBuilder {
         private final String[] commands;
         private Consumer<String> stdoutProcessor = LOG::debug;
         private Consumer<String> stderrProcessor = LOG::debug;
+        private Consumer<Map<String, String>> envProcessor = map -> {};
         private @Nullable String[] stdInputs;
 
         AutoClosableProcessBuilder(final String... commands) {
             this.commands = commands;
         }
 
+        public AutoClosableProcessBuilder setEnv(final Consumer<Map<String, String>> envProcessor) {
+            this.envProcessor = envProcessor;
+            return this;
+        }
+
         public AutoClosableProcessBuilder setStdoutProcessor(
                 final Consumer<String> stdoutProcessor) {
             this.stdoutProcessor = stdoutProcessor;
@@ -119,6 +126,7 @@ public void runBlocking(final Duration timeout) throws IOException {
                                 stderrProcessor.accept(line);
                                 printer.println(line);
                             },
+                            envProcessor,
                             stdInputs);
 
             try (AutoClosableProcess autoProcess = new AutoClosableProcess(process)) {
@@ -165,7 +173,8 @@ public void runBlockingWithRetry(
 
         public AutoClosableProcess runNonBlocking() throws IOException {
             return new AutoClosableProcess(
-                    createProcess(commands, stdoutProcessor, stderrProcessor, stdInputs));
+                    createProcess(
+                            commands, stdoutProcessor, stderrProcessor, envProcessor, stdInputs));
         }
     }
 
@@ -173,11 +182,13 @@ private static Process createProcess(
             final String[] commands,
             Consumer<String> stdoutProcessor,
             Consumer<String> stderrProcessor,
+            Consumer<Map<String, String>> envProcessor,
             @Nullable String[] stdInputs)
             throws IOException {
         final ProcessBuilder processBuilder = new ProcessBuilder();
         LOG.debug("Creating process: {}", Arrays.toString(commands));
         processBuilder.command(commands);
+        envProcessor.accept(processBuilder.environment());
 
         final Process process = processBuilder.start();
 
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java
index 0b1d398996468..4f95903c3a9b7 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java
@@ -112,8 +112,11 @@ public void setRootLogLevel(Level logLevel) throws IOException {
 
     public void startFlinkCluster() throws IOException {
         LOG.info("Starting Flink cluster.");
-        AutoClosableProcess.runBlocking(
-                bin.resolve("start-cluster.sh").toAbsolutePath().toString());
+        AutoClosableProcess.create(bin.resolve("start-cluster.sh").toAbsolutePath().toString())
+                // ignore FLINK_LOG_DIR here: the cluster is assumed to log into the
+                // distribution directory, and the logs are copied over in case of failure
+                .setEnv(env -> env.remove("FLINK_LOG_DIR"))
+                .runBlocking();
 
         final OkHttpClient client = new OkHttpClient();
 
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh
index 04c5085fb1ba4..a359cbbccaa18 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -32,30 +32,24 @@ if [ -z "$FLINK_DIR" ] ; then
   exit 1
 fi
 
+if [ -z "$FLINK_LOG_DIR" ] ; then
+  export FLINK_LOG_DIR="$FLINK_DIR/logs"
+fi
+
+# DEBUG_FILES_OUTPUT_DIR is pre-set on Azure CI; fall back to the Flink log dir elsewhere
+if [ -z "$DEBUG_FILES_OUTPUT_DIR" ] ; then
+  export DEBUG_FILES_OUTPUT_DIR="$FLINK_LOG_DIR"
+fi
+
 source "${END_TO_END_DIR}/../tools/ci/maven-utils.sh"
 source "${END_TO_END_DIR}/test-scripts/test-runner-common.sh"
 
-# On Azure CI, set artifacts dir
-if [ ! -z "$TF_BUILD" ] ; then
-  export ARTIFACTS_DIR="${END_TO_END_DIR}/artifacts"
-  mkdir -p $ARTIFACTS_DIR || { echo "FAILURE: cannot create log directory '${ARTIFACTS_DIR}'." ; exit 1; }
-
-  function run_on_exit {
-    collect_coredumps $(pwd) $ARTIFACTS_DIR
-    collect_dmesg $ARTIFACTS_DIR
-    compress_logs
-  }
-
-  # compress and register logs for publication on exit
-  function compress_logs {
-    echo "COMPRESSING build artifacts."
-    COMPRESSED_ARCHIVE=${BUILD_BUILDNUMBER}.tgz
-    mkdir compressed-archive-dir
-    tar -zcvf compressed-archive-dir/${COMPRESSED_ARCHIVE} -C $ARTIFACTS_DIR .
-    echo "##vso[task.setvariable variable=ARTIFACT_DIR]$(pwd)/compressed-archive-dir"
-  }
-  on_exit run_on_exit
-fi
+function run_on_exit {
+  collect_coredumps $(pwd) $DEBUG_FILES_OUTPUT_DIR
+  collect_dmesg $DEBUG_FILES_OUTPUT_DIR
+}
+
+on_exit run_on_exit
 
 if [[ ${PROFILE} == *"enable-adaptive-scheduler"* ]]; then
   echo "Enabling adaptive scheduler properties"
@@ -267,7 +261,7 @@ printf "==============================================================================\n"
 
 LOG4J_PROPERTIES=${END_TO_END_DIR}/../tools/ci/log4j.properties
 
-MVN_LOGGING_OPTIONS="-Dlog.dir=${ARTIFACTS_DIR} -DlogBackupDir=${ARTIFACTS_DIR} -Dlog4j.configurationFile=file://$LOG4J_PROPERTIES"
+MVN_LOGGING_OPTIONS="-Dlog.dir=${DEBUG_FILES_OUTPUT_DIR} -DlogBackupDir=${DEBUG_FILES_OUTPUT_DIR} -Dlog4j.configurationFile=file://$LOG4J_PROPERTIES"
 MVN_COMMON_OPTIONS="-Dflink.forkCount=2 -Dflink.forkCountTestPackage=2 -Dfast -Pskip-webui-build"
 e2e_modules=$(find flink-end-to-end-tests -mindepth 2 -maxdepth 5 -name 'pom.xml' -printf '%h\n' | sort -u | tr '\n' ',')
 e2e_modules="${e2e_modules},$(find flink-walkthroughs -mindepth 2 -maxdepth 2 -name 'pom.xml' -printf '%h\n' | sort -u | tr '\n' ',')"
diff --git a/flink-end-to-end-tests/run-single-test.sh b/flink-end-to-end-tests/run-single-test.sh
index f84fd46cf185f..961366999ca5f 100755
--- a/flink-end-to-end-tests/run-single-test.sh
+++ b/flink-end-to-end-tests/run-single-test.sh
@@ -46,6 +46,10 @@ if [ -z "$FLINK_DIR" ] ; then
   exit 1
 fi
 
+if [ -z "$FLINK_LOG_DIR" ] ; then
+  export FLINK_LOG_DIR="$FLINK_DIR/logs"
+fi
+
 source "${END_TO_END_DIR}/../tools/ci/maven-utils.sh"
 source "${END_TO_END_DIR}/test-scripts/test-runner-common.sh"
 
diff --git a/flink-end-to-end-tests/test-scripts/common.sh b/flink-end-to-end-tests/test-scripts/common.sh
index 788f3c780ce31..2ec1cbdd86cf2 100644
--- a/flink-end-to-end-tests/test-scripts/common.sh
+++ b/flink-end-to-end-tests/test-scripts/common.sh
@@ -26,6 +26,10 @@ if [[ -z $FLINK_DIR ]]; then
   exit 1
 fi
 
+if [ -z "$FLINK_LOG_DIR" ] ; then
+  export FLINK_LOG_DIR="$FLINK_DIR/logs"
+fi
+
 case "$(uname -s)" in
     Linux*)     OS_TYPE=linux;;
     Darwin*)    OS_TYPE=mac;;
@@ -341,7 +345,7 @@ function wait_for_number_of_running_tms {
 
 function check_logs_for_errors {
   echo "Checking for errors..."
-  error_count=$(grep -rv "GroupCoordinatorNotAvailableException" $FLINK_DIR/log \
+  error_count=$(grep -rv "GroupCoordinatorNotAvailableException" $FLINK_LOG_DIR \
      | grep -v "RetriableCommitFailedException" \
      | grep -v "NoAvailableBrokersException" \
      | grep -v "Async Kafka commit failed" \
@@ -368,7 +372,7 @@ function check_logs_for_errors {
      | grep -ic "error" || true)
   if [[ ${error_count} -gt 0 ]]; then
     echo "Found error in log files; printing first 500 lines; see full logs for details:"
-    find $FLINK_DIR/log/ -type f -exec head -n 500 {} \;
+    find $FLINK_LOG_DIR/ -type f -exec head -n 500 {} \;
     EXIT_CODE=1
   else
     echo "No errors in log files."
@@ -377,7 +381,7 @@ function check_logs_for_exceptions {
   echo "Checking for exceptions..."
-  exception_count=$(grep -rv "GroupCoordinatorNotAvailableException" $FLINK_DIR/log \
+  exception_count=$(grep -rv "GroupCoordinatorNotAvailableException" $FLINK_LOG_DIR \
     | grep -v "RetriableCommitFailedException" \
     | grep -v "NoAvailableBrokersException" \
    | grep -v "Async Kafka commit failed" \
@@ -406,7 +410,7 @@ function check_logs_for_exceptions {
     | grep -ic "exception" || true)
   if [[ ${exception_count} -gt 0 ]]; then
     echo "Found exception in log files; printing first 500 lines; see full logs for details:"
-    find $FLINK_DIR/log/ -type f -exec head -n 500 {} \;
+    find $FLINK_LOG_DIR/ -type f -exec head -n 500 {} \;
     EXIT_CODE=1
   else
     echo "No exceptions in log files."
@@ -424,11 +428,11 @@ function check_logs_for_non_empty_out_files {
     -e "WARNING: Use --illegal-access"\
     -e "WARNING: All illegal access"\
     -e "Picked up JAVA_TOOL_OPTIONS"\
-    $FLINK_DIR/log/*.out\
+    $FLINK_LOG_DIR/*.out\
    | grep "." \
    > /dev/null; then
     echo "Found non-empty .out files; printing first 500 lines; see full logs for details:"
-    find $FLINK_DIR/log/ -type f -name '*.out' -exec head -n 500 {} \;
+    find $FLINK_LOG_DIR/ -type f -name '*.out' -exec head -n 500 {} \;
     EXIT_CODE=1
   else
     echo "No non-empty .out files."
@@ -462,7 +466,7 @@ function wait_for_job_state_transition {
   echo "Waiting for job ($job) to switch from state ${initial_state} to state ${next_state} ..."
 
   while : ; do
-    N=$(grep -o "($job) switched from state ${initial_state} to ${next_state}" $FLINK_DIR/log/*standalonesession*.log | tail -1)
+    N=$(grep -o "($job) switched from state ${initial_state} to ${next_state}" $FLINK_LOG_DIR/*standalonesession*.log | tail -1)
 
     if [[ -z $N ]]; then
       sleep 1
@@ -497,7 +501,7 @@ function wait_job_terminal_state {
   echo "Waiting for job ($job) to reach terminal state $expected_terminal_state ..."
 
   while : ; do
-    local N=$(grep -o "Job $job reached globally terminal state .*" $FLINK_DIR/log/*$log_file_name*.log | tail -1 || true)
+    local N=$(grep -o "Job $job reached globally terminal state .*" $FLINK_LOG_DIR/*$log_file_name*.log | tail -1 || true)
     if [[ -z $N ]]; then
       sleep 1
     else
@@ -619,7 +623,7 @@ function get_job_metric {
 function get_metric_processed_records {
   OPERATOR=$1
   JOB_NAME="${2:-General purpose test job}"
-  N=$(grep ".${JOB_NAME}.$OPERATOR.numRecordsIn:" $FLINK_DIR/log/*taskexecutor*.log | sed 's/.* //g' | tail -1)
+  N=$(grep ".${JOB_NAME}.$OPERATOR.numRecordsIn:" $FLINK_LOG_DIR/*taskexecutor*.log | sed 's/.* //g' | tail -1)
   if [ -z $N ]; then
     N=0
   fi
@@ -629,7 +633,7 @@ function get_num_metric_samples {
   OPERATOR=$1
   JOB_NAME="${2:-General purpose test job}"
-  N=$(grep ".${JOB_NAME}.$OPERATOR.numRecordsIn:" $FLINK_DIR/log/*taskexecutor*.log | wc -l)
+  N=$(grep ".${JOB_NAME}.$OPERATOR.numRecordsIn:" $FLINK_LOG_DIR/*taskexecutor*.log | wc -l)
   if [ -z $N ]; then
     N=0
   fi
@@ -679,7 +683,7 @@ function wait_num_of_occurence_in_logs {
   echo "Waiting for text ${text} to appear ${number} of times in logs..."
 
   while : ; do
-    N=$(grep -o "${text}" $FLINK_DIR/log/*${logs}*.log | wc -l)
+    N=$(grep -o "${text}" $FLINK_LOG_DIR/*${logs}*.log | wc -l)
 
     if [ -z $N ]; then
       N=0
@@ -708,7 +712,7 @@ function wait_num_checkpoints {
   echo "Waiting for job ($JOB) to have at least $NUM_CHECKPOINTS completed checkpoints ..."
 
   while : ; do
-    N=$(grep -o "Completed checkpoint [1-9]* for job $JOB" $FLINK_DIR/log/*standalonesession*.log | awk '{print $3}' | tail -1)
+    N=$(grep -o "Completed checkpoint [1-9]* for job $JOB" $FLINK_LOG_DIR/*standalonesession*.log | awk '{print $3}' | tail -1)
 
     if [ -z $N ]; then
       N=0
@@ -739,8 +743,8 @@ function end_timer {
 }
 
 function clean_stdout_files {
-  rm ${FLINK_DIR}/log/*.out
-  echo "Deleted all stdout files under ${FLINK_DIR}/log/"
+  rm $FLINK_LOG_DIR/*.out
+  echo "Deleted all stdout files under $FLINK_LOG_DIR/"
 }
 
 # Expect a string to appear in the log files of the task manager before a given timeout
@@ -750,7 +754,7 @@ function expect_in_taskmanager_logs {
   local expected="$1"
   local timeout=$2
   local i=0
-  local logfile="${FLINK_DIR}/log/flink*taskexecutor*log"
+  local logfile="$FLINK_LOG_DIR/flink*taskexecutor*log"
 
   while ! grep "${expected}" ${logfile} > /dev/null; do
@@ -858,7 +862,7 @@ internal_run_with_timeout() {
 
 run_on_test_failure() {
   echo "Printing Flink logs and killing it:"
-  cat ${FLINK_DIR}/log/*
+  cat $FLINK_LOG_DIR/*
 }
 
 run_test_with_timeout() {
diff --git a/flink-end-to-end-tests/test-scripts/common_ha.sh b/flink-end-to-end-tests/test-scripts/common_ha.sh
index b43f94ec8524c..13c179a22ba78 100644
--- a/flink-end-to-end-tests/test-scripts/common_ha.sh
+++ b/flink-end-to-end-tests/test-scripts/common_ha.sh
@@ -49,7 +49,7 @@ function verify_num_occurences_in_logs() {
     local text="$2"
     local expected_no="$3"
 
-    local actual_no=$(grep -r --include "*${log_pattern}*.log" -e "${text}" "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l)
+    local actual_no=$(grep -r --include "*${log_pattern}*.log" -e "${text}" "$FLINK_LOG_DIR/" | cut -d ":" -f 1 | uniq | wc -l)
 
     [[ "${expected_no}" -eq "${actual_no}" ]]
 }
diff --git a/flink-end-to-end-tests/test-scripts/common_mesos_docker.sh b/flink-end-to-end-tests/test-scripts/common_mesos_docker.sh
index 83aca21aeb5e0..e2fdbaf26249f 100644
--- a/flink-end-to-end-tests/test-scripts/common_mesos_docker.sh
+++ b/flink-end-to-end-tests/test-scripts/common_mesos_docker.sh
@@ -33,7 +33,7 @@ start_time=$(date +%s)
 
 # make sure we stop our cluster at the end
 function cluster_shutdown {
-  docker exec mesos-master bash -c "chmod -R ogu+rw ${FLINK_DIR}/log/ ${TEST_DATA_DIR}"
+  docker exec mesos-master bash -c "chmod -R ogu+rw $FLINK_LOG_DIR/ ${TEST_DATA_DIR}"
   docker-compose -f $END_TO_END_DIR/test-scripts/docker-mesos-cluster/docker-compose.yml down
 }
 on_exit cluster_shutdown
diff --git a/flink-end-to-end-tests/test-scripts/docker-mesos-cluster/docker-compose.yml b/flink-end-to-end-tests/test-scripts/docker-mesos-cluster/docker-compose.yml
index 366d858f3fd2b..f9b01c7e6dc4f 100644
--- a/flink-end-to-end-tests/test-scripts/docker-mesos-cluster/docker-compose.yml
+++ b/flink-end-to-end-tests/test-scripts/docker-mesos-cluster/docker-compose.yml
@@ -36,6 +36,7 @@ services:
     volumes:
       - ${END_TO_END_DIR}:${END_TO_END_DIR}
       - ${FLINK_DIR}:${FLINK_DIR}
+      - ${FLINK_LOG_DIR}:${FLINK_LOG_DIR}
       - ${MVN_REPO}:${MVN_REPO}
     environment:
       MESOS_PORT: 5050
@@ -43,6 +44,7 @@ services:
       MESOS_REGISTRY: in_memory
       MESOS_LOG_DIR: /var/log/mesos
       MESOS_WORK_DIR: /var/tmp/mesos
+      FLINK_LOG_DIR: ${FLINK_LOG_DIR}
 
   slave:
     image: flink/docker-mesos-cluster:latest
@@ -64,3 +66,4 @@ services:
       MESOS_LOG_DIR: /var/log/mesos
       MESOS_WORK_DIR: /var/tmp/mesos
       MESOS_SYSTEMD_ENABLE_SUPPORT: 'false'
+      FLINK_LOG_DIR: ${FLINK_LOG_DIR}
diff --git a/flink-end-to-end-tests/test-scripts/queryable_state_base.sh b/flink-end-to-end-tests/test-scripts/queryable_state_base.sh
index 277d1264182cb..87957ec3edf1a 100644
--- a/flink-end-to-end-tests/test-scripts/queryable_state_base.sh
+++ b/flink-end-to-end-tests/test-scripts/queryable_state_base.sh
@@ -25,7 +25,7 @@ function link_queryable_state_lib {
 # Returns the ip address of the queryable state server
 function get_queryable_state_server_ip {
-    local ip=$(cat ${FLINK_DIR}/log/flink*taskexecutor*log \
+    local ip=$(cat $FLINK_LOG_DIR/flink*taskexecutor*log \
     | grep "Started Queryable State Server" \
     | head -1 \
     | grep -Eo "\.*[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.*")
@@ -35,7 +35,7 @@ function get_queryable_state_server_ip {
 # Returns the port of the queryable state proxy server
 function get_queryable_state_proxy_port {
-    local port=$(cat ${FLINK_DIR}/log/flink*taskexecutor*log \
+    local port=$(cat $FLINK_LOG_DIR/flink*taskexecutor*log \
     | grep "Started Queryable State Proxy Server" \
     | head -1 \
     | grep -Eo "\.*\:([0-9]{3,5})\.*" | tr -d ":.")
diff --git a/flink-end-to-end-tests/test-scripts/test-runner-common.sh b/flink-end-to-end-tests/test-scripts/test-runner-common.sh
index a97351db8e767..487a8b4024348 100644
--- a/flink-end-to-end-tests/test-scripts/test-runner-common.sh
+++ b/flink-end-to-end-tests/test-scripts/test-runner-common.sh
@@ -99,13 +99,6 @@ function post_test_validation {
     log_environment_info
   else
     log_environment_info
-    # make logs available if ARTIFACTS_DIR is set
-    if [[ ${ARTIFACTS_DIR} != "" ]]; then
-      mkdir ${ARTIFACTS_DIR}/e2e-flink-logs
-      cp $FLINK_DIR/log/* ${ARTIFACTS_DIR}/e2e-flink-logs/
-      echo "Published e2e logs into debug logs artifact:"
-      ls ${ARTIFACTS_DIR}/e2e-flink-logs/
-    fi
     exit "${exit_code}"
   fi
 }
@@ -134,8 +127,8 @@ function cleanup_proc {
 
 # Cleans up all temporary folders and files
 function cleanup_tmp_files {
-  rm -f ${FLINK_DIR}/log/*
-  echo "Deleted all files under ${FLINK_DIR}/log/"
+  rm -f $FLINK_LOG_DIR/*
+  echo "Deleted all files under $FLINK_LOG_DIR/"
 
   rm -rf ${TEST_DATA_DIR} 2> /dev/null
   echo "Deleted ${TEST_DATA_DIR}"
diff --git a/flink-end-to-end-tests/test-scripts/test_cli.sh b/flink-end-to-end-tests/test-scripts/test_cli.sh
index bc4654b13124f..985ce677200a4 100755
--- a/flink-end-to-end-tests/test-scripts/test_cli.sh
+++ b/flink-end-to-end-tests/test-scripts/test_cli.sh
@@ -65,7 +65,7 @@ function extract_valid_job_list_by_type_from_job_list_return() {
 }
 
 function extract_task_manager_slot_request_count() {
-    COUNT=`grep "Receive slot request" $FLINK_DIR/log/*taskexecutor*.log | wc -l`
+    COUNT=`grep "Receive slot request" $FLINK_LOG_DIR/*taskexecutor*.log | wc -l`
     echo $COUNT
 }
 
diff --git a/flink-end-to-end-tests/test-scripts/test_docker_embedded_job.sh b/flink-end-to-end-tests/test-scripts/test_docker_embedded_job.sh
index 9c7dac51af691..4bee725efc652 100755
--- a/flink-end-to-end-tests/test-scripts/test_docker_embedded_job.sh
+++ b/flink-end-to-end-tests/test-scripts/test_docker_embedded_job.sh
@@ -62,8 +62,8 @@ fi
 export USER_LIB=${FLINK_DIR}/examples/batch
 
 docker-compose -f ${DOCKER_SCRIPTS}/docker-compose.test.yml up --force-recreate --abort-on-container-exit --exit-code-from job-cluster &> /dev/null
-docker-compose -f ${DOCKER_SCRIPTS}/docker-compose.test.yml logs job-cluster > ${FLINK_DIR}/log/jobmanager.log
-docker-compose -f ${DOCKER_SCRIPTS}/docker-compose.test.yml logs taskmanager > ${FLINK_DIR}/log/taskmanager.log
+docker-compose -f ${DOCKER_SCRIPTS}/docker-compose.test.yml logs job-cluster > $FLINK_LOG_DIR/jobmanager.log
+docker-compose -f ${DOCKER_SCRIPTS}/docker-compose.test.yml logs taskmanager > $FLINK_LOG_DIR/taskmanager.log
 docker-compose -f ${DOCKER_SCRIPTS}/docker-compose.test.yml rm -f
 
 check_result_hash "WordCount" $OUTPUT_VOLUME/docker_wc_out "${RESULT_HASH}"
diff --git a/flink-end-to-end-tests/test-scripts/test_local_recovery_and_scheduling.sh b/flink-end-to-end-tests/test-scripts/test_local_recovery_and_scheduling.sh
index 233e62ebc01bb..dbd55dc10c899 100755
--- a/flink-end-to-end-tests/test-scripts/test_local_recovery_and_scheduling.sh
+++ b/flink-end-to-end-tests/test-scripts/test_local_recovery_and_scheduling.sh
@@ -27,10 +27,10 @@ function check_logs {
     (( expected_count=parallelism * (attempts + 1) ))
 
     # Search for the log message that indicates restore problem from existing local state for the keyed backend.
-    local failed_local_recovery=$(grep '^.*Creating keyed state backend.* from alternative (2/2)\.$' $FLINK_DIR/log/* | wc -l | tr -d ' ')
+    local failed_local_recovery=$(grep '^.*Creating keyed state backend.* from alternative (2/2)\.$' $FLINK_LOG_DIR/* | wc -l | tr -d ' ')
 
     # Search for attempts to recover locally.
-    local attempt_local_recovery=$(grep '^.*Creating keyed state backend.* from alternative (1/2)\.$' $FLINK_DIR/log/* | wc -l | tr -d ' ')
+    local attempt_local_recovery=$(grep '^.*Creating keyed state backend.* from alternative (1/2)\.$' $FLINK_LOG_DIR/* | wc -l | tr -d ' ')
 
     if [ ${failed_local_recovery} -ne 0 ]
     then
@@ -80,7 +80,7 @@ function run_local_recovery_test {
     # Ensure that each TM only has one operator(chain)
     set_config_key "taskmanager.numberOfTaskSlots" "1"
 
-    rm $FLINK_DIR/log/* 2> /dev/null
+    rm $FLINK_LOG_DIR/* 2> /dev/null
 
     # Start HA server
     start_local_zk
diff --git a/flink-end-to-end-tests/test-scripts/test_nat.sh b/flink-end-to-end-tests/test-scripts/test_nat.sh
index 2bab53a8c8aeb..b5d1f550f4cc1 100755
--- a/flink-end-to-end-tests/test-scripts/test_nat.sh
+++ b/flink-end-to-end-tests/test-scripts/test_nat.sh
@@ -65,9 +65,9 @@ popd
 export USER_LIB=${FLINK_DIR}/examples/batch
 
 docker-compose -f ${DOCKER_SCRIPTS}/docker-compose.nat.yml up --force-recreate --abort-on-container-exit --exit-code-from job-cluster &> /dev/null
-docker-compose -f ${DOCKER_SCRIPTS}/docker-compose.nat.yml logs job-cluster > ${FLINK_DIR}/log/jobmanager.log
-docker-compose -f ${DOCKER_SCRIPTS}/docker-compose.nat.yml logs taskmanager1 > ${FLINK_DIR}/log/taskmanager1.log
-docker-compose -f ${DOCKER_SCRIPTS}/docker-compose.nat.yml logs taskmanager2 > ${FLINK_DIR}/log/taskmanager2.log
+docker-compose -f ${DOCKER_SCRIPTS}/docker-compose.nat.yml logs job-cluster > $FLINK_LOG_DIR/jobmanager.log
+docker-compose -f ${DOCKER_SCRIPTS}/docker-compose.nat.yml logs taskmanager1 > $FLINK_LOG_DIR/taskmanager1.log
+docker-compose -f ${DOCKER_SCRIPTS}/docker-compose.nat.yml logs taskmanager2 > $FLINK_LOG_DIR/taskmanager2.log
 docker-compose -f ${DOCKER_SCRIPTS}/docker-compose.nat.yml rm -f
 
 check_result_hash "WordCount" ${OUTPUT_VOLUME}/${OUTPUT_PREFIX}/ "${RESULT_HASH}"
diff --git a/flink-end-to-end-tests/test-scripts/test_pyflink.sh b/flink-end-to-end-tests/test-scripts/test_pyflink.sh
index 884d02c75aca2..24ffc59ee0179 100755
--- a/flink-end-to-end-tests/test-scripts/test_pyflink.sh
+++ b/flink-end-to-end-tests/test-scripts/test_pyflink.sh
@@ -255,11 +255,11 @@ function read_msg_from_kafka {
 
 function cat_jm_logs {
     local log_file_name=${3:-standalonesession}
-    cat $FLINK_DIR/log/*$log_file_name*.log
+    cat $FLINK_LOG_DIR/*$log_file_name*.log
 }
 
 function cat_tm_logs {
-    local logfile="${FLINK_DIR}/log/flink*taskexecutor*log"
+    local logfile="$FLINK_LOG_DIR/flink*taskexecutor*log"
     cat ${logfile}
 }
 
diff --git a/flink-end-to-end-tests/test-scripts/test_queryable_state_restart_tm.sh b/flink-end-to-end-tests/test-scripts/test_queryable_state_restart_tm.sh
index 166d9e6aa3496..95dfb6399b4b9 100755
--- a/flink-end-to-end-tests/test-scripts/test_queryable_state_restart_tm.sh
+++ b/flink-end-to-end-tests/test-scripts/test_queryable_state_restart_tm.sh
@@ -88,7 +88,7 @@ function run_test() {
     kill_random_taskmanager
 
     wait_for_number_of_running_tms 0
-    latest_snapshot_count=$(cat $FLINK_DIR/log/*out* | grep "on snapshot" | tail -n 1 | awk '{print $4}')
+    latest_snapshot_count=$(cat $FLINK_LOG_DIR/*out* | grep "on snapshot" | tail -n 1 | awk '{print $4}')
     echo "Latest snapshot count was ${latest_snapshot_count}"
 
     start_and_wait_for_tm
diff --git a/flink-end-to-end-tests/test-scripts/test_rocksdb_state_memory_control.sh b/flink-end-to-end-tests/test-scripts/test_rocksdb_state_memory_control.sh
index 5d15d0e0228e0..66bfa43e36453 100755
--- a/flink-end-to-end-tests/test-scripts/test_rocksdb_state_memory_control.sh
+++ b/flink-end-to-end-tests/test-scripts/test_rocksdb_state_memory_control.sh
@@ -80,7 +80,7 @@ function buildBaseJobCmd {
 function find_max_block_cache_usage() {
   OPERATOR=$1
   JOB_NAME="${2:-General purpose test job}"
-  N=$(grep ".${JOB_NAME}.$OPERATOR.rocksdb.block-cache-usage:" $FLINK_DIR/log/*taskexecutor*.log | sed 's/.* //g' | sort -rn | head -n 1)
+  N=$(grep ".${JOB_NAME}.$OPERATOR.rocksdb.block-cache-usage:" $FLINK_LOG_DIR/*taskexecutor*.log | sed 's/.* //g' | sort -rn | head -n 1)
   if [ -z $N ]; then
     N=0
   fi
diff --git a/tools/azure-pipelines/debug_files_utils.sh b/tools/azure-pipelines/debug_files_utils.sh
index 1b0d068f30503..50c1b4c5a4d6a 100755
--- a/tools/azure-pipelines/debug_files_utils.sh
+++ b/tools/azure-pipelines/debug_files_utils.sh
@@ -19,17 +19,9 @@
 function prepare_debug_files {
 	MODULE=$@
 
-	export DEBUG_FILES_OUTPUT_DIR="$AGENT_TEMPDIRECTORY/debug_files/"
+	export DEBUG_FILES_OUTPUT_DIR="$AGENT_TEMPDIRECTORY/debug_files"
 	export DEBUG_FILES_NAME="$(echo $MODULE | tr -c '[:alnum:]\n\r' '_')-$(date +%s)"
 
 	echo "##vso[task.setvariable variable=DEBUG_FILES_OUTPUT_DIR]$DEBUG_FILES_OUTPUT_DIR"
 	echo "##vso[task.setvariable variable=DEBUG_FILES_NAME]$DEBUG_FILES_NAME"
-	mkdir -p $DEBUG_FILES_OUTPUT_DIR || { echo "FAILURE: cannot create log directory '${DEBUG_FILES_OUTPUT_DIR}'." ; exit 1; }
-}
-
-function compress_debug_files {
-	echo "Compressing debug files"
-	tar -zcvf /tmp/$DEBUG_FILES_NAME.tgz -C $DEBUG_FILES_OUTPUT_DIR .
-	# clean directory
-	rm -rf $DEBUG_FILES_OUTPUT_DIR ; mkdir -p $DEBUG_FILES_OUTPUT_DIR
-	mv /tmp/$DEBUG_FILES_NAME.tgz $DEBUG_FILES_OUTPUT_DIR
+	mkdir -p $DEBUG_FILES_OUTPUT_DIR || { echo "FAILURE: cannot create debug files directory '${DEBUG_FILES_OUTPUT_DIR}'." ; exit 1; }
 }
diff --git a/tools/azure-pipelines/uploading_watchdog.sh b/tools/azure-pipelines/uploading_watchdog.sh
index 543bc2ec32fe9..c49f093200a64 100755
--- a/tools/azure-pipelines/uploading_watchdog.sh
+++ b/tools/azure-pipelines/uploading_watchdog.sh
@@ -31,6 +31,8 @@ source "${HERE}/../ci/controller_utils.sh"
 
 source ./tools/azure-pipelines/debug_files_utils.sh
 prepare_debug_files "$AGENT_JOBNAME"
+export FLINK_LOG_DIR="$DEBUG_FILES_OUTPUT_DIR/flink-logs"
+mkdir $FLINK_LOG_DIR || { echo "FAILURE: cannot create log directory '${FLINK_LOG_DIR}'." ; exit 1; }
 
 sudo apt-get install -y moreutils
 REAL_START_SECONDS=$(date +"%s")
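
Note for reviewers: the new setEnv hook on AutoClosableProcessBuilder is more general
than the FLINK_LOG_DIR case above -- the consumer receives the ProcessBuilder's mutable
environment map before the child process is spawned. A minimal usage sketch (the script
path is a hypothetical placeholder; create, setEnv and runBlocking are the methods
touched by this patch):

    import org.apache.flink.tests.util.AutoClosableProcess;

    import java.io.IOException;

    class SetEnvExample {
        static void startCluster(String startClusterScript) throws IOException {
            // The lambda may add or remove environment variables of the child process;
            // removing FLINK_LOG_DIR mirrors FlinkDistribution#startFlinkCluster above.
            AutoClosableProcess.create(startClusterScript)
                    .setEnv(env -> env.remove("FLINK_LOG_DIR"))
                    .runBlocking();
        }
    }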
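The shell-side contract after this patch: uploading_watchdog.sh exports FLINK_LOG_DIR
under DEBUG_FILES_OUTPUT_DIR on Azure CI before any test runs; otherwise common.sh and
the run-*-test entry points default it to the distribution directory, and every helper
reads $FLINK_LOG_DIR instead of the hard-coded $FLINK_DIR/log. A sketch of how a new
test helper would follow that contract (the helper itself is illustrative, not part of
the patch):

    # Count occurrences of a pattern across all Flink logs. Assumes common.sh has been
    # sourced, so FLINK_LOG_DIR is exported (pre-set by uploading_watchdog.sh on CI,
    # defaulted to the distribution's log dir otherwise).
    function count_in_flink_logs {
        local pattern="$1"
        grep -o "${pattern}" "$FLINK_LOG_DIR"/*.log 2> /dev/null | wc -l
    }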