Skip to content

Commit

Permalink
[FLINK-9353] Added end to end test for standalone embedded job in kub…
Browse files Browse the repository at this point in the history
…ernetes
  • Loading branch information
dawidwys committed Jul 26, 2018
1 parent bf0f16c commit 1a0137e
Show file tree
Hide file tree
Showing 4 changed files with 147 additions and 0 deletions.
4 changes: 4 additions & 0 deletions flink-end-to-end-tests/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ You can also run tests individually via
$ FLINK_DIR=<flink dir> flink-end-to-end-tests/run-single-test.sh your_test.sh arg1 arg2
```

### Kubernetes test

Kubernetes test (test_kubernetes_embedded_job.sh) assumes a running minikube cluster.

## Writing Tests

### Examples
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http:https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

apiVersion: batch/v1
kind: Job
metadata:
name: flink-job-cluster
spec:
template:
metadata:
labels:
app: flink
component: job-cluster
spec:
restartPolicy: OnFailure
containers:
- name: flink-job-cluster
image: ${FLINK_IMAGE_NAME}
imagePullPolicy: Never
args: ["job-cluster", "--job-classname", "${FLINK_JOB}", "-Djobmanager.rpc.address=flink-job-cluster",
"-Dparallelism.default=${FLINK_JOB_PARALLELISM}", "-Dblob.server.port=6124", "-Dquery.server.ports=6125",
${FLINK_JOB_ARGUMENTS}]
ports:
- containerPort: 6123
name: rpc
- containerPort: 6124
name: blob
- containerPort: 6125
name: query
- containerPort: 8081
name: ui
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http:https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

apiVersion: extensions/v1beta1
kind: Deployment
metadata:
name: flink-task-manager
spec:
replicas: ${FLINK_JOB_PARALLELISM}
template:
metadata:
labels:
app: flink
component: task-manager
spec:
containers:
- name: flink-task-manager
image: ${FLINK_IMAGE_NAME}
imagePullPolicy: Never
args: ["task-manager", "-Djobmanager.rpc.address=flink-job-cluster"]
volumeMounts:
- mountPath: /cache
name: cache-volume
volumes:
- name: cache-volume
emptyDir: {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
#!/usr/bin/env bash
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http:https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

source "$(dirname "$0")"/common.sh

DOCKER_MODULE_DIR=${END_TO_END_DIR}/../flink-container/docker
KUBERNETES_MODULE_DIR=${END_TO_END_DIR}/../flink-container/kubernetes
CONTAINER_SCRIPTS=${END_TO_END_DIR}/test-scripts/container-scripts

export FLINK_JOB=org.apache.flink.examples.java.wordcount.WordCount
export FLINK_IMAGE_NAME=test_kubernetes_embedded_job
export OUTPUT_VOLUME=${TEST_DATA_DIR}/out
export OUTPUT_FILE=kubernetes_wc_out
export FLINK_JOB_PARALLELISM=1
export FLINK_JOB_ARGUMENTS='"--output", "/cache/kubernetes_wc_out"'

function cleanup {
kubectl delete job flink-job-cluster
kubectl delete service flink-job-cluster
kubectl delete deployment flink-task-manager
rm -rf ${OUTPUT_VOLUME}
}

trap cleanup EXIT

mkdir -p $OUTPUT_VOLUME

eval $(minikube docker-env)
cd "$DOCKER_MODULE_DIR"
./build.sh --from-local-dist --job-jar ${FLINK_DIR}/examples/batch/WordCount.jar --image-name ${FLINK_IMAGE_NAME}
cd "$END_TO_END_DIR"


kubectl create -f ${KUBERNETES_MODULE_DIR}/job-cluster-service.yaml
envsubst '${FLINK_IMAGE_NAME} ${FLINK_JOB} ${FLINK_JOB_PARALLELISM} ${FLINK_JOB_ARGUMENTS}' < ${CONTAINER_SCRIPTS}/job-cluster-job.yaml.template | kubectl create -f -
envsubst '${FLINK_IMAGE_NAME} ${FLINK_JOB_PARALLELISM}' < ${CONTAINER_SCRIPTS}/task-manager-deployment.yaml.template | kubectl create -f -
kubectl wait --for=condition=complete job/flink-job-cluster --timeout=1h
kubectl cp `kubectl get pods | awk '/task-manager/ {print $1}'`:/cache/${OUTPUT_FILE} ${OUTPUT_VOLUME}/${OUTPUT_FILE}

check_result_hash "WordCount" ${OUTPUT_VOLUME}/${OUTPUT_FILE} "e682ec6622b5e83f2eb614617d5ab2cf"

0 comments on commit 1a0137e

Please sign in to comment.