Skip to content

Commit

Permalink
[hotfix][client] Make ConfigUtils.decodeListFromConfig return new arr…
Browse files Browse the repository at this point in the history
…ay list and throw exception
  • Loading branch information
wangyang0918 authored and kl0u committed May 18, 2020
1 parent 81ffe8a commit 2f13ba1
Show file tree
Hide file tree
Showing 6 changed files with 26 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.flink.client.cli;

import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
Expand Down Expand Up @@ -72,22 +71,12 @@ public Configuration applyToConfiguration(final Configuration baseConfiguration)
return baseConfiguration;
}

public List<URL> getJars() {
return decodeUrlList(configuration, PipelineOptions.JARS);
public List<URL> getJars() throws MalformedURLException {
return ConfigUtils.decodeListFromConfig(configuration, PipelineOptions.JARS, URL::new);
}

public List<URL> getClasspaths() {
return decodeUrlList(configuration, PipelineOptions.CLASSPATHS);
}

private List<URL> decodeUrlList(final Configuration configuration, final ConfigOption<List<String>> configOption) {
return ConfigUtils.decodeListFromConfig(configuration, configOption, url -> {
try {
return new URL(url);
} catch (MalformedURLException e) {
throw new IllegalArgumentException("Invalid URL", e);
}
});
public List<URL> getClasspaths() throws MalformedURLException {
return ConfigUtils.decodeListFromConfig(configuration, PipelineOptions.CLASSPATHS, URL::new);
}

public int getParallelism() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -81,7 +82,7 @@ public EmbeddedExecutor(
}

@Override
public CompletableFuture<JobClient> execute(final Pipeline pipeline, final Configuration configuration) {
public CompletableFuture<JobClient> execute(final Pipeline pipeline, final Configuration configuration) throws MalformedURLException {
checkNotNull(pipeline);
checkNotNull(configuration);

Expand All @@ -101,7 +102,7 @@ private CompletableFuture<JobClient> getJobClientFuture(final JobID jobId) {
return CompletableFuture.completedFuture(jobClientCreator.getJobClient(jobId));
}

private CompletableFuture<JobClient> submitAndGetJobClientFuture(final Pipeline pipeline, final Configuration configuration) {
private CompletableFuture<JobClient> submitAndGetJobClientFuture(final Pipeline pipeline, final Configuration configuration) throws MalformedURLException {
final Time timeout = Time.milliseconds(configuration.get(ExecutionOptions.EMBEDDED_RPC_TIMEOUT).toMillis());

final JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;

import java.net.MalformedURLException;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

Expand Down Expand Up @@ -80,7 +81,7 @@ public CompletableFuture<JobClient> execute(Pipeline pipeline, Configuration con
return PerJobMiniClusterFactory.createWithFactory(effectiveConfig, miniClusterFactory).submitJob(jobGraph);
}

private JobGraph getJobGraph(Pipeline pipeline, Configuration configuration) {
private JobGraph getJobGraph(Pipeline pipeline, Configuration configuration) throws MalformedURLException {
// This is a quirk in how LocalEnvironment used to work. It sets the default parallelism
// to <num taskmanagers> * <num task slots>. Might be questionable but we keep the behaviour
// for now.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@

import javax.annotation.Nonnull;

import java.net.MalformedURLException;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
Expand All @@ -44,7 +46,7 @@ public class PipelineExecutorUtils {
* savepoint settings used to bootstrap its state.
* @return the corresponding {@link JobGraph}.
*/
public static JobGraph getJobGraph(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration) {
public static JobGraph getJobGraph(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration) throws MalformedURLException {
checkNotNull(pipeline);
checkNotNull(configuration);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@

import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
Expand Down Expand Up @@ -328,7 +329,7 @@ public void testRetrieveFromJarFileWithUserLib() throws IOException, FlinkExcept
containsInAnyOrder(expectedURLs.stream().map(URL::toString).toArray()));
}

private JobGraph retrieveJobGraph(ClassPathPackagedProgramRetriever retrieverUnderTest, Configuration configuration) throws FlinkException, ProgramInvocationException {
private JobGraph retrieveJobGraph(ClassPathPackagedProgramRetriever retrieverUnderTest, Configuration configuration) throws FlinkException, ProgramInvocationException, MalformedURLException {
final PackagedProgram packagedProgram = retrieverUnderTest.getPackagedProgram();

final int defaultParallelism = configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@
package org.apache.flink.configuration;

import org.apache.flink.annotation.Internal;
import org.apache.flink.util.function.FunctionWithException;

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;
Expand Down Expand Up @@ -108,19 +108,25 @@ public static <IN, OUT> void encodeCollectionToConfig(
* @param mapper the transformation function from {@code IN} to {@code OUT}.
* @return the transformed values in a list of type {@code OUT}.
*/
public static <IN, OUT> List<OUT> decodeListFromConfig(
public static <IN, OUT, E extends Throwable> List<OUT> decodeListFromConfig(
final ReadableConfig configuration,
final ConfigOption<List<IN>> key,
final Function<IN, OUT> mapper) {
final FunctionWithException<IN, OUT, E> mapper) throws E {

checkNotNull(configuration);
checkNotNull(key);
checkNotNull(mapper);

final List<IN> encodedString = configuration.get(key);
return encodedString != null
? encodedString.stream().map(mapper).collect(Collectors.toList())
: Collections.emptyList();
if (encodedString == null || encodedString.isEmpty()) {
return new ArrayList<>();
}

final List<OUT> result = new ArrayList<>(encodedString.size());
for (IN input : encodedString) {
result.add(mapper.apply(input));
}
return result;
}

private ConfigUtils() {
Expand Down

0 comments on commit 2f13ba1

Please sign in to comment.