[FLINK-12541][container] Add support for Python jobs in build script
dianfu committed Jun 4, 2019
1 parent 6033b4d commit 900f728
Showing 5 changed files with 62 additions and 19 deletions.
26 changes: 22 additions & 4 deletions flink-container/docker/Dockerfile
@@ -25,25 +25,43 @@ RUN apk add --no-cache bash snappy libc6-compat
ENV FLINK_INSTALL_PATH=/opt
ENV FLINK_HOME $FLINK_INSTALL_PATH/flink
ENV FLINK_LIB_DIR $FLINK_HOME/lib
ENV FLINK_OPT_DIR $FLINK_HOME/opt
ENV FLINK_JOB_ARTIFACTS_DIR $FLINK_INSTALL_PATH/artifacts
ENV PATH $PATH:$FLINK_HOME/bin

# flink-dist can point to a directory or a tarball on the local system
ARG flink_dist=NOT_SET
ARG job_jar=NOT_SET
ARG job_artifacts=NOT_SET
ARG python_version=NOT_SET
ARG opt_jars=NOT_SET
# hadoop jar is optional
ARG hadoop_jar=NOT_SET

# Install Python
RUN \
if [ "$python_version" = "2" ]; then \
apk add --no-cache python; \
elif [ "$python_version" = "3" ]; then \
apk add --no-cache python3 && ln -s /usr/bin/python3 /usr/bin/python; \
fi

# Install build dependencies and flink
ADD $flink_dist $hadoop_jar $FLINK_INSTALL_PATH/
ADD $job_jar $FLINK_INSTALL_PATH/job.jar
ADD $job_artifacts/* $FLINK_JOB_ARTIFACTS_DIR/

RUN set -x && \
ln -s $FLINK_INSTALL_PATH/flink-[0-9]* $FLINK_HOME && \
ln -s $FLINK_INSTALL_PATH/job.jar $FLINK_LIB_DIR && \
for jar in $FLINK_JOB_ARTIFACTS_DIR/*.jar; do [ -f "$jar" ] || continue; ln -s $jar $FLINK_LIB_DIR; done && \
if [ -n "$python_version" ]; then ln -s $FLINK_OPT_DIR/flink-python-*-java-binding.jar $FLINK_LIB_DIR; fi && \
OIFS=$IFS && IFS=',' && \
for expected_jar in $opt_jars; do \
for jar in $FLINK_OPT_DIR/*.jar; do if echo "$jar" | grep -q "$expected_jar"; then ln -s $jar $FLINK_LIB_DIR; fi; done; \
done && \
IFS=$OIFS && \
if [ -f ${FLINK_INSTALL_PATH}/flink-shaded-hadoop* ]; then ln -s ${FLINK_INSTALL_PATH}/flink-shaded-hadoop* $FLINK_LIB_DIR; fi && \
addgroup -S flink && adduser -D -S -H -G flink -h $FLINK_HOME flink && \
chown -R flink:flink ${FLINK_INSTALL_PATH}/flink-* && \
chown -R flink:flink ${FLINK_INSTALL_PATH}/job.jar && \
chown -R flink:flink ${FLINK_JOB_ARTIFACTS_DIR}/ && \
chown -h flink:flink $FLINK_HOME

COPY docker-entrypoint.sh /
13 changes: 9 additions & 4 deletions flink-container/docker/README.md
@@ -13,32 +13,37 @@ Install the most recent stable version of [Docker](https://docs.docker.com/insta
Images are based on the official Java Alpine (OpenJDK 8) image.

Before building the image, one needs to build the user code jars for the job.
Assume that the job jar is stored under `<PATH_TO_JOB_JAR>`
Assume that the job artifacts are stored under `<COMMA_SEPARATED_PATH_TO_JOB_ARTIFACTS>`

If you want to build the Flink image from the version you have checked out locally run:

build.sh --from-local-dist --job-jar <PATH_TO_JOB_JAR> --image-name <IMAGE_NAME>
build.sh --from-local-dist --job-artifacts <COMMA_SEPARATED_PATH_TO_JOB_ARTIFACTS> [--with-python2|--with-python3] [--opt-jars <COMMA_SEPARATED_OPT_JARS>] --image-name <IMAGE_NAME>

Note that you first need to call `mvn package -pl flink-dist -am` to build the Flink binaries.

If you want to build the Flink image from an archive stored under `<PATH_TO_ARCHIVE>` run:

build.sh --from-archive <PATH_TO_ARCHIVE> --job-jar <PATH_TO_JOB_JAR> --image-name <IMAGE_NAME>
build.sh --from-archive <PATH_TO_ARCHIVE> --job-artifacts <COMMA_SEPARATED_PATH_TO_JOB_ARTIFACTS> [--with-python2|--with-python3] [--opt-jars <COMMA_SEPARATED_OPT_JARS>] --image-name <IMAGE_NAME>

If you want to build the Flink image for a specific version of Flink/Hadoop/Scala run:

build.sh --from-release --flink-version 1.6.0 --hadoop-version 2.8 --scala-version 2.11 --image-name <IMAGE_NAME>
build.sh --from-release --flink-version 1.6.0 --hadoop-version 2.8 --scala-version 2.11 --job-artifacts <COMMA_SEPARATED_PATH_TO_JOB_ARTIFACTS> [--with-python2|--with-python3] [--opt-jars <COMMA_SEPARATED_OPT_JARS>] --image-name <IMAGE_NAME>

Please note that as of Flink 1.8, the Hadoop version is optional and you can build the Flink image without providing any Hadoop version.
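
For example, a Hadoop-free build from the 1.8.0 release might look like this (the version numbers are illustrative):

build.sh --from-release --flink-version 1.8.0 --scala-version 2.11 --job-artifacts <COMMA_SEPARATED_PATH_TO_JOB_ARTIFACTS> --image-name <IMAGE_NAME>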

The script will try to download the released version from the Apache archive.

The artifacts specified in `<COMMA_SEPARATED_PATH_TO_JOB_ARTIFACTS>` will be copied to the directory `/opt/artifacts` of the built image.

`<COMMA_SEPARATED_OPT_JARS>` specifies which of the optional jars under `<FLINK_HOME>/opt/` should be used; they are linked into the lib directory of the image. For example, pass `--opt-jars table` if the table jar is needed.
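
As a fuller sketch, a hypothetical local-dist build that bundles a job jar and a Python module, enables Python 3, and links the table jar could be invoked as follows (all artifact paths are assumptions):

build.sh --from-local-dist --job-artifacts /path/to/job.jar,/path/to/word_count.py --with-python3 --opt-jars table --image-name flink-job:latest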

## Deploying via Docker compose

The `docker-compose.yml` contains the following parameters:

* `FLINK_DOCKER_IMAGE_NAME` - Image name to use for the deployment (default: `flink-job:latest`)
* `FLINK_JOB` - Name of the Flink job to execute (default: none)
* `FLINK_PYTHON_ARTIFACTS` - Python artifacts of the Flink job to execute (default: /opt/artifacts/)
* `DEFAULT_PARALLELISM` - Default parallelism with which to start the job (default: 1)
* `FLINK_JOB_ARGUMENTS` - Additional arguments which will be passed to the job cluster (default: none)
* `SAVEPOINT_OPTIONS` - Savepoint options to start the cluster with (default: none)
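
For example, a hypothetical launch of the job cluster for a Python job could set these parameters inline (the module name `word_count` is an assumption):

FLINK_JOB=word_count FLINK_PYTHON_ARTIFACTS=/opt/artifacts/ DEFAULT_PARALLELISM=2 docker-compose up -d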
34 changes: 26 additions & 8 deletions flink-container/docker/build.sh
@@ -20,9 +20,9 @@
usage() {
cat <<HERE
Usage:
build.sh --job-jar <path-to-job-jar> --from-local-dist [--image-name <image>]
build.sh --job-jar <path-to-job-jar> --from-archive <path-to-dist-archive> [--image-name <image>]
build.sh --job-jar <path-to-job-jar> --from-release --flink-version <x.x.x> --scala-version <x.xx> [--hadoop-version <x.x>] [--image-name <image>]
build.sh --job-artifacts <comma-separated-paths-to-job-artifacts> [--with-python2|--with-python3] [--opt-jars <comma-separated-opt-jars>] --from-local-dist [--image-name <image>]
build.sh --job-artifacts <comma-separated-paths-to-job-artifacts> [--with-python2|--with-python3] [--opt-jars <comma-separated-opt-jars>] --from-archive <path-to-dist-archive> [--image-name <image>]
build.sh --job-artifacts <comma-separated-paths-to-job-artifacts> [--with-python2|--with-python3] [--opt-jars <comma-separated-opt-jars>] --from-release --flink-version <x.x.x> --scala-version <x.xx> [--hadoop-version <x.x>] [--image-name <image>]
build.sh --help
If the --image-name flag is not used, the built image name will be 'flink-job'.
@@ -35,8 +35,18 @@ while [[ $# -ge 1 ]]
do
key="$1"
case $key in
--job-jar)
JOB_JAR_PATH="$2"
--job-artifacts)
JOB_ARTIFACTS_PATH="$2"
shift
;;
--with-python2)
PYTHON_VERSION="2"
;;
--with-python3)
PYTHON_VERSION="3"
;;
--opt-jars)
OPT_JARS="$2"
shift
;;
--from-local-dist)
@@ -93,8 +103,16 @@ trap cleanup EXIT

mkdir -p "${TMPDIR}"

JOB_JAR_TARGET="${TMPDIR}/job.jar"
cp ${JOB_JAR_PATH} ${JOB_JAR_TARGET}
JOB_ARTIFACTS_TARGET="${TMPDIR}/artifacts"
mkdir -p ${JOB_ARTIFACTS_TARGET}

OLD_IFS="$IFS"
IFS=","
job_artifacts_array=(${JOB_ARTIFACTS_PATH})
IFS="$OLD_IFS"
for artifact in ${job_artifacts_array[@]}; do
cp ${artifact} ${JOB_ARTIFACTS_TARGET}/
done

checkUrlAvailable() {
curl --output /dev/null --silent --head --fail $1
@@ -170,4 +188,4 @@ else

fi

docker build --build-arg flink_dist="${FLINK_DIST}" --build-arg job_jar="${JOB_JAR_TARGET}" --build-arg hadoop_jar="${SHADED_HADOOP}" -t "${IMAGE_NAME}" .
docker build --build-arg flink_dist="${FLINK_DIST}" --build-arg job_artifacts="${JOB_ARTIFACTS_TARGET}" --build-arg opt_jars="${OPT_JARS}" --build-arg hadoop_jar="${SHADED_HADOOP}" --build-arg python_version="${PYTHON_VERSION}" -t "${IMAGE_NAME}" .
3 changes: 2 additions & 1 deletion flink-container/docker/docker-compose.yml
@@ -21,6 +21,7 @@
# Parameters:
# * FLINK_DOCKER_IMAGE_NAME - Image name to use for the deployment (default: flink-job:latest)
# * FLINK_JOB - Name of the Flink job to execute (default: none)
# * FLINK_PYTHON_ARTIFACTS - Python artifacts of the Flink job to execute (default: /opt/artifacts/)
# * DEFAULT_PARALLELISM - Default parallelism with which to start the job (default: 1)
# * FLINK_JOB_ARGUMENTS - Additional arguments which will be passed to the job cluster (default: none)
# * SAVEPOINT_OPTIONS - Savepoint options to start the cluster with (default: none)
@@ -31,7 +32,7 @@ services:
image: ${FLINK_DOCKER_IMAGE_NAME:-flink-job}
ports:
- "8081:8081"
command: job-cluster --job-classname ${FLINK_JOB} -Djobmanager.rpc.address=job-cluster -Dparallelism.default=${DEFAULT_PARALLELISM:-1} ${SAVEPOINT_OPTIONS} ${FLINK_JOB_ARGUMENTS}
command: job-cluster --job-entrypoint ${FLINK_JOB} --job-python-artifacts ${FLINK_PYTHON_ARTIFACTS:-/opt/artifacts/} -Djobmanager.rpc.address=job-cluster -Dparallelism.default=${DEFAULT_PARALLELISM:-1} ${SAVEPOINT_OPTIONS} ${FLINK_JOB_ARGUMENTS}

taskmanager:
image: ${FLINK_DOCKER_IMAGE_NAME:-flink-job}
5 changes: 3 additions & 2 deletions flink-container/kubernetes/README.md
@@ -29,7 +29,7 @@ In non HA mode, you should first start the job cluster service:

In order to deploy the job cluster entrypoint run:

`FLINK_IMAGE_NAME=<IMAGE_NAME> FLINK_JOB=<JOB_NAME> FLINK_JOB_PARALLELISM=<PARALLELISM> envsubst < job-cluster-job.yaml.template | kubectl create -f -`
`FLINK_IMAGE_NAME=<IMAGE_NAME> FLINK_JOB_PARALLELISM=<PARALLELISM> envsubst < job-cluster-job.yaml.template | kubectl create -f -`

Now you should see the `flink-job-cluster` job being started by calling `kubectl get job`.

@@ -41,8 +41,9 @@ At last, you should start the task manager deployment:

You can provide the following additional command line arguments to the cluster entrypoint:

- `--job-classname <job class name>`: Class name of the job to run. By default, the Flink class path is scanned for a JAR with a `Main-Class` or `program-class` manifest entry and chosen as the job class. Use this command line argument to manually set the job class. This argument is required in case that no or more than one JAR with such a manifest entry is available on the class path.
- `--job-entrypoint <job entrypoint name>`: Class name of the Java job or module name of the Python job to run. By default, the Flink class path is scanned for a JAR with a `Main-Class` or `program-class` manifest entry and chosen as the job class. Use this command line argument to manually set the job class. This argument is required if no JAR, or more than one JAR, with such a manifest entry is available on the class path.
- `--job-id <job id>`: Manually set a Flink job ID for the job (default: `00000000000000000000000000000000`)
- `--job-python-artifacts`: Comma-separated Python artifacts of the Python job to run. The artifacts can be files or directories.
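
For illustration, a cluster entrypoint command for a Python job, mirroring the one in `docker-compose.yml`, might look like this (the module name `word_count` is an assumption):

job-cluster --job-entrypoint word_count --job-python-artifacts /opt/artifacts/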

## Resuming from a savepoint

