[FLINK-21346][build system] Using FLINK_LOG_DIR in e2e.
This closes apache#15546
Arvid Heise authored and dawidwys committed Apr 16, 2021
1 parent f472d2d commit 6b430e6
Showing 19 changed files with 83 additions and 77 deletions.
@@ -37,6 +37,7 @@
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
@@ -79,12 +80,18 @@ public static final class AutoClosableProcessBuilder {
private final String[] commands;
private Consumer<String> stdoutProcessor = LOG::debug;
private Consumer<String> stderrProcessor = LOG::debug;
private Consumer<Map<String, String>> envProcessor = map -> {};
private @Nullable String[] stdInputs;

AutoClosableProcessBuilder(final String... commands) {
this.commands = commands;
}

public AutoClosableProcessBuilder setEnv(final Consumer<Map<String, String>> envProcessor) {
this.envProcessor = envProcessor;
return this;
}

public AutoClosableProcessBuilder setStdoutProcessor(
final Consumer<String> stdoutProcessor) {
this.stdoutProcessor = stdoutProcessor;
@@ -119,6 +126,7 @@ public void runBlocking(final Duration timeout) throws IOException {
stderrProcessor.accept(line);
printer.println(line);
},
envProcessor,
stdInputs);

try (AutoClosableProcess autoProcess = new AutoClosableProcess(process)) {
@@ -165,19 +173,22 @@ public void runBlockingWithRetry(

public AutoClosableProcess runNonBlocking() throws IOException {
return new AutoClosableProcess(
createProcess(commands, stdoutProcessor, stderrProcessor, stdInputs));
createProcess(
commands, stdoutProcessor, stderrProcessor, envProcessor, stdInputs));
}
}

private static Process createProcess(
final String[] commands,
Consumer<String> stdoutProcessor,
Consumer<String> stderrProcessor,
Consumer<Map<String, String>> envProcessor,
@Nullable String[] stdInputs)
throws IOException {
final ProcessBuilder processBuilder = new ProcessBuilder();
LOG.debug("Creating process: {}", Arrays.toString(commands));
processBuilder.command(commands);
envProcessor.accept(processBuilder.environment());

final Process process = processBuilder.start();

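
The new setEnv(Consumer&lt;Map&lt;String, String&gt;&gt;) hook hands the ProcessBuilder's mutable environment map to the caller just before the process is started, so a test can add or remove variables per invocation. A minimal usage sketch (the import path, the example class, and the extra variable are illustrative assumptions, not part of this commit):

    // Assumed package path for the e2e test utility class.
    import org.apache.flink.tests.util.AutoClosableProcess;

    public class SetEnvExample {
        public static void main(String[] args) throws Exception {
            // Launch a shell that prints FLINK_LOG_DIR, after removing it from the
            // child environment and adding a hypothetical variable of our own.
            AutoClosableProcess.create("bash", "-c", "echo ${FLINK_LOG_DIR:-unset}")
                    .setEnv(env -> {
                        env.remove("FLINK_LOG_DIR");   // same removal as startFlinkCluster() below
                        env.put("MY_TEST_FLAG", "on"); // hypothetical variable
                    })
                    .runBlocking();
        }
    }
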
@@ -112,8 +112,11 @@ public void setRootLogLevel(Level logLevel) throws IOException {

public void startFlinkCluster() throws IOException {
LOG.info("Starting Flink cluster.");
AutoClosableProcess.runBlocking(
bin.resolve("start-cluster.sh").toAbsolutePath().toString());
AutoClosableProcess.create(bin.resolve("start-cluster.sh").toAbsolutePath().toString())
// Ignore the variable; we assume the cluster logs into the distribution
// directory and that the logs are copied over in case of failure.
.setEnv(env -> env.remove("FLINK_LOG_DIR"))
.runBlocking();

final OkHttpClient client = new OkHttpClient();

38 changes: 16 additions & 22 deletions flink-end-to-end-tests/run-nightly-tests.sh
@@ -32,30 +32,24 @@ if [ -z "$FLINK_DIR" ] ; then
exit 1
fi

if [ -z "$FLINK_LOG_DIR" ] ; then
export FLINK_LOG_DIR="$FLINK_DIR/logs"
fi

# On Azure CI, DEBUG_FILES_OUTPUT_DIR is pre-set to the artifacts dir; default to the log dir elsewhere
if [ -z "$DEBUG_FILES_OUTPUT_DIR" ] ; then
export DEBUG_FILES_OUTPUT_DIR="$FLINK_LOG_DIR"
fi

source "${END_TO_END_DIR}/../tools/ci/maven-utils.sh"
source "${END_TO_END_DIR}/test-scripts/test-runner-common.sh"

# On Azure CI, set artifacts dir
if [ ! -z "$TF_BUILD" ] ; then
export ARTIFACTS_DIR="${END_TO_END_DIR}/artifacts"
mkdir -p $ARTIFACTS_DIR || { echo "FAILURE: cannot create log directory '${ARTIFACTS_DIR}'." ; exit 1; }

function run_on_exit {
collect_coredumps $(pwd) $ARTIFACTS_DIR
collect_dmesg $ARTIFACTS_DIR
compress_logs
}

# compress and register logs for publication on exit
function compress_logs {
echo "COMPRESSING build artifacts."
COMPRESSED_ARCHIVE=${BUILD_BUILDNUMBER}.tgz
mkdir compressed-archive-dir
tar -zcvf compressed-archive-dir/${COMPRESSED_ARCHIVE} -C $ARTIFACTS_DIR .
echo "##vso[task.setvariable variable=ARTIFACT_DIR]$(pwd)/compressed-archive-dir"
}
on_exit run_on_exit
fi
function run_on_exit {
collect_coredumps $(pwd) $DEBUG_FILES_OUTPUT_DIR
collect_dmesg $DEBUG_FILES_OUTPUT_DIR
}

on_exit run_on_exit

if [[ ${PROFILE} == *"enable-adaptive-scheduler"* ]]; then
echo "Enabling adaptive scheduler properties"
@@ -267,7 +261,7 @@ printf "========================================================================

LOG4J_PROPERTIES=${END_TO_END_DIR}/../tools/ci/log4j.properties

MVN_LOGGING_OPTIONS="-Dlog.dir=${ARTIFACTS_DIR} -DlogBackupDir=${ARTIFACTS_DIR} -Dlog4j.configurationFile=file:https://$LOG4J_PROPERTIES"
MVN_LOGGING_OPTIONS="-Dlog.dir=${DEBUG_FILES_OUTPUT_DIR} -DlogBackupDir=${DEBUG_FILES_OUTPUT_DIR} -Dlog4j.configurationFile=file:https://$LOG4J_PROPERTIES"
MVN_COMMON_OPTIONS="-Dflink.forkCount=2 -Dflink.forkCountTestPackage=2 -Dfast -Pskip-webui-build"
e2e_modules=$(find flink-end-to-end-tests -mindepth 2 -maxdepth 5 -name 'pom.xml' -printf '%h\n' | sort -u | tr '\n' ',')
e2e_modules="${e2e_modules},$(find flink-walkthroughs -mindepth 2 -maxdepth 2 -name 'pom.xml' -printf '%h\n' | sort -u | tr '\n' ',')"
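
Taken together, the two new defaults above form a fallback chain: FLINK_LOG_DIR falls back to the distribution's logs directory, and DEBUG_FILES_OUTPUT_DIR falls back to FLINK_LOG_DIR. A runnable sketch of how the values resolve (the paths are made up for illustration):

    #!/usr/bin/env bash
    # Fallback chain from run-nightly-tests.sh, with illustrative values.
    FLINK_DIR=/opt/flink
    unset FLINK_LOG_DIR DEBUG_FILES_OUTPUT_DIR

    if [ -z "$FLINK_LOG_DIR" ] ; then
      export FLINK_LOG_DIR="$FLINK_DIR/logs"          # default: log into the distribution
    fi
    if [ -z "$DEBUG_FILES_OUTPUT_DIR" ] ; then
      export DEBUG_FILES_OUTPUT_DIR="$FLINK_LOG_DIR"  # default: debug files next to the logs
    fi

    echo "$DEBUG_FILES_OUTPUT_DIR"   # prints /opt/flink/logs unless CI overrides a variable
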
4 changes: 4 additions & 0 deletions flink-end-to-end-tests/run-single-test.sh
@@ -46,6 +46,10 @@ if [ -z "$FLINK_DIR" ] ; then
exit 1
fi

if [ -z "$FLINK_LOG_DIR" ] ; then
export FLINK_LOG_DIR="$FLINK_DIR/logs"
fi

source "${END_TO_END_DIR}/../tools/ci/maven-utils.sh"
source "${END_TO_END_DIR}/test-scripts/test-runner-common.sh"

36 changes: 20 additions & 16 deletions flink-end-to-end-tests/test-scripts/common.sh
@@ -26,6 +26,10 @@ if [[ -z $FLINK_DIR ]]; then
exit 1
fi

if [ -z "$FLINK_LOG_DIR" ] ; then
export FLINK_LOG_DIR="$FLINK_DIR/logs"
fi

case "$(uname -s)" in
Linux*) OS_TYPE=linux;;
Darwin*) OS_TYPE=mac;;
@@ -341,7 +345,7 @@ function wait_for_number_of_running_tms {

function check_logs_for_errors {
echo "Checking for errors..."
error_count=$(grep -rv "GroupCoordinatorNotAvailableException" $FLINK_DIR/log \
error_count=$(grep -rv "GroupCoordinatorNotAvailableException" $FLINK_LOG_DIR \
| grep -v "RetriableCommitFailedException" \
| grep -v "NoAvailableBrokersException" \
| grep -v "Async Kafka commit failed" \
@@ -368,7 +372,7 @@
| grep -ic "error" || true)
if [[ ${error_count} -gt 0 ]]; then
echo "Found error in log files; printing first 500 lines; see full logs for details:"
find $FLINK_DIR/log/ -type f -exec head -n 500 {} \;
find $FLINK_LOG_DIR/ -type f -exec head -n 500 {} \;
EXIT_CODE=1
else
echo "No errors in log files."
@@ -377,7 +381,7 @@

function check_logs_for_exceptions {
echo "Checking for exceptions..."
exception_count=$(grep -rv "GroupCoordinatorNotAvailableException" $FLINK_DIR/log \
exception_count=$(grep -rv "GroupCoordinatorNotAvailableException" $FLINK_LOG_DIR \
| grep -v "RetriableCommitFailedException" \
| grep -v "NoAvailableBrokersException" \
| grep -v "Async Kafka commit failed" \
@@ -406,7 +410,7 @@ function check_logs_for_exceptions {
| grep -ic "exception" || true)
if [[ ${exception_count} -gt 0 ]]; then
echo "Found exception in log files; printing first 500 lines; see full logs for details:"
find $FLINK_DIR/log/ -type f -exec head -n 500 {} \;
find $FLINK_LOG_DIR/ -type f -exec head -n 500 {} \;
EXIT_CODE=1
else
echo "No exceptions in log files."
@@ -424,11 +428,11 @@ function check_logs_for_non_empty_out_files {
-e "WARNING: Use --illegal-access"\
-e "WARNING: All illegal access"\
-e "Picked up JAVA_TOOL_OPTIONS"\
$FLINK_DIR/log/*.out\
$FLINK_LOG_DIR/*.out\
| grep "." \
> /dev/null; then
echo "Found non-empty .out files; printing first 500 lines; see full logs for details:"
find $FLINK_DIR/log/ -type f -name '*.out' -exec head -n 500 {} \;
find $FLINK_LOG_DIR/ -type f -name '*.out' -exec head -n 500 {} \;
EXIT_CODE=1
else
echo "No non-empty .out files."
@@ -462,7 +466,7 @@ function wait_for_job_state_transition {
echo "Waiting for job ($job) to switch from state ${initial_state} to state ${next_state} ..."

while : ; do
N=$(grep -o "($job) switched from state ${initial_state} to ${next_state}" $FLINK_DIR/log/*standalonesession*.log | tail -1)
N=$(grep -o "($job) switched from state ${initial_state} to ${next_state}" $FLINK_LOG_DIR/*standalonesession*.log | tail -1)

if [[ -z $N ]]; then
sleep 1
@@ -497,7 +501,7 @@ function wait_job_terminal_state {
echo "Waiting for job ($job) to reach terminal state $expected_terminal_state ..."

while : ; do
local N=$(grep -o "Job $job reached globally terminal state .*" $FLINK_DIR/log/*$log_file_name*.log | tail -1 || true)
local N=$(grep -o "Job $job reached globally terminal state .*" $FLINK_LOG_DIR/*$log_file_name*.log | tail -1 || true)
if [[ -z $N ]]; then
sleep 1
else
@@ -619,7 +623,7 @@ function get_job_metric {
function get_metric_processed_records {
OPERATOR=$1
JOB_NAME="${2:-General purpose test job}"
N=$(grep ".${JOB_NAME}.$OPERATOR.numRecordsIn:" $FLINK_DIR/log/*taskexecutor*.log | sed 's/.* //g' | tail -1)
N=$(grep ".${JOB_NAME}.$OPERATOR.numRecordsIn:" $FLINK_LOG_DIR/*taskexecutor*.log | sed 's/.* //g' | tail -1)
if [ -z $N ]; then
N=0
fi
@@ -629,7 +633,7 @@ function get_metric_processed_records {
function get_num_metric_samples {
OPERATOR=$1
JOB_NAME="${2:-General purpose test job}"
N=$(grep ".${JOB_NAME}.$OPERATOR.numRecordsIn:" $FLINK_DIR/log/*taskexecutor*.log | wc -l)
N=$(grep ".${JOB_NAME}.$OPERATOR.numRecordsIn:" $FLINK_LOG_DIR/*taskexecutor*.log | wc -l)
if [ -z $N ]; then
N=0
fi
@@ -679,7 +683,7 @@ function wait_num_of_occurence_in_logs {
echo "Waiting for text ${text} to appear ${number} of times in logs..."

while : ; do
N=$(grep -o "${text}" $FLINK_DIR/log/*${logs}*.log | wc -l)
N=$(grep -o "${text}" $FLINK_LOG_DIR/*${logs}*.log | wc -l)

if [ -z $N ]; then
N=0
@@ -708,7 +712,7 @@ function wait_num_checkpoints {
echo "Waiting for job ($JOB) to have at least $NUM_CHECKPOINTS completed checkpoints ..."

while : ; do
N=$(grep -o "Completed checkpoint [1-9]* for job $JOB" $FLINK_DIR/log/*standalonesession*.log | awk '{print $3}' | tail -1)
N=$(grep -o "Completed checkpoint [1-9]* for job $JOB" $FLINK_LOG_DIR/*standalonesession*.log | awk '{print $3}' | tail -1)

if [ -z $N ]; then
N=0
@@ -739,8 +743,8 @@ function end_timer {
}

function clean_stdout_files {
rm ${FLINK_DIR}/log/*.out
echo "Deleted all stdout files under ${FLINK_DIR}/log/"
rm $FLINK_LOG_DIR/*.out
echo "Deleted all stdout files under $FLINK_LOG_DIR/"
}

# Expect a string to appear in the log files of the task manager before a given timeout
@@ -750,7 +754,7 @@ function expect_in_taskmanager_logs {
local expected="$1"
local timeout=$2
local i=0
local logfile="${FLINK_DIR}/log/flink*taskexecutor*log"
local logfile="$FLINK_LOG_DIR/flink*taskexecutor*log"


while ! grep "${expected}" ${logfile} > /dev/null; do
@@ -858,7 +862,7 @@ internal_run_with_timeout() {

run_on_test_failure() {
echo "Printing Flink logs and killing it:"
cat ${FLINK_DIR}/log/*
cat $FLINK_LOG_DIR/*
}

run_test_with_timeout() {
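
All of the log checks above share one pattern: recursively grep the log directory, filter out known-benign messages, then count what remains. A stripped-down sketch of that pattern (the whitelist is shortened here for illustration):

    #!/usr/bin/env bash
    # Minimal version of the whitelist-then-count pattern from check_logs_for_errors.
    FLINK_LOG_DIR=${FLINK_LOG_DIR:-/opt/flink/logs}   # illustrative default

    error_count=$(grep -rv "GroupCoordinatorNotAvailableException" "$FLINK_LOG_DIR" \
        | grep -v "RetriableCommitFailedException" \
        | grep -ic "error" || true)

    if [[ ${error_count} -gt 0 ]]; then
        echo "Found ${error_count} error line(s); see the logs in $FLINK_LOG_DIR."
        exit 1
    fi
    echo "No errors in log files."
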
2 changes: 1 addition & 1 deletion flink-end-to-end-tests/test-scripts/common_ha.sh
@@ -49,7 +49,7 @@ function verify_num_occurences_in_logs() {
local text="$2"
local expected_no="$3"

local actual_no=$(grep -r --include "*${log_pattern}*.log" -e "${text}" "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l)
local actual_no=$(grep -r --include "*${log_pattern}*.log" -e "${text}" "$FLINK_LOG_DIR/" | cut -d ":" -f 1 | uniq | wc -l)
[[ "${expected_no}" -eq "${actual_no}" ]]
}

2 changes: 1 addition & 1 deletion flink-end-to-end-tests/test-scripts/common_mesos_docker.sh
@@ -33,7 +33,7 @@ start_time=$(date +%s)

# make sure we stop our cluster at the end
function cluster_shutdown {
docker exec mesos-master bash -c "chmod -R ogu+rw ${FLINK_DIR}/log/ ${TEST_DATA_DIR}"
docker exec mesos-master bash -c "chmod -R ogu+rw $FLINK_LOG_DIR/ ${TEST_DATA_DIR}"
docker-compose -f $END_TO_END_DIR/test-scripts/docker-mesos-cluster/docker-compose.yml down
}
on_exit cluster_shutdown
@@ -36,13 +36,15 @@ services:
volumes:
- ${END_TO_END_DIR}:${END_TO_END_DIR}
- ${FLINK_DIR}:${FLINK_DIR}
- ${FLINK_LOG_DIR}:${FLINK_LOG_DIR}
- ${MVN_REPO}:${MVN_REPO}
environment:
MESOS_PORT: 5050
MESOS_QUORUM: 1
MESOS_REGISTRY: in_memory
MESOS_LOG_DIR: /var/log/mesos
MESOS_WORK_DIR: /var/tmp/mesos
FLINK_LOG_DIR: ${FLINK_LOG_DIR}

slave:
image: flink/docker-mesos-cluster:latest
@@ -64,3 +66,4 @@ services:
MESOS_LOG_DIR: /var/log/mesos
MESOS_WORK_DIR: /var/tmp/mesos
MESOS_SYSTEMD_ENABLE_SUPPORT: 'false'
FLINK_LOG_DIR: ${FLINK_LOG_DIR}
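
Note that docker-compose substitutes ${FLINK_LOG_DIR} from the calling shell's environment, so the variable must be exported before the compose file is used; that is why common_mesos_docker.sh and the compose file change together. A hedged sketch of the invocation (the concrete log path is an assumption):

    # FLINK_LOG_DIR must be exported by the caller; docker-compose substitutes it
    # into both the volume mapping and the container environment above.
    export FLINK_LOG_DIR=/tmp/flink-e2e-logs   # illustrative path
    mkdir -p "$FLINK_LOG_DIR"
    docker-compose -f "$END_TO_END_DIR/test-scripts/docker-mesos-cluster/docker-compose.yml" up -d
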
4 changes: 2 additions & 2 deletions flink-end-to-end-tests/test-scripts/queryable_state_base.sh
@@ -25,7 +25,7 @@ function link_queryable_state_lib {

# Returns the ip address of the queryable state server
function get_queryable_state_server_ip {
local ip=$(cat ${FLINK_DIR}/log/flink*taskexecutor*log \
local ip=$(cat $FLINK_LOG_DIR/flink*taskexecutor*log \
| grep "Started Queryable State Server" \
| head -1 \
| grep -Eo "\.*[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.*")
@@ -35,7 +35,7 @@ function get_queryable_state_proxy_port {

# Returns the port of the queryable state proxy server
function get_queryable_state_proxy_port {
local port=$(cat ${FLINK_DIR}/log/flink*taskexecutor*log \
local port=$(cat $FLINK_LOG_DIR/flink*taskexecutor*log \
| grep "Started Queryable State Proxy Server" \
| head -1 \
| grep -Eo "\.*\:([0-9]{3,5})\.*" | tr -d ":.")
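
The proxy-port helper relies on the task executor logging a line such as "Started Queryable State Proxy Server @ /127.0.0.1:9069." (the exact wording here is an assumption for illustration). A quick sketch of the extraction step on its own:

    # Extract the port from a sample log line; the regex grabs ":9069." and
    # tr strips the colon and the dots.
    line="Started Queryable State Proxy Server @ /127.0.0.1:9069."
    echo "$line" | grep -Eo "\.*\:([0-9]{3,5})\.*" | tr -d ":."   # prints 9069
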
11 changes: 2 additions & 9 deletions flink-end-to-end-tests/test-scripts/test-runner-common.sh
@@ -99,13 +99,6 @@ function post_test_validation {
log_environment_info
else
log_environment_info
# make logs available if ARTIFACTS_DIR is set
if [[ ${ARTIFACTS_DIR} != "" ]]; then
mkdir ${ARTIFACTS_DIR}/e2e-flink-logs
cp $FLINK_DIR/log/* ${ARTIFACTS_DIR}/e2e-flink-logs/
echo "Published e2e logs into debug logs artifact:"
ls ${ARTIFACTS_DIR}/e2e-flink-logs/
fi
exit "${exit_code}"
fi
}
@@ -134,8 +127,8 @@ function cleanup_proc {

# Cleans up all temporary folders and files
function cleanup_tmp_files {
rm -f ${FLINK_DIR}/log/*
echo "Deleted all files under ${FLINK_DIR}/log/"
rm -f $FLINK_LOG_DIR/*
echo "Deleted all files under $FLINK_LOG_DIR/"

rm -rf ${TEST_DATA_DIR} 2> /dev/null
echo "Deleted ${TEST_DATA_DIR}"
2 changes: 1 addition & 1 deletion flink-end-to-end-tests/test-scripts/test_cli.sh
@@ -65,7 +65,7 @@ function extract_valid_job_list_by_type_from_job_list_return() {
}

function extract_task_manager_slot_request_count() {
COUNT=`grep "Receive slot request" $FLINK_DIR/log/*taskexecutor*.log | wc -l`
COUNT=`grep "Receive slot request" $FLINK_LOG_DIR/*taskexecutor*.log | wc -l`
echo $COUNT
}

@@ -62,8 +62,8 @@ fi

export USER_LIB=${FLINK_DIR}/examples/batch
docker-compose -f ${DOCKER_SCRIPTS}/docker-compose.test.yml up --force-recreate --abort-on-container-exit --exit-code-from job-cluster &> /dev/null
docker-compose -f ${DOCKER_SCRIPTS}/docker-compose.test.yml logs job-cluster > ${FLINK_DIR}/log/jobmanager.log
docker-compose -f ${DOCKER_SCRIPTS}/docker-compose.test.yml logs taskmanager > ${FLINK_DIR}/log/taskmanager.log
docker-compose -f ${DOCKER_SCRIPTS}/docker-compose.test.yml logs job-cluster > $FLINK_LOG_DIR/jobmanager.log
docker-compose -f ${DOCKER_SCRIPTS}/docker-compose.test.yml logs taskmanager > $FLINK_LOG_DIR/taskmanager.log
docker-compose -f ${DOCKER_SCRIPTS}/docker-compose.test.yml rm -f

check_result_hash "WordCount" $OUTPUT_VOLUME/docker_wc_out "${RESULT_HASH}"
(Diffs for the remaining changed files are not rendered on this page.)
