Skip to content

Commit

Permalink
[hotfix][runtime] Migrate some related tests to Juint5
Browse files Browse the repository at this point in the history
  • Loading branch information
wanglijie95 committed Dec 20, 2022
1 parent f955706 commit 35498b2
Show file tree
Hide file tree
Showing 7 changed files with 334 additions and 383 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.PermanentBlobCache;
import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.testutils.junit.utils.TempDirUtils;

import org.junit.After;
import org.junit.Before;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;

import java.io.IOException;
import java.net.InetSocketAddress;
Expand All @@ -34,26 +35,31 @@
* Tests {@link ExecutionGraph} deployment when offloading job and task information into the BLOB
* server.
*/
public class DefaultExecutionGraphDeploymentWithBlobCacheTest
class DefaultExecutionGraphDeploymentWithBlobCacheTest
extends DefaultExecutionGraphDeploymentWithBlobServerTest {

@Before
@BeforeEach
@Override
public void setupBlobServer() throws IOException {
Configuration config = new Configuration();
// always offload the serialized job and task information
config.setInteger(BlobServerOptions.OFFLOAD_MINSIZE, 0);
blobServer = new BlobServer(config, TEMPORARY_FOLDER.newFolder(), new VoidBlobStore());
blobServer =
new BlobServer(
config, TempDirUtils.newFolder(temporaryFolder), new VoidBlobStore());
blobServer.start();
blobWriter = blobServer;

InetSocketAddress serverAddress = new InetSocketAddress("localhost", blobServer.getPort());
blobCache =
new PermanentBlobCache(
config, TEMPORARY_FOLDER.newFolder(), new VoidBlobStore(), serverAddress);
config,
TempDirUtils.newFolder(temporaryFolder),
new VoidBlobStore(),
serverAddress);
}

