Skip to content

Commit

Permalink
[FLINK-16661] Move the ClassPathPackagedProgramRetriever to flink-cli…
Browse files Browse the repository at this point in the history
…ents

This is done because the ClassPathPackagedProgramRetriever is going to be
also used by other ApplicationEntrypoints in the future, including the
YarnApplicationClusterEntryPoint.
  • Loading branch information
kl0u committed Apr 30, 2020
1 parent 4d9e67a commit 93d4c35
Show file tree
Hide file tree
Showing 17 changed files with 78 additions and 340 deletions.
37 changes: 35 additions & 2 deletions flink-clients/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ under the License.
<configuration>
<archive>
<manifest>
<mainClass>org.apache.flink.client.testjar.WordCount</mainClass>
<mainClass>org.apache.flink.client.testjar.TestJob</mainClass>
</manifest>
</archive>
<finalName>maven</finalName>
Expand All @@ -118,6 +118,39 @@ under the License.
</descriptors>
</configuration>
</execution>
<execution>
<id>create-test-dependency-user-jar</id>
<phase>process-test-classes</phase>
<goals>
<goal>single</goal>
</goals>
<configuration>
<archive>
<manifest>
<mainClass>org.apache.flink.client.testjar.TestUserClassLoaderJob</mainClass>
</manifest>
</archive>
<finalName>maven</finalName>
<attach>false</attach>
<descriptors>
<descriptor>src/test/assembly/test-assembly-test-user-classloader-job-jar.xml</descriptor>
</descriptors>
</configuration>
</execution>
<execution>
<id>create-test-dependency-user-jar-depend</id>
<phase>process-test-classes</phase>
<goals>
<goal>single</goal>
</goals>
<configuration>
<finalName>maven</finalName>
<attach>false</attach>
<descriptors>
<descriptor>src/test/assembly/test-assembly-test-user-classloader-job-lib-jar.xml</descriptor>
</descriptors>
</configuration>
</execution>
</executions>
</plugin>
<!--Remove the external jar test code from the test-classes directory since it mustn't be in the
Expand All @@ -139,7 +172,7 @@ under the License.
<fileset>
<directory>${project.build.testOutputDirectory}</directory>
<includes>
<include>**/testjar/*.class</include>
<include>**/testjar/TestUser*.class</include>
</includes>
</fileset>
</filesets>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,12 @@
* limitations under the License.
*/

package org.apache.flink.container.entrypoint;
package org.apache.flink.client.deployment.application;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.PackagedProgramRetriever;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.container.entrypoint.JarManifestParser.JarFileWithEntryClass;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.FlinkException;
Expand Down Expand Up @@ -56,7 +55,7 @@
* which creates the {@link org.apache.flink.client.program.PackagedProgram PackagedProgram} containing
* the user's {@code main()} from a class on the class path.
*/
class ClassPathPackagedProgramRetriever implements PackagedProgramRetriever {
public class ClassPathPackagedProgramRetriever implements PackagedProgramRetriever {

private static final Logger LOG = LoggerFactory.getLogger(ClassPathPackagedProgramRetriever.class);

Expand Down Expand Up @@ -178,7 +177,7 @@ private String scanClassPathForJobJar() throws IOException {
.collect(Collectors.toList());
}

final JarFileWithEntryClass jobJar = JarManifestParser.findOnlyEntryClass(jars);
final JarManifestParser.JarFileWithEntryClass jobJar = JarManifestParser.findOnlyEntryClass(jars);
LOG.info("Using {} as job jar", jobJar);
return jobJar.getEntryClass();
}
Expand Down Expand Up @@ -208,7 +207,10 @@ private static boolean notNullAndNotEmpty(String string) {
}
}

