Skip to content

Commit

Permalink
[FLINK-15134][client] Delete temporary files created in YarnClusterDe…
Browse files Browse the repository at this point in the history
…scriptor
  • Loading branch information
tisonkun authored and kl0u committed Dec 9, 2019
1 parent 2994f0e commit 4ee5477
Showing 1 changed file with 26 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -858,6 +858,9 @@ private ApplicationReport startAppMaster(
homeDir,
"");

paths.add(remotePathJar);
classPathBuilder.append(flinkJarPath.getName()).append(File.pathSeparator);

// set the right configuration values for the TaskManager
configuration.setInteger(
TaskManagerOptions.NUM_TASK_SLOTS,
Expand All @@ -869,25 +872,28 @@ private ApplicationReport startAppMaster(

// Upload the flink configuration
// write out configuration file
File tmpConfigurationFile = File.createTempFile(appId + "-flink-conf.yaml", null);
tmpConfigurationFile.deleteOnExit();
BootstrapTools.writeConfiguration(configuration, tmpConfigurationFile);
File tmpConfigurationFile = null;
try {
tmpConfigurationFile = File.createTempFile(appId + "-flink-conf.yaml", null);
BootstrapTools.writeConfiguration(configuration, tmpConfigurationFile);

String flinkConfigKey = "flink-conf.yaml";
Path remotePathConf = setupSingleLocalResource(
flinkConfigKey,
String flinkConfigKey = "flink-conf.yaml";
Path remotePathConf = setupSingleLocalResource(
flinkConfigKey,
fs,
appId,
new Path(tmpConfigurationFile.getAbsolutePath()),
localResources,
homeDir,
"");
envShipFileList.append(flinkConfigKey).append("=").append(remotePathConf).append(",");

paths.add(remotePathJar);
classPathBuilder.append(flinkJarPath.getName()).append(File.pathSeparator);
paths.add(remotePathConf);
classPathBuilder.append("flink-conf.yaml").append(File.pathSeparator);
envShipFileList.append(flinkConfigKey).append("=").append(remotePathConf).append(",");
paths.add(remotePathConf);
classPathBuilder.append("flink-conf.yaml").append(File.pathSeparator);
} finally {
if (tmpConfigurationFile != null && !tmpConfigurationFile.delete()) {
LOG.warn("Fail to delete temporary file {}.", tmpConfigurationFile.toPath());
}
}

if (userJarInclusion == YarnConfigOptions.UserJarInclusion.LAST) {
for (String userClassPath : userClassPaths) {
Expand All @@ -898,10 +904,10 @@ private ApplicationReport startAppMaster(
// write job graph to tmp file and add it to local resource
// TODO: server use user main method to generate job graph
if (jobGraph != null) {
File tmpJobGraphFile = null;
try {
File fp = File.createTempFile(appId.toString(), null);
fp.deleteOnExit();
try (FileOutputStream output = new FileOutputStream(fp);
tmpJobGraphFile = File.createTempFile(appId.toString(), null);
try (FileOutputStream output = new FileOutputStream(tmpJobGraphFile);
ObjectOutputStream obOutput = new ObjectOutputStream(output);){
obOutput.writeObject(jobGraph);
}
Expand All @@ -913,7 +919,7 @@ private ApplicationReport startAppMaster(
jobGraphFilename,
fs,
appId,
new Path(fp.toURI()),
new Path(tmpJobGraphFile.toURI()),
localResources,
homeDir,
"");
Expand All @@ -922,6 +928,10 @@ private ApplicationReport startAppMaster(
} catch (Exception e) {
LOG.warn("Add job graph to local resource fail");
throw e;
} finally {
if (tmpJobGraphFile != null && !tmpJobGraphFile.delete()) {
LOG.warn("Fail to delete temporary file {}.", tmpConfigurationFile.toPath());
}
}
}

Expand Down

0 comments on commit 4ee5477

Please sign in to comment.