diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml
index 59ada96f974fb..87718bee3f55e 100644
--- a/flink-dist/pom.xml
+++ b/flink-dist/pom.xml
@@ -241,6 +241,14 @@ under the License.
they are not included into the 'flink-dist' uber jar.
-->
+
+
+ org.apache.flink
+ flink-external-resource-gpu
+ ${project.version}
+ provided
+
+
org.apache.flink
diff --git a/flink-dist/src/main/assemblies/opt.xml b/flink-dist/src/main/assemblies/opt.xml
index d515d587b0329..d8e46b3ac6e13 100644
--- a/flink-dist/src/main/assemblies/opt.xml
+++ b/flink-dist/src/main/assemblies/opt.xml
@@ -75,6 +75,28 @@
0644
+
+
+ ../flink-external-resources/flink-external-resource-gpu/target/flink-external-resource-gpu-${project.version}.jar
+ opt/external-resource-gpu/
+ flink-external-resource-gpu-${project.version}.jar
+ 0644
+
+
+
+ ../flink-external-resources/flink-external-resource-gpu/src/main/resources/gpu-discovery-common.sh
+ opt/external-resource-gpu/
+ gpu-discovery-common.sh
+ 0755
+
+
+
+ ../flink-external-resources/flink-external-resource-gpu/src/main/resources/nvidia-gpu-discovery.sh
+ opt/external-resource-gpu/
+ nvidia-gpu-discovery.sh
+ 0755
+
+
../flink-metrics/flink-metrics-graphite/target/flink-metrics-graphite-${project.version}.jar
diff --git a/flink-external-resources/flink-external-resource-gpu/pom.xml b/flink-external-resources/flink-external-resource-gpu/pom.xml
new file mode 100644
index 0000000000000..c318e0021655c
--- /dev/null
+++ b/flink-external-resources/flink-external-resource-gpu/pom.xml
@@ -0,0 +1,53 @@
+
+
+
+
+ 4.0.0
+
+
+ flink-external-resources
+ org.apache.flink
+ 1.11-SNAPSHOT
+ ..
+
+
+ flink-external-resource-gpu
+ flink-external-resource-gpu
+
+ jar
+
+
+
+ org.apache.flink
+ flink-core
+ ${project.version}
+ provided
+
+
+
+ org.apache.flink
+ flink-test-utils-junit
+ ${project.version}
+ test
+
+
+
+
diff --git a/flink-external-resources/flink-external-resource-gpu/src/main/java/org/apache/flink/externalresource/gpu/GPUDriver.java b/flink-external-resources/flink-external-resource-gpu/src/main/java/org/apache/flink/externalresource/gpu/GPUDriver.java
new file mode 100644
index 0000000000000..25215e0877b47
--- /dev/null
+++ b/flink-external-resources/flink-external-resource-gpu/src/main/java/org/apache/flink/externalresource/gpu/GPUDriver.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.externalresource.gpu;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.externalresource.ExternalResourceDriver;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExternalResourceOptions;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.InputStreamReader;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * Driver takes the responsibility to discover GPU resources and provide the GPU resource information.
+ * It retrieves the GPU information by executing a user-defined discovery script.
+ */
+class GPUDriver implements ExternalResourceDriver {
+
+ private static final Logger LOG = LoggerFactory.getLogger(GPUDriver.class);
+
+ private static final long DISCOVERY_SCRIPT_TIMEOUT_MS = 10000;
+
+ @VisibleForTesting
+ static final ConfigOption DISCOVERY_SCRIPT_PATH =
+ key("discovery-script.path")
+ .stringType()
+ .defaultValue(String.format("%s/external-resource-gpu/nvidia-gpu-discovery.sh", ConfigConstants.DEFAULT_FLINK_PLUGINS_DIRS));
+
+ @VisibleForTesting
+ static final ConfigOption DISCOVERY_SCRIPT_ARG =
+ key("discovery-script.args")
+ .stringType()
+ .noDefaultValue();
+
+ private final File discoveryScriptFile;
+ private final String args;
+
+ GPUDriver(Configuration config) throws Exception {
+ final String discoveryScriptPathStr = config.getString(DISCOVERY_SCRIPT_PATH);
+ if (StringUtils.isNullOrWhitespaceOnly(discoveryScriptPathStr)) {
+ throw new IllegalConfigurationException(
+ String.format("GPU discovery script ('%s') is not configured.", ExternalResourceOptions.genericKeyWithSuffix(DISCOVERY_SCRIPT_PATH.key())));
+ }
+
+ Path discoveryScriptPath = Paths.get(discoveryScriptPathStr);
+ if (!discoveryScriptPath.isAbsolute()) {
+ discoveryScriptPath = Paths.get(System.getenv().getOrDefault(ConfigConstants.ENV_FLINK_HOME_DIR, "."), discoveryScriptPathStr);
+ }
+ discoveryScriptFile = discoveryScriptPath.toFile();
+
+ if (!discoveryScriptFile.exists()) {
+ throw new FileNotFoundException(String.format("The gpu discovery script does not exist in path %s.", discoveryScriptFile.getAbsolutePath()));
+ }
+ if (!discoveryScriptFile.canExecute()) {
+ throw new FlinkException(String.format("The discovery script %s is not executable.", discoveryScriptFile.getAbsolutePath()));
+ }
+
+ args = config.getString(DISCOVERY_SCRIPT_ARG);
+ }
+
+ @Override
+ public Set retrieveResourceInfo(long gpuAmount) throws Exception {
+ Preconditions.checkArgument(gpuAmount > 0, "The gpuAmount should be positive when retrieving the GPU resource information.");
+
+ final Set gpuResources = new HashSet<>();
+ String output = executeDiscoveryScript(discoveryScriptFile, gpuAmount, args);
+ if (!output.isEmpty()) {
+ String[] indexes = output.split(",");
+ for (String index : indexes) {
+ if (!StringUtils.isNullOrWhitespaceOnly(index)) {
+ gpuResources.add(new GPUInfo(index.trim()));
+ }
+ }
+ }
+ LOG.info("Discover GPU resources: {}.", gpuResources);
+ return Collections.unmodifiableSet(gpuResources);
+ }
+
+ private String executeDiscoveryScript(File discoveryScript, long gpuAmount, String args) throws Exception {
+ final String cmd = discoveryScript.getAbsolutePath() + " " + gpuAmount + " " + args;
+ final Process process = Runtime.getRuntime().exec(cmd);
+ try (final BufferedReader stdoutReader = new BufferedReader(new InputStreamReader(process.getInputStream()));
+ final BufferedReader stderrReader = new BufferedReader(new InputStreamReader(process.getErrorStream()))) {
+ final boolean hasProcessTerminated = process.waitFor(DISCOVERY_SCRIPT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ if (!hasProcessTerminated) {
+ throw new TimeoutException(String.format("The discovery script executed for over %d ms.", DISCOVERY_SCRIPT_TIMEOUT_MS));
+ }
+
+ final int exitVal = process.exitValue();
+ if (exitVal != 0) {
+ final String stdout = stdoutReader.lines().collect(StringBuilder::new, StringBuilder::append, StringBuilder::append).toString();
+ final String stderr = stderrReader.lines().collect(StringBuilder::new, StringBuilder::append, StringBuilder::append).toString();
+ LOG.warn("Discovery script exit with {}.\\nSTDOUT: {}\\nSTDERR: {}", exitVal, stdout, stderr);
+ throw new FlinkException(String.format("Discovery script exit with non-zero return code: %s.", exitVal));
+ }
+ Object[] stdout = stdoutReader.lines().toArray();
+ if (stdout.length > 1) {
+ LOG.warn(
+ "The output of the discovery script should only contain one single line. Finding {} lines with content: {}. Will only keep the first line.", stdout.length, Arrays.toString(stdout));
+ }
+ if (stdout.length == 0) {
+ return "";
+ }
+ return (String) stdout[0];
+ } finally {
+ process.destroyForcibly();
+ }
+ }
+}
diff --git a/flink-external-resources/flink-external-resource-gpu/src/main/java/org/apache/flink/externalresource/gpu/GPUDriverFactory.java b/flink-external-resources/flink-external-resource-gpu/src/main/java/org/apache/flink/externalresource/gpu/GPUDriverFactory.java
new file mode 100644
index 0000000000000..d5bd571360090
--- /dev/null
+++ b/flink-external-resources/flink-external-resource-gpu/src/main/java/org/apache/flink/externalresource/gpu/GPUDriverFactory.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.externalresource.gpu;
+
+import org.apache.flink.api.common.externalresource.ExternalResourceDriver;
+import org.apache.flink.api.common.externalresource.ExternalResourceDriverFactory;
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * Factory for creating {@link GPUDriver}.
+ */
+public class GPUDriverFactory implements ExternalResourceDriverFactory {
+ @Override
+ public ExternalResourceDriver createExternalResourceDriver(Configuration config) throws Exception {
+ return new GPUDriver(config);
+ }
+}
diff --git a/flink-external-resources/flink-external-resource-gpu/src/main/java/org/apache/flink/externalresource/gpu/GPUInfo.java b/flink-external-resources/flink-external-resource-gpu/src/main/java/org/apache/flink/externalresource/gpu/GPUInfo.java
new file mode 100644
index 0000000000000..aa5f4fb3965b2
--- /dev/null
+++ b/flink-external-resources/flink-external-resource-gpu/src/main/java/org/apache/flink/externalresource/gpu/GPUInfo.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.externalresource.gpu;
+
+import org.apache.flink.api.common.externalresource.ExternalResourceInfo;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Optional;
+
+/**
+ * Information for GPU resource. Currently only including the GPU index.
+ */
+public class GPUInfo implements ExternalResourceInfo {
+
+ private static final String PROPERTY_KEY_INDEX = "index";
+
+ private final String index;
+
+ GPUInfo(String index) {
+ Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(index));
+ this.index = index;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("GPU Device(%s)", index);
+ }
+
+ @Override
+ public int hashCode() {
+ return index.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this) {
+ return true;
+ } else if (obj instanceof GPUInfo) {
+ final GPUInfo other = (GPUInfo) obj;
+ return this.index.equals(other.index);
+ }
+ return false;
+ }
+
+ @Override
+ public Optional getProperty(String key) {
+ if (key.equals(PROPERTY_KEY_INDEX)) {
+ return Optional.of(index);
+ } else {
+ return Optional.empty();
+ }
+ }
+
+ @Override
+ public Collection getKeys() {
+ return Collections.singleton(PROPERTY_KEY_INDEX);
+ }
+}
diff --git a/flink-external-resources/flink-external-resource-gpu/src/main/resources/META-INF/services/org.apache.flink.api.common.externalresource.ExternalResourceDriverFactory b/flink-external-resources/flink-external-resource-gpu/src/main/resources/META-INF/services/org.apache.flink.api.common.externalresource.ExternalResourceDriverFactory
new file mode 100644
index 0000000000000..7d366fe21ed14
--- /dev/null
+++ b/flink-external-resources/flink-external-resource-gpu/src/main/resources/META-INF/services/org.apache.flink.api.common.externalresource.ExternalResourceDriverFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.externalresource.gpu.GPUDriverFactory
diff --git a/flink-external-resources/flink-external-resource-gpu/src/main/resources/gpu-discovery-common.sh b/flink-external-resources/flink-external-resource-gpu/src/main/resources/gpu-discovery-common.sh
new file mode 100755
index 0000000000000..015835846e8b4
--- /dev/null
+++ b/flink-external-resources/flink-external-resource-gpu/src/main/resources/gpu-discovery-common.sh
@@ -0,0 +1,92 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+non_coordination_allocate() {
+ indexes=($1)
+ amount=$2
+ to_occupy_indexes=(${indexes[@]:0:$amount})
+ if [ $amount -gt ${#to_occupy_indexes[@]} ]; then
+ echo "Could not get enough GPU resources."
+ exit 1
+ fi
+ echo ${to_occupy_indexes[@]} | sed 's/ /,/g'
+}
+
+coordination_allocate() {
+ indexes=($1)
+ amount=$2
+ coordination_file=${3:-/var/tmp/flink-gpu-coordination}
+ (
+ flock -x 200
+ # GPU indexes to be occupied.
+ to_occupy_indexes=()
+ # GPU indexes which are already recorded in the coordination file. These indexes should not be occupied unless the associated
+ # processes are no longer alive.
+ recorded_indexes=()
+ for i in ${indexes[@]}
+ do
+ if [ ${#to_occupy_indexes[@]} -eq $amount ]; then
+ break
+ elif [ `grep -c "^$i " $coordination_file` -ne 0 ]; then
+ recorded_indexes[${#recorded_indexes[@]}]=$i
+ else
+ to_occupy_indexes[${#to_occupy_indexes[@]}]=$i
+ fi
+ done
+
+ # If there are not enough indexes, we will try to occupy indexes whose associated processes are dead.
+ for i in ${!recorded_indexes[@]}
+ do
+ if [ ${#to_occupy_indexes[@]} -eq $amount ];then
+ break
+ fi
+ owner=`grep "^${recorded_indexes[$i]} " $coordination_file | awk '{print $2}'`
+ if [ -n $owner ] && [ `ps -p $owner | grep -c $owner` -eq 0 ]; then
+ # The owner does not exist anymore. We could occupy it.
+ sed -i "/${recorded_indexes[$i]} /d" $coordination_file
+ to_occupy_indexes[${#to_occupy_indexes[@]}]=${recorded_indexes[$i]}
+ unset recorded_indexes[$i]
+ fi
+ done
+
+ if [ $amount -gt ${#to_occupy_indexes[@]} ]; then
+ echo "Could not get enough GPU resources."
+ exit 1
+ fi
+
+ for i in "${to_occupy_indexes[@]}"
+ do
+ echo "$i $PPID" >> $coordination_file
+ done
+
+ echo ${to_occupy_indexes[@]} | sed 's/ /,/g'
+ ) 200<> $coordination_file
+}
+
+gpu_discovery() {
+ indexes=$1
+ amount=$2
+ coordination_mode=$3
+ coordination_file=${4:-/var/tmp/flink-gpu-coordination}
+ if [ "$coordination_mode" == "coordination" ]; then
+ coordination_allocate "$indexes" $amount $coordination_file
+ else
+ non_coordination_allocate "$indexes" $amount
+ fi
+}
diff --git a/flink-external-resources/flink-external-resource-gpu/src/main/resources/nvidia-gpu-discovery.sh b/flink-external-resources/flink-external-resource-gpu/src/main/resources/nvidia-gpu-discovery.sh
new file mode 100755
index 0000000000000..e16e575cc008f
--- /dev/null
+++ b/flink-external-resources/flink-external-resource-gpu/src/main/resources/nvidia-gpu-discovery.sh
@@ -0,0 +1,60 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+source "$(dirname "$0")"/gpu-discovery-common.sh
+
+if [ $# -lt 1 ]; then
+ echo "Usage: ./nvidia-gpu-discovery.sh gpu-amount [--enable-coordination-mode] [--coordination-file filePath]"
+ exit 1
+fi
+
+AMOUNT=$1
+shift
+COORDINATION_FILE="/var/tmp/flink-gpu-coordination"
+COORDINATION_MODE=""
+EXIT_NON_ZERO=""
+
+while [[ $# -ge 1 ]]
+do
+key="$1"
+shift
+ case $key in
+ --enable-coordination-mode)
+ COORDINATION_MODE="coordination"
+ ;;
+ --coordination-file)
+ COORDINATION_FILE="$1"
+ shift
+ ;;
+ *)
+ # unknown option
+ ;;
+ esac
+done
+
+if [ $AMOUNT -eq 0 ]; then
+ exit 0
+fi
+
+csv_index=`nvidia-smi --query-gpu=index --format=csv,noheader`
+if [ $? -ne 0 ]; then
+ exit 1
+fi
+IFS=',' read -r -a indexes <<< $(echo $csv_index)
+gpu_discovery "${indexes[*]}" $AMOUNT $COORDINATION_MODE $COORDINATION_FILE
diff --git a/flink-external-resources/flink-external-resource-gpu/src/test/java/org/apache/flink/externalresource/gpu/GPUDiscoveryScriptTest.java b/flink-external-resources/flink-external-resource-gpu/src/test/java/org/apache/flink/externalresource/gpu/GPUDiscoveryScriptTest.java
new file mode 100644
index 0000000000000..7f338404513bb
--- /dev/null
+++ b/flink-external-resources/flink-external-resource-gpu/src/test/java/org/apache/flink/externalresource/gpu/GPUDiscoveryScriptTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.externalresource.gpu;
+
+import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+
+import static org.junit.Assume.assumeTrue;
+
+/**
+ * Tests for the gpu-discovery-common.sh.
+ */
+public class GPUDiscoveryScriptTest extends TestLogger {
+
+ private static final String TEST_SCRIPT_PATH = "src/test/resources/test-coordination-mode.sh";
+
+ @Test
+ public void testNonCoordinationMode() throws Exception {
+ assumeTrue(OperatingSystem.isLinux());
+ testExistWithNonZero("test_non_coordination_mode");
+ }
+
+ @Test
+ public void testCoordinateIndexes() throws Exception {
+ assumeTrue(OperatingSystem.isLinux());
+ testExistWithNonZero("test_coordinate_indexes");
+ }
+
+ @Test
+ public void testPreemptFromDeadProcesses() throws Exception {
+ assumeTrue(OperatingSystem.isLinux());
+ testExistWithNonZero("test_preempt_from_dead_processes");
+ }
+
+ @Test
+ public void testSetCoordinationFile() throws Exception {
+ assumeTrue(OperatingSystem.isLinux());
+ testExistWithNonZero("test_coordination_file");
+ }
+
+ private void testExistWithNonZero(String cmd) throws Exception {
+ final ProcessBuilder processBuilder = new ProcessBuilder(TEST_SCRIPT_PATH, cmd);
+ processBuilder.redirectErrorStream(true);
+ final Process process = processBuilder.start();
+ try (final BufferedReader stdoutReader = new BufferedReader(new InputStreamReader(process.getInputStream()))) {
+ final int exitValue = process.waitFor();
+ if (exitValue != 0) {
+ final String stdout = stdoutReader.lines().collect(StringBuilder::new, StringBuilder::append, StringBuilder::append).toString();
+ throw new Exception(String.format("Script exist with non-zero %d.\\n OUTPUT: %s", exitValue, stdout));
+ }
+ } finally {
+ process.destroyForcibly();
+ }
+ }
+}
diff --git a/flink-external-resources/flink-external-resource-gpu/src/test/java/org/apache/flink/externalresource/gpu/GPUDriverTest.java b/flink-external-resources/flink-external-resource-gpu/src/test/java/org/apache/flink/externalresource/gpu/GPUDriverTest.java
new file mode 100644
index 0000000000000..296f3d624a9d4
--- /dev/null
+++ b/flink-external-resources/flink-external-resource-gpu/src/test/java/org/apache/flink/externalresource/gpu/GPUDriverTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.externalresource.gpu;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.util.Set;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link GPUDriver}.
+ */
+public class GPUDriverTest extends TestLogger {
+
+ private static final String TESTING_DISCOVERY_SCRIPT_PATH = "src/test/resources/testing-gpu-discovery.sh";
+
+ @ClassRule
+ public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();
+
+ @Test
+ public void testGPUDriverWithTestScript() throws Exception {
+ final int gpuAmount = 2;
+ final Configuration config = new Configuration();
+ config.setString(GPUDriver.DISCOVERY_SCRIPT_PATH, TESTING_DISCOVERY_SCRIPT_PATH);
+
+ final GPUDriver gpuDriver = new GPUDriver(config);
+ final Set gpuResource = gpuDriver.retrieveResourceInfo(gpuAmount);
+
+ assertThat(gpuResource.size(), is(gpuAmount));
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testGPUDriverWithInvalidAmount() throws Exception {
+ final int gpuAmount = -1;
+ final Configuration config = new Configuration();
+ config.setString(GPUDriver.DISCOVERY_SCRIPT_PATH, TESTING_DISCOVERY_SCRIPT_PATH);
+
+ final GPUDriver gpuDriver = new GPUDriver(config);
+ gpuDriver.retrieveResourceInfo(gpuAmount);
+ }
+
+ @Test(expected = IllegalConfigurationException.class)
+ public void testGPUDriverWithIllegalConfigTestScript() throws Exception {
+ final Configuration config = new Configuration();
+ config.setString(GPUDriver.DISCOVERY_SCRIPT_PATH, " ");
+
+ new GPUDriver(config);
+ }
+
+ @Test(expected = FileNotFoundException.class)
+ public void testGPUDriverWithTestScriptDoNotExist() throws Exception {
+ final Configuration config = new Configuration();
+ config.setString(GPUDriver.DISCOVERY_SCRIPT_PATH, "invalid/path");
+
+ new GPUDriver(config);
+ }
+
+ @Test(expected = FlinkException.class)
+ public void testGPUDriverWithInexecutableScript() throws Exception {
+ final Configuration config = new Configuration();
+ final File inexecutableFile = TEMP_FOLDER.newFile();
+ assertTrue(inexecutableFile.setExecutable(false));
+
+ config.setString(GPUDriver.DISCOVERY_SCRIPT_PATH, inexecutableFile.getAbsolutePath());
+
+ new GPUDriver(config);
+ }
+
+ @Test(expected = FlinkException.class)
+ public void testGPUDriverWithTestScriptExitWithNonZero() throws Exception {
+ final Configuration config = new Configuration();
+ config.setString(GPUDriver.DISCOVERY_SCRIPT_PATH, TESTING_DISCOVERY_SCRIPT_PATH);
+ config.setString(GPUDriver.DISCOVERY_SCRIPT_ARG, "--exit-non-zero");
+
+ final GPUDriver gpuDriver = new GPUDriver(config);
+ gpuDriver.retrieveResourceInfo(1);
+ }
+}
diff --git a/flink-external-resources/flink-external-resource-gpu/src/test/resources/test-coordination-mode.sh b/flink-external-resources/flink-external-resource-gpu/src/test/resources/test-coordination-mode.sh
new file mode 100755
index 0000000000000..676d273b8ad06
--- /dev/null
+++ b/flink-external-resources/flink-external-resource-gpu/src/test/resources/test-coordination-mode.sh
@@ -0,0 +1,78 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Usage: ./test-coordination-mode.sh target_function
+
+test_non_coordination_mode() {
+ IFS=',' read -r -a output <<< $(bash -c "$(dirname "$0")/testing-gpu-discovery.sh 2")
+ if [ ${#output[@]} -ne 2 ]; then
+ exit 1
+ fi
+}
+
+test_coordinate_indexes() {
+ IFS=',' read -r -a output1 <<< $(bash -c "$(dirname "$0")/testing-gpu-discovery.sh 2 --enable-coordination-mode --available-gpu-amount 4")
+ IFS=',' read -r -a output2 <<< $(bash -c "$(dirname "$0")/testing-gpu-discovery.sh 2 --enable-coordination-mode --available-gpu-amount 4")
+
+ if [[ ${#output1[@]} -ne 2 || ${#output2[@]} -ne 2 ]]; then
+ exit 1
+ fi
+
+ for i in output1
+ do
+ for j in output2
+ do
+ if [ $i == $j ]; then
+ exit 1
+ fi
+ done
+ done
+}
+
+test_preempt_from_dead_processes() {
+ local test_pid
+ while [[ -z $test_pid || $(ps -p $test_pid | grep -c $test_pid) -ne 0 ]]
+ do
+ test_pid=$(shuf -i 1-32768 -n 1)
+ done
+ echo 0 $test_pid >> /var/tmp/flink-gpu-coordination
+ IFS=',' read -r -a output1 <<< $(bash -c "$(dirname "$0")/testing-gpu-discovery.sh 2 --enable-coordination-mode --available-gpu-amount 4")
+ IFS=',' read -r -a output2 <<< $(bash -c "$(dirname "$0")/testing-gpu-discovery.sh 2 --enable-coordination-mode --available-gpu-amount 4")
+
+ if [[ ${#output1[@]} -ne 2 || ${#output2[@]} -ne 2 ]]; then
+ exit 1
+ fi
+}
+
+test_coordination_file() {
+ coordination_file=/var/tmp/flink-test-coordination
+ $(dirname "$0")/testing-gpu-discovery.sh 2 --enable-coordination-mode --coordination-file $coordination_file
+
+ if ![ -f $coordination_file ]; then
+ exit 1;
+ fi
+}
+
+clean_state() {
+ rm -f /var/tmp/flink-gpu-coordination
+ rm -f /var/tmp/flink-test-coordination
+}
+trap clean_state EXIT
+
+$@
diff --git a/flink-external-resources/flink-external-resource-gpu/src/test/resources/testing-gpu-discovery.sh b/flink-external-resources/flink-external-resource-gpu/src/test/resources/testing-gpu-discovery.sh
new file mode 100755
index 0000000000000..3b37a83342f30
--- /dev/null
+++ b/flink-external-resources/flink-external-resource-gpu/src/test/resources/testing-gpu-discovery.sh
@@ -0,0 +1,68 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+source "$(dirname "$0")"/../../main/resources/gpu-discovery-common.sh
+
+if [ $# -lt 1 ]; then
+ echo "Usage: ./testing-gpu-discovery.sh gpu-amount [--available-gpu-amount amount] [--enable-coordination-mode] [--coordination-file filePath] [--exit-non-zero]"
+ exit 1
+fi
+
+AMOUNT=$1
+shift
+AVAILABLE_AMOUNT=$AMOUNT
+COORDINATION_FILE="/var/tmp/flink-gpu-coordination"
+COORDINATION_MODE=""
+EXIT_NON_ZERO=""
+
+while [[ $# -ge 1 ]]
+do
+key="$1"
+shift
+ case $key in
+ --enable-coordination-mode)
+ COORDINATION_MODE="coordination"
+ ;;
+ --coordination-file)
+ COORDINATION_FILE="$1"
+ shift
+ ;;
+ --available-gpu-amount)
+ AVAILABLE_AMOUNT=$1
+ shift
+ ;;
+ --exit-non-zero)
+ EXIT_NON_ZERO="exit-non-zero"
+ ;;
+ *)
+ # unknown option
+ ;;
+ esac
+done
+
+if [ "$EXIT_NON_ZERO" == "exit-non-zero" ]; then
+ exit 1
+fi
+
+if [ $AMOUNT -eq 0 ]; then
+ exit 0
+fi
+
+indexes=($(seq 0 1 $AVAILABLE_AMOUNT))
+gpu_discovery "${indexes[*]}" $AMOUNT $COORDINATION_MODE $COORDINATION_FILE
diff --git a/flink-external-resources/pom.xml b/flink-external-resources/pom.xml
new file mode 100644
index 0000000000000..23d9a4df0bcd6
--- /dev/null
+++ b/flink-external-resources/pom.xml
@@ -0,0 +1,40 @@
+
+
+
+
+ 4.0.0
+
+
+ org.apache.flink
+ flink-parent
+ 1.11-SNAPSHOT
+ ..
+
+
+ flink-external-resources
+ flink-external-resources
+ pom
+
+
+ flink-external-resource-gpu
+
+
+
diff --git a/pom.xml b/pom.xml
index 978dfb9eb67e2..a5c79a4eb5e0c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -87,6 +87,7 @@ under the License.
flink-ml-parentflink-walkthroughsflink-kubernetes
+ flink-external-resources
diff --git a/tools/ci/stage.sh b/tools/ci/stage.sh
index 67fdad8948b0f..e431ebe1c7d7b 100644
--- a/tools/ci/stage.sh
+++ b/tools/ci/stage.sh
@@ -42,7 +42,9 @@ flink-scala,\
flink-streaming-java,\
flink-streaming-scala,\
flink-metrics,\
-flink-metrics/flink-metrics-core"
+flink-metrics/flink-metrics-core,\
+flink-external-resources,\
+flink-external-resources/flink-external-resource-gpu"
MODULES_LIBRARIES="\
flink-libraries/flink-cep,\