static class Builder {
/**
* A builder for the {@link ClassPathPackagedProgramRetriever}.
*/
public static class Builder {

private final String[] programArguments;

Expand All @@ -224,22 +226,22 @@ private Builder(String[] programArguments) {
this.programArguments = requireNonNull(programArguments);
}

Builder setJobClassName(@Nullable String jobClassName) {
public Builder setJobClassName(@Nullable String jobClassName) {
this.jobClassName = jobClassName;
return this;
}

Builder setUserLibDirectory(File userLibDirectory) {
public Builder setUserLibDirectory(File userLibDirectory) {
this.userLibDirectory = userLibDirectory;
return this;
}

Builder setJarsOnClassPath(Supplier<Iterable<File>> jarsOnClassPath) {
public Builder setJarsOnClassPath(Supplier<Iterable<File>> jarsOnClassPath) {
this.jarsOnClassPath = jarsOnClassPath;
return this;
}

ClassPathPackagedProgramRetriever build() throws IOException {
public ClassPathPackagedProgramRetriever build() throws IOException {
return new ClassPathPackagedProgramRetriever(
programArguments,
jobClassName,
Expand All @@ -248,7 +250,7 @@ ClassPathPackagedProgramRetriever build() throws IOException {
}
}

static Builder newBuilder(String[] programArguments) {
public static Builder newBuilder(String[] programArguments) {
return new Builder(programArguments);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* limitations under the License.
*/

package org.apache.flink.container.entrypoint;
package org.apache.flink.client.deployment.application;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.client.program.PackagedProgram;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ under the License.
<directory>${project.build.testOutputDirectory}</directory>
<outputDirectory>/</outputDirectory>
<includes>
<include>org/apache/flink/container/entrypoint/testjar/TestUserClassLoaderJob.class</include>
<include>org/apache/flink/client/testjar/TestUserClassLoaderJob.class</include>
</includes>
</fileSet>
</fileSets>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ under the License.
<directory>${project.build.testOutputDirectory}</directory>
<outputDirectory>/</outputDirectory>
<includes>
<include>org/apache/flink/container/entrypoint/testjar/TestUserClassLoaderJobLib.class</include>
<include>org/apache/flink/client/testjar/TestUserClassLoaderJobLib.class</include>
</includes>
</fileSet>
</fileSets>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,13 @@ public void testShowExecutionPlan() throws Exception {
replaceStdOut();
try {

String[] parameters = new String[]{CliFrontendTestUtils.getTestJarPath(), "-f", "true"};
String[] parameters = new String[]{CliFrontendTestUtils.getTestJarPath(), "-f", "true", "--arg", "suffix"};
Configuration configuration = getConfiguration();
CliFrontend testFrontend = new CliFrontend(
configuration,
Collections.singletonList(getCli(configuration)));
testFrontend.info(parameters);
assertTrue(buffer.toString().contains("\"parallelism\": \"4\""));
assertTrue(buffer.toString().contains("\"parallelism\" : 4"));
}
finally {
restoreStdOut();
Expand All @@ -80,13 +80,13 @@ public void testShowExecutionPlan() throws Exception {
public void testShowExecutionPlanWithParallelism() {
replaceStdOut();
try {
String[] parameters = {"-p", "17", CliFrontendTestUtils.getTestJarPath()};
String[] parameters = {"-p", "17", CliFrontendTestUtils.getTestJarPath(), "--arg", "suffix"};
Configuration configuration = getConfiguration();
CliFrontend testFrontend = new CliFrontend(
configuration,
Collections.singletonList(getCli(configuration)));
testFrontend.info(parameters);
assertTrue(buffer.toString().contains("\"parallelism\": \"17\""));
assertTrue(buffer.toString().contains("\"parallelism\" : 17"));
}
catch (Exception e) {
e.printStackTrace();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
*/
public class CliFrontendTestUtils {

public static final String TEST_JAR_MAIN_CLASS = "org.apache.flink.client.testjar.WordCount";
public static final String TEST_JAR_MAIN_CLASS = "org.apache.flink.client.testjar.TestJob";

public static final String TEST_JAR_CLASSLOADERTEST_CLASS = "org.apache.flink.client.testjar.JobWithExternalDependency";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,21 @@
* limitations under the License.
*/

package org.apache.flink.container.entrypoint;
package org.apache.flink.client.deployment.application;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.client.deployment.executors.PipelineExecutorUtils;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.PackagedProgramUtils;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.client.testjar.TestJob;
import org.apache.flink.client.testjar.TestJobInfo;
import org.apache.flink.configuration.ConfigUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.PipelineOptionsInternal;
import org.apache.flink.container.entrypoint.ClassPathPackagedProgramRetriever.JarsOnClassPath;
import org.apache.flink.container.entrypoint.testjar.TestJobInfo;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.util.ExceptionUtils;
Expand Down Expand Up @@ -202,7 +202,7 @@ public void testSavepointRestoreSettings() throws FlinkException, IOException, P

@Test
public void testJarFromClassPathSupplierSanityCheck() {
Iterable<File> jarFiles = JarsOnClassPath.INSTANCE.get();
Iterable<File> jarFiles = ClassPathPackagedProgramRetriever.JarsOnClassPath.INSTANCE.get();

// Junit executes this test, so it should be returned as part of JARs on the class path
assertThat(jarFiles, hasItem(hasProperty("name", containsString("junit"))));
Expand Down Expand Up @@ -312,18 +312,18 @@ private JobGraph retrieveJobGraph(ClassPathPackagedProgramRetriever retrieverUnd
}

private static String javaClassPath(String... entries) {
String pathSeparator = System.getProperty(JarsOnClassPath.PATH_SEPARATOR);
String pathSeparator = System.getProperty(ClassPathPackagedProgramRetriever.JarsOnClassPath.PATH_SEPARATOR);
return String.join(pathSeparator, entries);
}

private static Iterable<File> setClassPathAndGetJarsOnClassPath(String classPath) {
final String originalClassPath = System.getProperty(JarsOnClassPath.JAVA_CLASS_PATH);
final String originalClassPath = System.getProperty(ClassPathPackagedProgramRetriever.JarsOnClassPath.JAVA_CLASS_PATH);
try {
System.setProperty(JarsOnClassPath.JAVA_CLASS_PATH, classPath);
return JarsOnClassPath.INSTANCE.get();
System.setProperty(ClassPathPackagedProgramRetriever.JarsOnClassPath.JAVA_CLASS_PATH, classPath);
return ClassPathPackagedProgramRetriever.JarsOnClassPath.INSTANCE.get();
} finally {
// Reset property
System.setProperty(JarsOnClassPath.JAVA_CLASS_PATH, originalClassPath);
System.setProperty(ClassPathPackagedProgramRetriever.JarsOnClassPath.JAVA_CLASS_PATH, originalClassPath);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@
* limitations under the License.
*/

package org.apache.flink.container.entrypoint;
package org.apache.flink.client.deployment.application;

import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.container.entrypoint.JarManifestParser.JarFileWithEntryClass;
import org.apache.flink.client.testjar.TestJob;
import org.apache.flink.util.TestLogger;

import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -124,7 +124,7 @@ public void testFindOnlyEntryClassSingleJarWithNoManifest() throws IOException {
public void testFindOnlyEntryClassSingleJar() throws IOException {
File jarFile = TestJob.getTestJobJar();

JarFileWithEntryClass jarFileWithEntryClass = JarManifestParser.findOnlyEntryClass(ImmutableList.of(jarFile));
JarManifestParser.JarFileWithEntryClass jarFileWithEntryClass = JarManifestParser.findOnlyEntryClass(ImmutableList.of(jarFile));

assertThat(jarFileWithEntryClass.getEntryClass(), is(equalTo(TestJob.class.getCanonicalName())));
}
Expand All @@ -141,7 +141,7 @@ public void testFindOnlyEntryClassMultipleJarsWithSingleManifestEntry() throws I
File jarWithNoManifest = createJarFileWithManifest(ImmutableMap.of());
File jarFile = TestJob.getTestJobJar();

JarFileWithEntryClass jarFileWithEntryClass = JarManifestParser
JarManifestParser.JarFileWithEntryClass jarFileWithEntryClass = JarManifestParser
.findOnlyEntryClass(ImmutableList.of(jarWithNoManifest, jarFile));

assertThat(jarFileWithEntryClass.getEntryClass(), is(equalTo(TestJob.class.getCanonicalName())));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@
* limitations under the License.
*/

package org.apache.flink.container.entrypoint;
package org.apache.flink.client.testjar;

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.client.deployment.application.ClassPathPackagedProgramRetrieverTest;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
Expand Down Expand Up @@ -49,12 +50,12 @@ public static void main(String[] args) throws Exception {
* @return Test jar file
* @throws FileNotFoundException If test-jar can not be found
*/
static File getTestJobJar() throws FileNotFoundException {
public static File getTestJobJar() throws FileNotFoundException {
// Check the module's pom.xml for how we create the JAR
File f = new File("target/maven-test-jar.jar");
if (!f.exists()) {
throw new FileNotFoundException("Test jar not present. Invoke tests using Maven "
+ "or build the jar using 'mvn process-test-classes' in flink-container");
+ "or build the jar using 'mvn process-test-classes' in flink-clients");
}
return f;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* limitations under the License.
*/

package org.apache.flink.container.entrypoint.testjar;
package org.apache.flink.client.testjar;

import java.nio.file.Path;
import java.nio.file.Paths;
Expand All @@ -26,8 +26,8 @@
*/
public class TestJobInfo {

public static final String JOB_CLASS = "org.apache.flink.container.entrypoint.testjar.TestUserClassLoaderJob";
public static final String JOB_LIB_CLASS = "org.apache.flink.container.entrypoint.testjar.TestUserClassLoaderJobLib";
public static final String JOB_CLASS = "org.apache.flink.client.testjar.TestUserClassLoaderJob";
public static final String JOB_LIB_CLASS = "org.apache.flink.client.testjar.TestUserClassLoaderJobLib";
public static final Path JOB_JAR_PATH = Paths.get("target", "maven-test-user-classloader-job-jar.jar");
public static final Path JOB_LIB_JAR_PATH = Paths.get("target", "maven-test-user-classloader-job-lib-jar.jar");
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* limitations under the License.
*/

package org.apache.flink.container.entrypoint.testjar;
package org.apache.flink.client.testjar;

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* limitations under the License.
*/

package org.apache.flink.container.entrypoint.testjar;
package org.apache.flink.client.testjar;

/**
* This class is depended by {@link TestUserClassLoaderJob}.
Expand Down
Loading

0 comments on commit 93d4c35

Please sign in to comment.