Skip to content

Commit

Permalink
[FLINK-14050][yarn] Narrow method visibility in YarnClusterDescriptor
Browse files Browse the repository at this point in the history
  • Loading branch information
tisonkun authored and kl0u committed Sep 30, 2019
1 parent a6188ff commit e3839aa
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.yarn;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.deployment.ClusterDeploymentException;
import org.apache.flink.client.deployment.ClusterDescriptor;
Expand Down Expand Up @@ -123,6 +124,9 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
/** True if the descriptor must not shut down the YarnClient. */
private final boolean sharedYarnClient;

/** Lazily initialized list of files to ship. */
private final List<File> shipFiles = new LinkedList<>();

private String yarnQueue;

private String configurationDirectory;
Expand All @@ -131,9 +135,6 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {

private String dynamicPropertiesEncoded;

/** Lazily initialized list of files to ship. */
protected List<File> shipFiles = new LinkedList<>();

private final Configuration flinkConfiguration;

private boolean detached;
Expand All @@ -160,11 +161,16 @@ public YarnClusterDescriptor(
this.sharedYarnClient = sharedYarnClient;

this.flinkConfiguration = Preconditions.checkNotNull(flinkConfiguration);
userJarInclusion = getUserJarInclusionMode(flinkConfiguration);
this.userJarInclusion = getUserJarInclusionMode(flinkConfiguration);

this.configurationDirectory = Preconditions.checkNotNull(configurationDirectory);
}

@VisibleForTesting
List<File> getShipFiles() {
return shipFiles;
}

public YarnClient getYarnClient() {
return yarnClient;
}
Expand Down Expand Up @@ -456,7 +462,7 @@ private void validateClusterSpecification(ClusterSpecification clusterSpecificat
* @param jobGraph A job graph which is deployed with the Flink cluster, {@code null} if none
* @param detached True if the cluster should be started in detached mode
*/
protected ClusterClient<ApplicationId> deployInternal(
private ClusterClient<ApplicationId> deployInternal(
ClusterSpecification clusterSpecification,
String applicationName,
String yarnClusterEntrypoint,
Expand Down Expand Up @@ -559,7 +565,7 @@ protected ClusterClient<ApplicationId> deployInternal(
true);
}

protected ClusterSpecification validateClusterResources(
private ClusterSpecification validateClusterResources(
ClusterSpecification clusterSpecification,
int yarnMinAllocationMB,
Resource maximumResourceCapability,
Expand Down Expand Up @@ -674,7 +680,7 @@ private void checkYarnQueues(YarnClient yarnClient) {
}
}

public ApplicationReport startAppMaster(
private ApplicationReport startAppMaster(
Configuration configuration,
String applicationName,
String yarnClusterEntrypoint,
Expand Down Expand Up @@ -1551,7 +1557,7 @@ public void run() {
}
}

protected void addEnvironmentFoldersToShipFiles(Collection<File> effectiveShipFiles) {
void addEnvironmentFoldersToShipFiles(Collection<File> effectiveShipFiles) {
addLibFoldersToShipFiles(effectiveShipFiles);
addPluginsFoldersToShipFiles(effectiveShipFiles);
}
Expand All @@ -1569,7 +1575,7 @@ private void addLibFoldersToShipFiles(Collection<File> effectiveShipFiles) {
throw new YarnDeploymentException("The environment variable '" + ENV_FLINK_LIB_DIR +
"' is set to '" + libDir + "' but the directory doesn't exist.");
}
} else if (this.shipFiles.isEmpty()) {
} else if (shipFiles.isEmpty()) {
LOG.warn("Environment variable '{}' not set and ship files have not been provided manually. " +
"Not shipping any library files.", ENV_FLINK_LIB_DIR);
}
Expand All @@ -1588,7 +1594,7 @@ private void addPluginsFoldersToShipFiles(Collection<File> effectiveShipFiles) {
}
}

protected ContainerLaunchContext setupApplicationMasterContainer(
ContainerLaunchContext setupApplicationMasterContainer(
String yarnClusterEntrypoint,
boolean hasLogback,
boolean hasLog4j,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,7 @@ public void testMultipleYarnShipOptions() throws Exception {

YarnClusterDescriptor flinkYarnDescriptor = flinkYarnSessionCli.createClusterDescriptor(commandLine);

assertEquals(2, flinkYarnDescriptor.shipFiles.size());
assertEquals(2, flinkYarnDescriptor.getShipFiles().size());

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -426,26 +426,26 @@ public void testExplicitFileShipping() throws Exception {
File libFile = temporaryFolder.newFile("libFile.jar");
File libFolder = temporaryFolder.newFolder().getAbsoluteFile();

Assert.assertFalse(descriptor.shipFiles.contains(libFile));
Assert.assertFalse(descriptor.shipFiles.contains(libFolder));
Assert.assertFalse(descriptor.getShipFiles().contains(libFile));
Assert.assertFalse(descriptor.getShipFiles().contains(libFolder));

List<File> shipFiles = new ArrayList<>();
shipFiles.add(libFile);
shipFiles.add(libFolder);

descriptor.addShipFiles(shipFiles);

Assert.assertTrue(descriptor.shipFiles.contains(libFile));
Assert.assertTrue(descriptor.shipFiles.contains(libFolder));
Assert.assertTrue(descriptor.getShipFiles().contains(libFile));
Assert.assertTrue(descriptor.getShipFiles().contains(libFolder));

// only execute part of the deployment to test for shipped files
Set<File> effectiveShipFiles = new HashSet<>();
descriptor.addEnvironmentFoldersToShipFiles(effectiveShipFiles);

Assert.assertEquals(0, effectiveShipFiles.size());
Assert.assertEquals(2, descriptor.shipFiles.size());
Assert.assertTrue(descriptor.shipFiles.contains(libFile));
Assert.assertTrue(descriptor.shipFiles.contains(libFolder));
Assert.assertEquals(2, descriptor.getShipFiles().size());
Assert.assertTrue(descriptor.getShipFiles().contains(libFile));
Assert.assertTrue(descriptor.getShipFiles().contains(libFolder));
}
}

Expand Down Expand Up @@ -481,8 +481,8 @@ private void testEnvironmentDirectoryShipping(String environmentVariable) throws
// only add the ship the folder, not the contents
Assert.assertFalse(effectiveShipFiles.contains(libFile));
Assert.assertTrue(effectiveShipFiles.contains(libFolder));
Assert.assertFalse(descriptor.shipFiles.contains(libFile));
Assert.assertFalse(descriptor.shipFiles.contains(libFolder));
Assert.assertFalse(descriptor.getShipFiles().contains(libFile));
Assert.assertFalse(descriptor.getShipFiles().contains(libFolder));
}
}

Expand Down

0 comments on commit e3839aa

Please sign in to comment.