Skip to content

Commit

Permalink
[FLINK-14465] Let StandaloneJobClusterEntryPoint use the user code cl…
Browse files Browse the repository at this point in the history
…ass loader

[FLINK-14465] The PackageProgram's constructor does not throw excpetion
any more when jarFile is null. Introducing this change is because there
may be no jarFile in perjob mode. All jars the user code depends on are
in the classpaths.

[FLINK-14465] ClassPathJobGraphRetriever creates PackagesProgram with user
class paths.

[FLINK-14465] StandaloneJobClusterEntryPoint uses "FLINK_HOME/usrlib"
as the job's class path. The environment variable FLINK_HOME is set
at Dockerfile. Link the FLINK_JOB_ARTIFACTS_DIR to the FLINK_HOME/job,
which makes the FlinkUserClassloader load the user class in the
standalone perjob mode.

This closes apache#10076.
  • Loading branch information
guoweiM authored and tillrohrmann committed Nov 9, 2019
1 parent 1806a37 commit c1e9aef
Show file tree
Hide file tree
Showing 31 changed files with 813 additions and 262 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -796,9 +796,12 @@ PackagedProgram buildProgram(
jarFile = getJarFile(jarFilePath);
}

PackagedProgram program = entryPointClass == null ?
new PackagedProgram(jarFile, classpaths, programArgs) :
new PackagedProgram(jarFile, classpaths, entryPointClass, programArgs);
PackagedProgram program = PackagedProgram.newBuilder()
.setJarFile(jarFile)
.setUserClassPaths(classpaths)
.setEntryPointClassName(entryPointClass)
.setArguments(programArgs)
.build();

