[FLINK-18097][history] Delete all job-related files on expiration
Draczech authored and zentol committed Jul 8, 2020
1 parent a076927 commit 78d6ee1
Showing 2 changed files with 65 additions and 24 deletions.
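The History Server mirrors each archived job into a local web directory. The layout implied by the code and test below looks roughly like this — the overviews/ and jobs/ directory names for webOverviewDir and webJobDir are inferred from the test's permanent-file list, so treat the exact names as assumptions:

	<local web dir>/
		config.json
		overviews/
			<jobID>.json       one overview entry per job (webOverviewDir)
		jobs/
			overview.json
			<jobID>.json       one detail file per job (webJobDir)
			<jobID>/           one directory of further JSON files per job (webJobDir)

Before this commit, expiring a job deleted the overview entry and the <jobID> directory but left jobs/<jobID>.json behind; the deleteJobFiles helper introduced below removes all three targets.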
@@ -285,20 +285,7 @@ public void run() {
 					LOG.info("Processing archive {} finished.", jobArchivePath);
 				} catch (IOException e) {
 					LOG.error("Failure while fetching/processing job archive for job {}.", jobID, e);
-					// Make sure we do not include this job in the overview
-					try {
-						Files.delete(new File(webOverviewDir, jobID + JSON_FILE_ENDING).toPath());
-					} catch (IOException ioe) {
-						LOG.debug("Could not delete file from overview directory.", ioe);
-					}
-
-					// Clean up job files we may have created
-					File jobDirectory = new File(webJobDir, jobID);
-					try {
-						FileUtils.deleteDirectory(jobDirectory);
-					} catch (IOException ioe) {
-						LOG.debug("Could not clean up job directory.", ioe);
-					}
+					deleteJobFiles(jobID);
 				}
 			}
 		}
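A subtle improvement hidden in this hunk: the old error path called Files.delete, which throws if the file was never written, while the consolidated helper below uses Files.deleteIfExists, so a missing file is not itself treated as a failure. A two-line contrast, assuming a path that does not exist:

import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class DeleteContrast {
	public static void main(String[] args) throws Exception {
		Path missing = Paths.get("does-not-exist.json");
		System.out.println(Files.deleteIfExists(missing)); // prints false, no exception
		Files.delete(missing);                             // throws NoSuchFileException
	}
}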
@@ -340,18 +327,36 @@ private List<ArchiveEvent> cleanupExpiredJobs(Set<String> jobsToRemove) {
 
 			cachedArchives.removeAll(jobsToRemove);
 			jobsToRemove.forEach(removedJobID -> {
-				try {
-					Files.deleteIfExists(new File(webOverviewDir, removedJobID + JSON_FILE_ENDING).toPath());
-					FileUtils.deleteDirectory(new File(webJobDir, removedJobID));
-				} catch (IOException e) {
-					LOG.error("Failure while removing job overview for job {}.", removedJobID, e);
-				}
+				deleteJobFiles(removedJobID);
 				deleteLog.add(new ArchiveEvent(removedJobID, ArchiveEventType.DELETED));
 			});
 
 			return deleteLog;
 		}
 
+		private void deleteJobFiles(String jobID) {
+			// Make sure we do not include this job in the overview
+			try {
+				Files.deleteIfExists(new File(webOverviewDir, jobID + JSON_FILE_ENDING).toPath());
+			} catch (IOException ioe) {
+				LOG.warn("Could not delete file from overview directory.", ioe);
+			}
+
+			// Clean up job files we may have created
+			File jobDirectory = new File(webJobDir, jobID);
+			try {
+				FileUtils.deleteDirectory(jobDirectory);
+			} catch (IOException ioe) {
+				LOG.warn("Could not clean up job directory.", ioe);
+			}
+
+			try {
+				Files.deleteIfExists(new File(webJobDir, jobID + JSON_FILE_ENDING).toPath());
+			} catch (IOException ioe) {
+				LOG.warn("Could not delete file from job directory.", ioe);
+			}
+		}
+
 	}
 
 	private static String convertLegacyJobOverview(String legacyOverview) throws IOException {
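Each deletion in deleteJobFiles is wrapped in its own try/catch, so a failure on one target still lets the remaining targets be removed, with failures logged at WARN instead of aborting the fetcher. A minimal sketch of the same best-effort idea using a shared helper — tryDelete is a hypothetical name, not part of the commit:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

final class BestEffortCleanup {

	// Attempt a single deletion; log and continue on failure.
	private static void tryDelete(Path path, String description) {
		try {
			Files.deleteIfExists(path);
		} catch (IOException ioe) {
			System.err.println("Could not delete " + description + ": " + ioe);
		}
	}

	static void deleteJobFiles(Path webOverviewDir, Path webJobDir, String jobID) {
		// Each target is deleted independently, mirroring the commit's structure.
		tryDelete(webOverviewDir.resolve(jobID + ".json"), "overview entry");
		tryDelete(webJobDir.resolve(jobID + ".json"), "job detail file");
		// The per-job directory needs recursive deletion (e.g. Flink's
		// FileUtils.deleteDirectory); omitted to keep the sketch dependency-free.
	}
}

The second changed file extends the History Server test so that expired jobs are verified to leave no files behind: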
@@ -61,6 +61,7 @@
 import java.net.URL;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -71,7 +72,11 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -260,7 +265,8 @@ private void runArchiveExpirationTest(boolean cleanupExpiredJobs) throws Excepti
 		waitForArchivesCreation(numJobs);
 
 		CountDownLatch numExpectedArchivedJobs = new CountDownLatch(numJobs);
-		CountDownLatch numExpectedExpiredJobs = new CountDownLatch(numExpiredJobs);
+		CountDownLatch firstArchiveExpiredLatch = new CountDownLatch(numExpiredJobs);
+		CountDownLatch allArchivesExpiredLatch = new CountDownLatch(cleanupExpiredJobs ? numJobs : 0);
 
 		Configuration historyServerConfig = createTestConfiguration(cleanupExpiredJobs);
 
@@ -273,7 +279,8 @@ private void runArchiveExpirationTest(boolean cleanupExpiredJobs) throws Excepti
 					numExpectedArchivedJobs.countDown();
 					break;
 				case DELETED:
-					numExpectedExpiredJobs.countDown();
+					firstArchiveExpiredLatch.countDown();
+					allArchivesExpiredLatch.countDown();
 					break;
 			}
 		});
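When cleanup is disabled, allArchivesExpiredLatch was initialized to zero above, so these countDown() calls are no-ops and the later await() returns immediately: a CountDownLatch that starts at zero is already open. A quick self-contained illustration of that property:

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class ZeroLatchDemo {
	public static void main(String[] args) throws InterruptedException {
		CountDownLatch openLatch = new CountDownLatch(0);
		// A zero-count latch is already released, so this returns true at once.
		System.out.println(openLatch.await(10L, TimeUnit.SECONDS));
		openLatch.countDown(); // no-op: the count never goes below zero
	}
}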
@@ -295,21 +302,50 @@ private void runArchiveExpirationTest(boolean cleanupExpiredJobs) throws Excepti
 			// delete one archive from jm
 			Files.deleteIfExists(jmDirectory.toPath().resolve(jobIdToDelete));
 
-			assertTrue(numExpectedExpiredJobs.await(10L, TimeUnit.SECONDS));
+			assertTrue(firstArchiveExpiredLatch.await(10L, TimeUnit.SECONDS));
 
-			// check that archive is present in hs
+			// check that archive is still/no longer present in hs
 			Collection<JobDetails> jobsAfterDeletion = getJobsOverview(baseUrl).getJobs();
 			Assert.assertEquals(numJobs - numExpiredJobs, jobsAfterDeletion.size());
 			Assert.assertEquals(1 - numExpiredJobs, jobsAfterDeletion.stream()
 				.map(JobDetails::getJobId)
 				.map(JobID::toString)
 				.filter(jobId -> jobId.equals(jobIdToDelete))
 				.count());
+
+			// delete remaining archives from jm and ensure files are cleaned up
+			List<String> remainingJobIds = jobsAfterDeletion.stream()
+				.map(JobDetails::getJobId)
+				.map(JobID::toString)
+				.collect(Collectors.toList());
+
+			for (String remainingJobId : remainingJobIds) {
+				Files.deleteIfExists(jmDirectory.toPath().resolve(remainingJobId));
+			}
+
+			assertTrue(allArchivesExpiredLatch.await(10L, TimeUnit.SECONDS));
+
+			assertJobFilesCleanedUp(cleanupExpiredJobs);
 		} finally {
 			hs.stop();
 		}
 	}
 
+	private void assertJobFilesCleanedUp(boolean jobFilesShouldBeDeleted) throws IOException {
+		try (Stream<Path> paths = Files.walk(hsDirectory.toPath())) {
+			final List<Path> jobFiles = paths
+				.filter(path -> !path.equals(hsDirectory.toPath()))
+				.map(path -> hsDirectory.toPath().relativize(path))
+				.filter(path -> !path.equals(Paths.get("config.json")))
+				.filter(path -> !path.equals(Paths.get("jobs")))
+				.filter(path -> !path.equals(Paths.get("jobs", "overview.json")))
+				.filter(path -> !path.equals(Paths.get("overviews")))
+				.collect(Collectors.toList());
+
+			assertThat(jobFiles, jobFilesShouldBeDeleted ? empty() : not(empty()));
+		}
+	}
+
 	private void waitForArchivesCreation(int numJobs) throws InterruptedException {
 		// the job is archived asynchronously after env.execute() returns
 		File[] archives = jmDirectory.listFiles();
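assertJobFilesCleanedUp walks the History Server directory and treats everything except the four permanent entries (config.json, jobs/, jobs/overview.json, overviews/) as leftover job files. A standalone sketch of the same walk-and-filter idea against a throwaway directory — file names here are illustrative only:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class LeftoverScan {
	public static void main(String[] args) throws IOException {
		Path root = Files.createTempDirectory("hs");
		Files.createFile(root.resolve("config.json"));
		Files.createDirectories(root.resolve("jobs"));
		Files.createFile(root.resolve("jobs").resolve("overview.json"));
		Files.createDirectories(root.resolve("overviews"));
		Files.createFile(root.resolve("jobs").resolve("someJobId.json")); // a leftover

		Set<Path> permanent = Set.of(
			Paths.get("config.json"),
			Paths.get("jobs"),
			Paths.get("jobs", "overview.json"),
			Paths.get("overviews"));

		try (Stream<Path> paths = Files.walk(root)) {
			List<Path> leftovers = paths
				.filter(path -> !path.equals(root))        // skip the root itself
				.map(root::relativize)                     // compare relative paths
				.filter(path -> !permanent.contains(path)) // keep only unexpected entries
				.collect(Collectors.toList());
			System.out.println(leftovers); // e.g. [jobs/someJobId.json]
		}
	}
}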
