Skip to content

Commit

Permalink
[FLINK-18241] Use correct user class loader in OptimizerPlanEnvironme…
Browse files Browse the repository at this point in the history
…nt & StreamPlanEnvironment

This closes apache#12607
  • Loading branch information
dawidwys committed Jun 12, 2020
1 parent ba7854b commit 039a74a
Show file tree
Hide file tree
Showing 8 changed files with 245 additions and 100 deletions.
8 changes: 8 additions & 0 deletions flink-clients/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,14 @@ under the License.
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${project.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
</dependencies>

<!-- More information on this:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ public Pipeline getPipeline() {
return pipeline;
}

public OptimizerPlanEnvironment(Configuration configuration, int parallelism) {
super(configuration);
public OptimizerPlanEnvironment(Configuration configuration, ClassLoader userClassloader, int parallelism) {
super(configuration, userClassloader);
if (parallelism > 0) {
setParallelism(parallelism);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,9 +135,15 @@ public static Pipeline getPipelineFromProgram(
}

// temporary hack to support the optimizer plan preview
OptimizerPlanEnvironment benv = new OptimizerPlanEnvironment(configuration, parallelism);
OptimizerPlanEnvironment benv = new OptimizerPlanEnvironment(
configuration,
program.getUserCodeClassLoader(),
parallelism);
benv.setAsContext();
StreamPlanEnvironment senv = new StreamPlanEnvironment(configuration, parallelism);
StreamPlanEnvironment senv = new StreamPlanEnvironment(
configuration,
program.getUserCodeClassLoader(),
parallelism);
senv.setAsContext();

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ public Pipeline getPipeline() {
return pipeline;
}

public StreamPlanEnvironment(Configuration configuration, int parallelism) {
super(configuration);
public StreamPlanEnvironment(Configuration configuration, ClassLoader userClassLoader, int parallelism) {
super(configuration, userClassLoader);
if (parallelism > 0) {
setParallelism(parallelism);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.client.program;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.testutils.ClassLoaderUtils;

import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;

/**
* Tests for {@link PackagedProgramUtils} methods that should be executed for
* {@link StreamExecutionEnvironment} and {@link Environment}.
*/
@RunWith(Parameterized.class)
public class PackagedProgramUtilsPipelineTest {

@Parameterized.Parameter
public TestParameter testParameter;

@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();

@Parameterized.Parameters
public static Collection<TestParameter> parameters() {
return Arrays.asList(
TestParameter.of(DataSetTestProgram.class, pipeline -> ((Plan) pipeline).getExecutionConfig()),
TestParameter.of(DataStreamTestProgram.class, pipeline -> ((StreamGraph) pipeline).getExecutionConfig())
);
}

/**
* This tests whether configuration forwarding from a {@link Configuration} to the environment
* works.
*/
@Test
public void testConfigurationForwarding() throws Exception {
// we want to test forwarding with this config, ensure that the default is what we expect.
assertThat(
ExecutionEnvironment.getExecutionEnvironment().getConfig().isAutoTypeRegistrationDisabled(),
is(false));

PackagedProgram packagedProgram = PackagedProgram.newBuilder()
.setEntryPointClassName(testParameter.entryClass().getName())
.build();

Configuration config = new Configuration();
config.set(PipelineOptions.AUTO_TYPE_REGISTRATION, false);

Pipeline pipeline = PackagedProgramUtils.getPipelineFromProgram(
packagedProgram,
config,
1 /* parallelism */,
false /* suppress output */);

ExecutionConfig executionConfig = testParameter.extractExecutionConfig(pipeline);

// we want to test forwarding with this config, ensure that the default is what we expect.
assertThat(executionConfig.isAutoTypeRegistrationDisabled(), is(true));
}

@Test
public void testUserClassloaderForConfiguration() throws Exception {
String userSerializerClassName = "UserSerializer";
List<URL> userUrls = getClassUrls(userSerializerClassName);

PackagedProgram packagedProgram = PackagedProgram.newBuilder()
.setUserClassPaths(userUrls)
.setEntryPointClassName(testParameter.entryClass().getName())
.build();

Configuration config = new Configuration();
config.set(PipelineOptions.KRYO_DEFAULT_SERIALIZERS, Collections.singletonList(
String.format(
"class:%s,serializer:%s",
PackagedProgramUtilsPipelineTest.class.getName(),
userSerializerClassName)
));

Pipeline pipeline = PackagedProgramUtils.getPipelineFromProgram(
packagedProgram,
config,
1 /* parallelism */,
false /* suppress output */);

ExecutionConfig executionConfig = testParameter.extractExecutionConfig(pipeline);

assertThat(
executionConfig.getDefaultKryoSerializerClasses().get(PackagedProgramUtilsPipelineTest.class).getName(),
is(userSerializerClassName));
}

private List<URL> getClassUrls(String className) throws IOException {
URLClassLoader urlClassLoader = ClassLoaderUtils.compileAndLoadJava(
temporaryFolder.newFolder(),
className + ".java",
"import com.esotericsoftware.kryo.Kryo;\n" +
"import com.esotericsoftware.kryo.Serializer;\n" +
"import com.esotericsoftware.kryo.io.Input;\n" +
"import com.esotericsoftware.kryo.io.Output;\n"
+ "public class " + className + " extends Serializer {\n" +
"\t@Override\n" +
"\tpublic void write(\n" +
"\t\tKryo kryo,\n" +
"\t\tOutput output,\n" +
"\t\tObject object) {\n" +
"\t}\n" +
"\n" +
"\t@Override\n" +
"\tpublic Object read(Kryo kryo, Input input, Class type) {\n" +
"\t\treturn null;\n" +
"\t}\n" +
"}");
return Arrays.asList(urlClassLoader.getURLs());
}

private interface TestParameter {
Class<?> entryClass();

ExecutionConfig extractExecutionConfig(Pipeline pipeline);

static TestParameter of(Class<?> entryClass, Function<Pipeline, ExecutionConfig> executionConfigExtractor) {
return new TestParameter() {
@Override
public Class<?> entryClass() {
return entryClass;
}

@Override
public ExecutionConfig extractExecutionConfig(Pipeline pipeline) {
return executionConfigExtractor.apply(pipeline);
}
};
}
}

/** Test Program for the DataSet API. */
public static class DataSetTestProgram {
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.fromElements("hello").print();
env.execute();
}
}

/** Test Program for the DataStream API. */
public static class DataStreamTestProgram {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromElements("hello").print();
env.execute();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,8 @@

package org.apache.flink.client.program;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.streaming.api.datastream.DataStream;

import org.junit.Test;

Expand All @@ -38,59 +32,12 @@

/**
* Tests {@link PackagedProgramUtils}.
*
* <p>See also {@link PackagedProgramUtilsPipelineTest} for tests that need to test behaviour of
* {@link DataStream} and {@link DataSet} programs.
*/
public class PackagedProgramUtilsTest {

/**
* This tests whether configuration forwarding from a {@link Configuration} to the environment
* works.
*/
@Test
public void testDataSetConfigurationForwarding() throws Exception {
assertPrecondition(ExecutionEnvironment.getExecutionEnvironment().getConfig());

PackagedProgram packagedProgram = PackagedProgram.newBuilder()
.setEntryPointClassName(DataSetTestProgram.class.getName())
.build();

Configuration config = createConfigurationWithOption();

Pipeline pipeline = PackagedProgramUtils.getPipelineFromProgram(
packagedProgram,
config,
1 /* parallelism */,
false /* suppress output */);

ExecutionConfig executionConfig = ((Plan) pipeline).getExecutionConfig();

assertExpectedOption(executionConfig);
}

/**
* This tests whether configuration forwarding from a {@link Configuration} to the environment
* works.
*/
@Test
public void testDataStreamConfigurationForwarding() throws Exception {
assertPrecondition(ExecutionEnvironment.getExecutionEnvironment().getConfig());

PackagedProgram packagedProgram = PackagedProgram.newBuilder()
.setEntryPointClassName(DataStreamTestProgram.class.getName())
.build();

Configuration config = createConfigurationWithOption();

Pipeline pipeline = PackagedProgramUtils.getPipelineFromProgram(
packagedProgram,
config,
1 /* parallelism */,
false /* suppress output */);

ExecutionConfig executionConfig = ((StreamGraph) pipeline).getExecutionConfig();

assertExpectedOption(executionConfig);
}

@Test
public void testResolveURI() throws URISyntaxException {
final String relativeFile = "path/of/user.jar";
Expand All @@ -111,38 +58,4 @@ public void testResolveURI() throws URISyntaxException {
assertThat(resolveURI(localSchemaFile).getScheme(), is("local"));
assertThat(resolveURI(localSchemaFile).toString(), is(localSchemaFile));
}

private static void assertPrecondition(ExecutionConfig executionConfig) {
// we want to test forwarding with this config, ensure that the default is what we expect.
assertThat(executionConfig.isAutoTypeRegistrationDisabled(), is(false));
}

private static void assertExpectedOption(ExecutionConfig executionConfig) {
// we want to test forwarding with this config, ensure that the default is what we expect.
assertThat(executionConfig.isAutoTypeRegistrationDisabled(), is(true));
}

private static Configuration createConfigurationWithOption() {
Configuration config = new Configuration();
config.set(PipelineOptions.AUTO_TYPE_REGISTRATION, false);
return config;
}

/** Test Program for the DataSet API. */
public static class DataSetTestProgram {
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.fromElements("hello").print();
env.execute();
}
}

/** Test Program for the DataStream API. */
public static class DataStreamTestProgram {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromElements("hello").print();
env.execute();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,18 @@ public class ExecutionEnvironment {
*/
@PublicEvolving
public ExecutionEnvironment(final Configuration configuration) {
this(DefaultExecutorServiceLoader.INSTANCE, configuration, null);
this(configuration, null);
}

/**
* Creates a new {@link ExecutionEnvironment} that will use the given {@link Configuration} to
* configure the {@link PipelineExecutor}.
*
* <p>In addition, this constructor allows specifying the user code {@link ClassLoader}.
*/
@PublicEvolving
public ExecutionEnvironment(final Configuration configuration, final ClassLoader userClassloader) {
this(DefaultExecutorServiceLoader.INSTANCE, configuration, userClassloader);
}

/**
Expand Down
Loading

0 comments on commit 039a74a

Please sign in to comment.