@After
@AfterEach
@Override
public void shutdownBlobServer() throws IOException {
if (blobServer != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,71 +22,57 @@
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.BlobStore;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.testutils.junit.utils.TempDirUtils;
import org.apache.flink.types.Either;
import org.apache.flink.util.SerializedValue;

import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.rules.TemporaryFolder;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.io.TempDir;

import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.assertj.core.api.Assertions.assertThat;

/**
* Tests {@link ExecutionGraph} deployment when offloading job and task information into the BLOB
* server.
*/
public class DefaultExecutionGraphDeploymentWithBlobServerTest
class DefaultExecutionGraphDeploymentWithBlobServerTest
extends DefaultExecutionGraphDeploymentTest {

@ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
@TempDir Path temporaryFolder;

private Set<byte[]> seenHashes =
Collections.newSetFromMap(new ConcurrentHashMap<byte[], Boolean>());

protected BlobServer blobServer = null;

@Before
@BeforeEach
public void setupBlobServer() throws IOException {
Configuration config = new Configuration();
// always offload the serialized job and task information
config.setInteger(BlobServerOptions.OFFLOAD_MINSIZE, 0);
blobServer =
Mockito.spy(
new BlobServer(config, TEMPORARY_FOLDER.newFolder(), new VoidBlobStore()));
new AssertBlobServer(
config, TempDirUtils.newFolder(temporaryFolder), new VoidBlobStore());
blobWriter = blobServer;
blobCache = blobServer;

seenHashes.clear();

// verify that we do not upload the same content more than once
doAnswer(
invocation -> {
PermanentBlobKey key = (PermanentBlobKey) invocation.callRealMethod();

assertTrue(seenHashes.add(key.getHash()));

return key;
})
.when(blobServer)
.putPermanent(any(JobID.class), Matchers.<byte[]>any());

blobServer.start();
}

@After
@AfterEach
public void shutdownBlobServer() throws IOException {
if (blobServer != null) {
blobServer.close();
Expand All @@ -98,7 +84,7 @@ protected void checkJobOffloaded(DefaultExecutionGraph eg) throws Exception {
Either<SerializedValue<JobInformation>, PermanentBlobKey> jobInformationOrBlobKey =
eg.getJobInformationOrBlobKey();

assertTrue(jobInformationOrBlobKey.isRight());
assertThat(jobInformationOrBlobKey.isRight()).isTrue();

// must not throw:
blobServer.getFile(eg.getJobID(), jobInformationOrBlobKey.right());
Expand All @@ -109,9 +95,24 @@ protected void checkTaskOffloaded(ExecutionGraph eg, JobVertexID jobVertexId) th
Either<SerializedValue<TaskInformation>, PermanentBlobKey> taskInformationOrBlobKey =
eg.getJobVertex(jobVertexId).getTaskInformationOrBlobKey();

assertTrue(taskInformationOrBlobKey.isRight());
assertThat(taskInformationOrBlobKey.isRight()).isTrue();

// must not throw:
blobServer.getFile(eg.getJobID(), taskInformationOrBlobKey.right());
}

private class AssertBlobServer extends BlobServer {
public AssertBlobServer(Configuration config, File storageDir, BlobStore blobStore)
throws IOException {
super(config, storageDir, blobStore);
}

@Override
public PermanentBlobKey putPermanent(JobID jobId, byte[] value) throws IOException {
PermanentBlobKey key = super.putPermanent(jobId, value);
// verify that we do not upload the same content more than once
assertThat(seenHashes.add(key.getHash())).isTrue();
return key;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,11 @@
import org.apache.flink.runtime.operators.BatchTask;
import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
import org.apache.flink.testutils.junit.utils.TempDirUtils;
import org.apache.flink.util.function.FunctionUtils;

import org.junit.Before;
import org.junit.Test;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.net.InetSocketAddress;
Expand All @@ -55,8 +56,7 @@
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.assertj.core.api.Assertions.assertThat;

/**
* Tests {@link ExecutionGraph} deployment when job and task information are offloaded into the BLOB
Expand All @@ -65,17 +65,19 @@
* even the size limit of {@link BlobCacheSizeTracker} in {@link PermanentBlobCache} is set to the
* minimum value.
*/
public class DefaultExecutionGraphDeploymentWithSmallBlobCacheSizeLimitTest
class DefaultExecutionGraphDeploymentWithSmallBlobCacheSizeLimitTest
extends DefaultExecutionGraphDeploymentWithBlobCacheTest {

@Before
@BeforeEach
@Override
public void setupBlobServer() throws IOException {
Configuration config = new Configuration();
// Always offload the serialized JobInformation, TaskInformation and cached
// ShuffleDescriptors
config.setInteger(BlobServerOptions.OFFLOAD_MINSIZE, 0);
blobServer = new BlobServer(config, TEMPORARY_FOLDER.newFolder(), new VoidBlobStore());
blobServer =
new BlobServer(
config, TempDirUtils.newFolder(temporaryFolder), new VoidBlobStore());
blobServer.start();
blobWriter = blobServer;

Expand All @@ -85,7 +87,7 @@ public void setupBlobServer() throws IOException {
blobCache =
new PermanentBlobCache(
config,
TEMPORARY_FOLDER.newFolder(),
TempDirUtils.newFolder(temporaryFolder),
new VoidBlobStore(),
serverAddress,
blobCacheSizeTracker);
Expand All @@ -103,7 +105,7 @@ public void setupBlobServer() throws IOException {
* larger than 1 and the deletion won't happen so frequently.
*/
@Test
public void testDeployMultipleTasksWithSmallBlobCacheSizeLimit() throws Exception {
void testDeployMultipleTasksWithSmallBlobCacheSizeLimit() throws Exception {

final int numberOfVertices = 4;
final int parallelism = 10;
Expand All @@ -124,7 +126,7 @@ public void testDeployMultipleTasksWithSmallBlobCacheSizeLimit() throws Exceptio
for (ExecutionJobVertex ejv : eg.getVerticesTopologically()) {
for (ExecutionVertex ev : ejv.getTaskVertices()) {

assertEquals(ExecutionState.CREATED, ev.getExecutionState());
assertThat(ev.getExecutionState()).isEqualTo(ExecutionState.CREATED);

LogicalSlot slot =
new TestingLogicalSlotBuilder()
Expand All @@ -134,13 +136,13 @@ public void testDeployMultipleTasksWithSmallBlobCacheSizeLimit() throws Exceptio
execution.transitionState(ExecutionState.SCHEDULED);
execution.registerProducedPartitions(slot.getTaskManagerLocation()).get();
ev.deployToSlot(slot);
assertEquals(ExecutionState.DEPLOYING, ev.getExecutionState());
assertThat(ev.getExecutionState()).isEqualTo(ExecutionState.DEPLOYING);

TaskDeploymentDescriptor tdd = tdds.take();
assertNotNull(tdd);
assertThat(tdd).isNotNull();

List<InputGateDeploymentDescriptor> igdds = tdd.getInputGates();
assertEquals(ev.getAllConsumedPartitionGroups().size(), igdds.size());
assertThat(igdds).hasSize(ev.getAllConsumedPartitionGroups().size());

if (igdds.size() > 0) {
checkShuffleDescriptors(igdds.get(0), ev.getConsumedPartitionGroup(0));
Expand Down Expand Up @@ -188,9 +190,8 @@ private static void checkShuffleDescriptors(
InputGateDeploymentDescriptor igdd, ConsumedPartitionGroup consumedPartitionGroup) {
int idx = 0;
for (IntermediateResultPartitionID consumedPartitionId : consumedPartitionGroup) {
assertEquals(
consumedPartitionId,
igdd.getShuffleDescriptors()[idx++].getResultPartitionID().getPartitionId());
assertThat(igdd.getShuffleDescriptors()[idx++].getResultPartitionID().getPartitionId())
.isEqualTo(consumedPartitionId);
}
}
}
Loading

0 comments on commit 35498b2

Please sign in to comment.