Skip to content

Commit

Permalink
[FLINK-12143] Distribute FLINK_PLUGINS_DIR the same way as FLINK_LIB_…
Browse files Browse the repository at this point in the history
…DIR is distributed
  • Loading branch information
pnowojski committed Jun 13, 2019
1 parent a98a7ee commit b5dc213
Show file tree
Hide file tree
Showing 9 changed files with 69 additions and 26 deletions.
1 change: 1 addition & 0 deletions flink-container/docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ RUN apk add --no-cache bash snappy libc6-compat
ENV FLINK_INSTALL_PATH=/opt
ENV FLINK_HOME $FLINK_INSTALL_PATH/flink
ENV FLINK_LIB_DIR $FLINK_HOME/lib
ENV FLINK_PLUGINS_DIR $FLINK_HOME/plugins
ENV FLINK_OPT_DIR $FLINK_HOME/opt
ENV FLINK_JOB_ARTIFACTS_DIR $FLINK_INSTALL_PATH/artifacts
ENV PATH $PATH:$FLINK_HOME/bin
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2011,6 +2011,9 @@ public final class ConfigConstants {
/** The environment variable name which contains the location of the opt directory. */
public static final String ENV_FLINK_OPT_DIR = "FLINK_OPT_DIR";

/** The environment variable name which contains the location of the plugins folder. */
public static final String ENV_FLINK_PLUGINS_DIR = "FLINK_PLUGINS_DIR";

/** The environment variable name which contains the location of the bin directory. */
public static final String ENV_FLINK_BIN_DIR = "FLINK_BIN_DIR";

Expand Down
2 changes: 2 additions & 0 deletions flink-dist/src/main/flink-bin/bin/config.sh
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ SYMLINK_RESOLVED_BIN=`cd "$bin"; pwd -P`
# Define the main directory of the flink installation
FLINK_HOME=`dirname "$SYMLINK_RESOLVED_BIN"`
FLINK_LIB_DIR=$FLINK_HOME/lib
FLINK_PLUGINS_DIR=$FLINK_HOME/plugins
FLINK_OPT_DIR=$FLINK_HOME/opt


Expand All @@ -314,6 +315,7 @@ YAML_CONF=${FLINK_CONF_DIR}/${FLINK_CONF_FILE}
### Exported environment variables ###
export FLINK_CONF_DIR
export FLINK_BIN_DIR
export FLINK_PLUGINS_DIR
# export /lib dir to access it during deployment of the Yarn staging files
export FLINK_LIB_DIR
# export /opt dir to access it for the SQL client
Expand Down
1 change: 1 addition & 0 deletions flink-dist/src/main/flink-bin/bin/flink.bat
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ setlocal
SET bin=%~dp0
SET FLINK_HOME=%bin%..
SET FLINK_LIB_DIR=%FLINK_HOME%\lib
SET FLINK_PLUGINS_DIR=%FLINK_HOME%\plugins

SET JVM_ARGS=-Xmx512m

Expand Down
1 change: 1 addition & 0 deletions flink-dist/src/main/flink-bin/bin/start-cluster.bat
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ setlocal EnableDelayedExpansion
SET bin=%~dp0
SET FLINK_HOME=%bin%..
SET FLINK_LIB_DIR=%FLINK_HOME%\lib
SET FLINK_PLUGINS_DIR=%FLINK_HOME%\plugins
SET FLINK_CONF_DIR=%FLINK_HOME%\conf
SET FLINK_LOG_DIR=%FLINK_HOME%\log

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,18 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.clusterframework.ContainerSpecification;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.util.Map;

import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_BIN_DIR;
import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_CONF_DIR;
import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_HOME_DIR;
import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_LIB_DIR;
import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_PLUGINS_DIR;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;

Expand All @@ -56,11 +57,13 @@ public class FlinkDistributionOverlay extends AbstractContainerOverlay {
final File flinkBinPath;
final File flinkConfPath;
final File flinkLibPath;
final File flinkPluginsPath;

public FlinkDistributionOverlay(File flinkBinPath, File flinkConfPath, File flinkLibPath) {
public FlinkDistributionOverlay(File flinkBinPath, File flinkConfPath, File flinkLibPath, File flinkPluginsPath) {
this.flinkBinPath = checkNotNull(flinkBinPath);
this.flinkConfPath = checkNotNull(flinkConfPath);
this.flinkLibPath = checkNotNull(flinkLibPath);
this.flinkPluginsPath = checkNotNull(flinkPluginsPath);
}

@Override
Expand All @@ -72,6 +75,7 @@ public void configure(ContainerSpecification container) throws IOException {
addPathRecursively(flinkBinPath, TARGET_ROOT, container);
addPathRecursively(flinkConfPath, TARGET_ROOT, container);
addPathRecursively(flinkLibPath, TARGET_ROOT, container);
addPathRecursively(flinkPluginsPath, TARGET_ROOT, container);
}

public static Builder newBuilder() {
Expand All @@ -85,6 +89,7 @@ public static class Builder {
File flinkBinPath;
File flinkConfPath;
File flinkLibPath;
File flinkPluginsPath;

/**
* Configures the overlay using the current environment.
Expand All @@ -97,12 +102,13 @@ public Builder fromEnvironment(Configuration globalConfiguration) {
flinkBinPath = getObligatoryFileFromEnvironment(ENV_FLINK_BIN_DIR);
flinkConfPath = getObligatoryFileFromEnvironment(ENV_FLINK_CONF_DIR);
flinkLibPath = getObligatoryFileFromEnvironment(ENV_FLINK_LIB_DIR);
flinkPluginsPath = getObligatoryFileFromEnvironment(ENV_FLINK_PLUGINS_DIR);

return this;
}

public FlinkDistributionOverlay build() {
return new FlinkDistributionOverlay(flinkBinPath, flinkConfPath, flinkLibPath);
return new FlinkDistributionOverlay(flinkBinPath, flinkConfPath, flinkLibPath, flinkPluginsPath);
}

private static File getObligatoryFileFromEnvironment(String envVariableName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.HashMap;
import java.util.Map;

import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_PLUGINS_DIR;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

Expand All @@ -49,6 +50,7 @@ public void testConfigure() throws Exception {

File binFolder = tempFolder.newFolder("bin");
File libFolder = tempFolder.newFolder("lib");
File pluginsFolder = tempFolder.newFolder("plugins");
File confFolder = tempFolder.newFolder("conf");

Path[] files = createPaths(
Expand All @@ -58,14 +60,17 @@ public void testConfigure() throws Exception {
"lib/foo.jar",
"lib/A/foo.jar",
"lib/B/foo.jar",
"lib/B/bar.jar");
"lib/B/bar.jar",
"plugins/P1/plugin1a.jar",
"plugins/P1/plugin1b.jar",
"plugins/P2/plugin2.jar");

ContainerSpecification containerSpecification = new ContainerSpecification();
FlinkDistributionOverlay overlay = new FlinkDistributionOverlay(
binFolder,
confFolder,
libFolder
);
libFolder,
pluginsFolder);
overlay.configure(containerSpecification);

for(Path file : files) {
Expand All @@ -79,31 +84,39 @@ public void testBuilderFromEnvironment() throws Exception {

File binFolder = tempFolder.newFolder("bin");
File libFolder = tempFolder.newFolder("lib");
File pluginsFolder = tempFolder.newFolder("plugins");
File confFolder = tempFolder.newFolder("conf");

// adjust the test environment for the purposes of this test
Map<String, String> map = new HashMap<String, String>(System.getenv());
map.put(ENV_FLINK_BIN_DIR, binFolder.getAbsolutePath());
map.put(ENV_FLINK_LIB_DIR, libFolder.getAbsolutePath());
map.put(ENV_FLINK_PLUGINS_DIR, pluginsFolder.getAbsolutePath());
map.put(ENV_FLINK_CONF_DIR, confFolder.getAbsolutePath());
CommonTestUtils.setEnv(map);

FlinkDistributionOverlay.Builder builder = FlinkDistributionOverlay.newBuilder().fromEnvironment(conf);

assertEquals(binFolder.getAbsolutePath(), builder.flinkBinPath.getAbsolutePath());
assertEquals(libFolder.getAbsolutePath(), builder.flinkLibPath.getAbsolutePath());
assertEquals(pluginsFolder.getAbsolutePath(), builder.flinkPluginsPath.getAbsolutePath());
assertEquals(confFolder.getAbsolutePath(), builder.flinkConfPath.getAbsolutePath());
}

@Test
public void testBuilderFromEnvironmentBad() throws Exception {
testBuilderFromEnvironmentBad(ENV_FLINK_BIN_DIR);
testBuilderFromEnvironmentBad(ENV_FLINK_LIB_DIR);
testBuilderFromEnvironmentBad(ENV_FLINK_PLUGINS_DIR);
testBuilderFromEnvironmentBad(ENV_FLINK_CONF_DIR);
}

public void testBuilderFromEnvironmentBad(String obligatoryEnvironmentVariable) throws Exception {
Configuration conf = new Configuration();

// adjust the test environment for the purposes of this test
Map<String, String> map = new HashMap<>(System.getenv());
map.remove(ENV_FLINK_BIN_DIR);
map.remove(ENV_FLINK_LIB_DIR);
map.remove(ENV_FLINK_CONF_DIR);
map.remove(obligatoryEnvironmentVariable);
CommonTestUtils.setEnv(map);

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@
import java.util.stream.Collectors;

import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_LIB_DIR;
import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_PLUGINS_DIR;
import static org.apache.flink.runtime.entrypoint.component.FileJobGraphRetriever.JOB_GRAPH_FILE_PATH;
import static org.apache.flink.yarn.cli.FlinkYarnSessionCli.CONFIG_FILE_LOG4J_NAME;
import static org.apache.flink.yarn.cli.FlinkYarnSessionCli.CONFIG_FILE_LOGBACK_NAME;
Expand Down Expand Up @@ -700,7 +701,7 @@ public ApplicationReport startAppMaster(
}
}

addLibFolderToShipFiles(systemShipFiles);
addEnvironmentFoldersToShipFiles(systemShipFiles);

// Set-up ApplicationSubmissionContext for the application

Expand Down Expand Up @@ -1513,23 +1514,32 @@ public void run() {
}
}

protected void addLibFolderToShipFiles(Collection<File> effectiveShipFiles) {
protected void addEnvironmentFoldersToShipFiles(Collection<File> effectiveShipFiles) {
// Add lib folder to the ship files if the environment variable is set.
// This is for convenience when running from the command-line.
// (for other files users explicitly set the ship files)
String libDir = System.getenv().get(ENV_FLINK_LIB_DIR);
if (libDir != null) {
File libDirFile = new File(libDir);
if (libDirFile.isDirectory()) {
effectiveShipFiles.add(libDirFile);
} else {
throw new YarnDeploymentException("The environment variable '" + ENV_FLINK_LIB_DIR +
"' is set to '" + libDir + "' but the directory doesn't exist.");
}
addEnvFolderToShipFiles(effectiveShipFiles, libDir, ENV_FLINK_LIB_DIR);
} else if (this.shipFiles.isEmpty()) {
LOG.warn("Environment variable '{}' not set and ship files have not been provided manually. " +
"Not shipping any library files.", ENV_FLINK_LIB_DIR);
}

String pluginsDir = System.getenv().get(ENV_FLINK_PLUGINS_DIR);
if (pluginsDir != null) {
addEnvFolderToShipFiles(effectiveShipFiles, pluginsDir, ENV_FLINK_PLUGINS_DIR);
}
}

private void addEnvFolderToShipFiles(Collection<File> effectiveShipFiles, String directory, String environmentVariableName) {
File directoryFile = new File(directory);
if (directoryFile.isDirectory()) {
effectiveShipFiles.add(directoryFile);
} else {
throw new YarnDeploymentException("The environment variable '" + environmentVariableName +
"' is set to '" + directory + "' but the directory doesn't exist.");
}
}

protected ContainerLaunchContext setupApplicationMasterContainer(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -435,10 +435,10 @@ public void testSetupApplicationMasterContainer() {
}

/**
* Tests to ship a lib folder through the {@code YarnClusterDescriptor.addShipFiles}.
* Tests to ship files through the {@code YarnClusterDescriptor.addShipFiles}.
*/
@Test
public void testExplicitLibShipping() throws Exception {
public void testExplicitFileShipping() throws Exception {
try (YarnClusterDescriptor descriptor = new YarnClusterDescriptor(
new Configuration(),
yarnConfiguration,
Expand All @@ -464,7 +464,7 @@ public void testExplicitLibShipping() throws Exception {

// only execute part of the deployment to test for shipped files
Set<File> effectiveShipFiles = new HashSet<>();
descriptor.addLibFolderToShipFiles(effectiveShipFiles);
descriptor.addEnvironmentFoldersToShipFiles(effectiveShipFiles);

Assert.assertEquals(0, effectiveShipFiles.size());
Assert.assertEquals(2, descriptor.shipFiles.size());
Expand All @@ -473,11 +473,17 @@ public void testExplicitLibShipping() throws Exception {
}
}

/**
* Tests to ship a lib folder through the {@code ConfigConstants.ENV_FLINK_LIB_DIR}.
*/
@Test
public void testEnvironmentLibShipping() throws Exception {
testEnvironmentDirectoryShipping(ConfigConstants.ENV_FLINK_LIB_DIR);
}

@Test
public void testEnvironmentPluginsShipping() throws Exception {
testEnvironmentDirectoryShipping(ConfigConstants.ENV_FLINK_PLUGINS_DIR);
}

public void testEnvironmentDirectoryShipping(String environmentVariable) throws Exception {
try (YarnClusterDescriptor descriptor = new YarnClusterDescriptor(
new Configuration(),
yarnConfiguration,
Expand All @@ -493,10 +499,10 @@ public void testEnvironmentLibShipping() throws Exception {
final Map<String, String> oldEnv = System.getenv();
try {
Map<String, String> env = new HashMap<>(1);
env.put(ConfigConstants.ENV_FLINK_LIB_DIR, libFolder.getAbsolutePath());
env.put(environmentVariable, libFolder.getAbsolutePath());
CommonTestUtils.setEnv(env);
// only execute part of the deployment to test for shipped files
descriptor.addLibFolderToShipFiles(effectiveShipFiles);
descriptor.addEnvironmentFoldersToShipFiles(effectiveShipFiles);
} finally {
CommonTestUtils.setEnv(oldEnv);
}
Expand Down

0 comments on commit b5dc213

Please sign in to comment.