program.setSavepointRestoreSettings(executionParameters.getSavepointRestoreSettings());

Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@
*/
public class PackagedProgramUtils {

private static final String PYTHON_DRIVER_CLASS_NAME = "org.apache.flink.client.python.PythonDriver";

private static final String PYTHON_GATEWAY_CLASS_NAME = "org.apache.flink.client.python.PythonGatewayServer";
/**
* Creates a {@link JobGraph} with a specified {@link JobID}
* from the given {@link PackagedProgram}.
Expand Down Expand Up @@ -102,5 +105,10 @@ public static Pipeline getPipelineFromProgram(PackagedProgram prog, int parallel
}
}

public static Boolean isPython(String entryPointClassName) {
return (entryPointClassName != null) &&
(entryPointClassName.equals(PYTHON_DRIVER_CLASS_NAME) || entryPointClassName.equals(PYTHON_GATEWAY_CLASS_NAME));
}

private PackagedProgramUtils() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ public void testFileNotJarFile() throws Exception {
ProgramOptions programOptions = mock(ProgramOptions.class);
ExecutionConfigAccessor executionOptions = mock(ExecutionConfigAccessor.class);
when(executionOptions.getJarFilePath()).thenReturn(getNonJarFilePath());
when(programOptions.getProgramArgs()).thenReturn(new String[0]);

try {
frontend.buildProgram(programOptions, executionOptions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public void setUp() throws Exception {
public void testDetachedMode() throws Exception{
final ClusterClient<?> clusterClient = new MiniClusterClient(new Configuration(), MINI_CLUSTER_RESOURCE.getMiniCluster());
try {
PackagedProgram prg = new PackagedProgram(TestExecuteTwice.class);
PackagedProgram prg = PackagedProgram.newBuilder().setEntryPointClassName(TestExecuteTwice.class.getName()).build();
ClientUtils.executeProgram(clusterClient, prg, 1, true);
fail(FAIL_MESSAGE);
} catch (ProgramInvocationException e) {
Expand All @@ -109,7 +109,7 @@ public void testDetachedMode() throws Exception{
}

try {
PackagedProgram prg = new PackagedProgram(TestEager.class);
PackagedProgram prg = PackagedProgram.newBuilder().setEntryPointClassName(TestEager.class.getName()).build();
ClientUtils.executeProgram(clusterClient, prg, 1, true);
fail(FAIL_MESSAGE);
} catch (ProgramInvocationException e) {
Expand All @@ -119,7 +119,7 @@ public void testDetachedMode() throws Exception{
}

try {
PackagedProgram prg = new PackagedProgram(TestGetRuntime.class);
PackagedProgram prg = PackagedProgram.newBuilder().setEntryPointClassName(TestGetRuntime.class.getName()).build();
ClientUtils.executeProgram(clusterClient, prg, 1, true);
fail(FAIL_MESSAGE);
} catch (ProgramInvocationException e) {
Expand All @@ -129,7 +129,7 @@ public void testDetachedMode() throws Exception{
}

try {
PackagedProgram prg = new PackagedProgram(TestGetAccumulator.class);
PackagedProgram prg = PackagedProgram.newBuilder().setEntryPointClassName(TestGetAccumulator.class.getName()).build();
ClientUtils.executeProgram(clusterClient, prg, 1, true);
fail(FAIL_MESSAGE);
} catch (ProgramInvocationException e) {
Expand All @@ -139,7 +139,7 @@ public void testDetachedMode() throws Exception{
}

try {
PackagedProgram prg = new PackagedProgram(TestGetAllAccumulator.class);
PackagedProgram prg = PackagedProgram.newBuilder().setEntryPointClassName(TestGetAllAccumulator.class.getName()).build();
ClientUtils.executeProgram(clusterClient, prg, 1, true);
fail(FAIL_MESSAGE);
} catch (ProgramInvocationException e) {
Expand Down Expand Up @@ -194,7 +194,10 @@ public Void answer(InvocationOnMock invocation) throws Throwable {

@Test
public void testGetExecutionPlan() throws ProgramInvocationException {
PackagedProgram prg = new PackagedProgram(TestOptimizerPlan.class, "/dev/random", "/tmp");
PackagedProgram prg = PackagedProgram.newBuilder()
.setEntryPointClassName(TestOptimizerPlan.class.getName())
.setArguments("/dev/random", "/tmp")
.build();

Optimizer optimizer = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), config);
Plan plan = (Plan) PackagedProgramUtils.getPipelineFromProgram(prg, 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,10 @@ public class ExecutionPlanCreationTest {
@Test
public void testGetExecutionPlan() {
try {
PackagedProgram prg = new PackagedProgram(TestOptimizerPlan.class, "/dev/random", "/tmp");
PackagedProgram prg = PackagedProgram.newBuilder()
.setEntryPointClassName(TestOptimizerPlan.class.getName())
.setArguments("/dev/random", "/tmp")
.build();

InetAddress mockAddress = InetAddress.getLocalHost();
InetSocketAddress mockJmAddress = new InetSocketAddress(mockAddress, 12345);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.client.program;

import org.apache.flink.client.cli.CliFrontendTestUtils;
import org.apache.flink.configuration.ConfigConstants;

import org.junit.Assert;
Expand All @@ -28,10 +29,13 @@
import java.io.File;
import java.io.FileOutputStream;
import java.nio.file.Files;
import java.util.Collections;
import java.util.List;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

import static org.apache.flink.client.cli.CliFrontendTestUtils.TEST_JAR_MAIN_CLASS;

/**
* Tests for the {@link PackagedProgram}.
*/
Expand All @@ -57,8 +61,15 @@ public void testExtractContainedLibraries() throws Exception {
Assert.assertArrayEquals(nestedJarContent, Files.readAllBytes(files.iterator().next().toPath()));
}

private static final class NullOutputStream extends java.io.OutputStream {
@Override
public void write(int b) {}
@Test
public void testNotThrowExceptionWhenJarFileIsNull() throws Exception {
PackagedProgram.newBuilder()
.setUserClassPaths(Collections.singletonList(new File(CliFrontendTestUtils.getTestJarPath()).toURI().toURL()))
.setEntryPointClassName(TEST_JAR_MAIN_CLASS);
}

@Test(expected = IllegalArgumentException.class)
public void testBuilderThrowExceptionIfjarFileAndEntryPointClassNameAreBothNull() throws ProgramInvocationException {
PackagedProgram.newBuilder().build();
}
}
3 changes: 2 additions & 1 deletion flink-container/docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ ENV FLINK_LIB_DIR $FLINK_HOME/lib
ENV FLINK_PLUGINS_DIR $FLINK_HOME/plugins
ENV FLINK_OPT_DIR $FLINK_HOME/opt
ENV FLINK_JOB_ARTIFACTS_DIR $FLINK_INSTALL_PATH/artifacts
ENV FLINK_USR_LIB_DIR $FLINK_HOME/usrlib
ENV PATH $PATH:$FLINK_HOME/bin

# flink-dist can point to a directory or a tarball on the local system
Expand All @@ -51,7 +52,7 @@ ADD $job_artifacts/* $FLINK_JOB_ARTIFACTS_DIR/

RUN set -x && \
ln -s $FLINK_INSTALL_PATH/flink-[0-9]* $FLINK_HOME && \
for jar in $FLINK_JOB_ARTIFACTS_DIR/*.jar; do [ -f "$jar" ] || continue; ln -s $jar $FLINK_LIB_DIR; done && \
ln -s $FLINK_JOB_ARTIFACTS_DIR $FLINK_USR_LIB_DIR && \
if [ -n "$python_version" ]; then ln -s $FLINK_OPT_DIR/flink-python*.jar $FLINK_LIB_DIR; fi && \
if [ -f ${FLINK_INSTALL_PATH}/flink-shaded-hadoop* ]; then ln -s ${FLINK_INSTALL_PATH}/flink-shaded-hadoop* $FLINK_LIB_DIR; fi && \
addgroup -S flink && adduser -D -S -H -G flink -h $FLINK_HOME flink && \
Expand Down
2 changes: 1 addition & 1 deletion flink-container/docker/docker-entrypoint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
################################################################################

### If unspecified, the hostname of the container is taken as the JobManager address
FLINK_HOME=${FLINK_HOME:-"/opt/flink/bin"}
FLINK_HOME=${FLINK_HOME:-"/opt/flink"}

JOB_CLUSTER="job-cluster"
TASK_MANAGER="task-manager"
Expand Down
60 changes: 60 additions & 0 deletions flink-container/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,66 @@ under the License.
</descriptors>
</configuration>
</execution>
<execution>
<id>create-test-dependency-user-jar</id>
<phase>process-test-classes</phase>
<goals>
<goal>single</goal>
</goals>
<configuration>
<archive>
<manifest>
<mainClass>org.apache.flink.container.entrypoint.testjar.TestUserClassLoaderJob</mainClass>
</manifest>
</archive>
<finalName>maven</finalName>
<attach>false</attach>
<descriptors>
<descriptor>src/test/assembly/test-assembly-test-user-classloader-job-jar.xml</descriptor>
</descriptors>
</configuration>
</execution>
<execution>
<id>create-test-dependency-user-jar-depend</id>
<phase>process-test-classes</phase>
<goals>
<goal>single</goal>
</goals>
<configuration>
<finalName>maven</finalName>
<attach>false</attach>
<descriptors>
<descriptor>src/test/assembly/test-assembly-test-user-classloader-job-lib-jar.xml</descriptor>
</descriptors>
</configuration>
</execution>
</executions>
</plugin>
<!--Remove the external jar test code from the test-classes directory since it mustn't be in the
classpath when running the tests to actually test whether the user code class loader
is properly used.-->
<plugin>
<artifactId>maven-clean-plugin</artifactId>
<version>2.5</version><!--$NO-MVN-MAN-VER$-->
<executions>
<execution>
<id>remove-externaltestclasses</id>
<phase>process-test-classes</phase>
<goals>
<goal>clean</goal>
</goals>
<configuration>
<excludeDefaultDirectories>true</excludeDefaultDirectories>
<filesets>
<fileset>
<directory>${project.build.testOutputDirectory}</directory>
<includes>
<include>**/testjar/TestUser*.class</include>
</includes>
</fileset>
</filesets>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
Expand Down
Loading

0 comments on commit c1e9aef

Please sign in to comment.