Skip to content

Commit

Permalink
[hotfix][tests] Refactor SchedulerTestBase for removing dead code
Browse files Browse the repository at this point in the history
This closes apache#10077 .
  • Loading branch information
tisonkun committed Nov 5, 2019
1 parent a041920 commit 047ba24
Showing 1 changed file with 6 additions and 98 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,35 +27,29 @@
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.instance.SlotSharingGroupId;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.jobmaster.slotpool.LocationPreferenceSlotSelectionStrategy;
import org.apache.flink.runtime.jobmaster.slotpool.Scheduler;
import org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.jobmaster.slotpool.SlotSelectionStrategy;
import org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager;
import org.apache.flink.runtime.jobmaster.slotpool.TestingSlotPoolImpl;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;

import org.junit.After;
import org.junit.Before;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
Expand All @@ -66,22 +60,19 @@
*/
public abstract class SchedulerTestBase extends TestLogger {

protected TestingSlotProvider testingSlotProvider;
protected TestingSlotPoolSlotProvider testingSlotProvider;

private SlotPool slotPool;

private TestingScheduler scheduler;
private Scheduler scheduler;

private ComponentMainThreadExecutor componentMainThreadExecutor;

@Before
public void setup() throws Exception {
final JobID jobId = new JobID();
slotPool = new TestingSlotPoolImpl(jobId);
scheduler = new TestingScheduler(
new HashMap<>(16),
LocationPreferenceSlotSelectionStrategy.INSTANCE,
slotPool);
scheduler = new SchedulerImpl(LocationPreferenceSlotSelectionStrategy.INSTANCE, slotPool);

testingSlotProvider = new TestingSlotPoolSlotProvider();

Expand Down Expand Up @@ -110,29 +101,7 @@ protected final void runInMainThreadExecutor(Runnable runnable) {
CompletableFuture.runAsync(runnable, componentMainThreadExecutor).join();
}

protected interface TestingSlotProvider extends SlotProvider {
TaskManagerLocation addTaskManager(int numberSlots);

void releaseTaskManager(ResourceID resourceId);

int getNumberOfAvailableSlots();

int getNumberOfLocalizedAssignments();

int getNumberOfNonLocalizedAssignments();

int getNumberOfUnconstrainedAssignments();

int getNumberOfHostLocalizedAssignments();

int getNumberOfSlots(SlotSharingGroup slotSharingGroup);

int getNumberOfAvailableSlotsForGroup(SlotSharingGroup slotSharingGroup, JobVertexID jobVertexId);

void shutdown() throws Exception;
}

private final class TestingSlotPoolSlotProvider implements TestingSlotProvider {
protected final class TestingSlotPoolSlotProvider implements SlotProvider {

private final AtomicInteger numberOfLocalizedAssignments;

Expand All @@ -149,7 +118,6 @@ private TestingSlotPoolSlotProvider() {
this.numberOfHostLocalizedAssignments = new AtomicInteger();
}

@Override
public TaskManagerLocation addTaskManager(int numberSlots) {
final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
final ResourceID resourceId = taskManagerLocation.getResourceID();
Expand Down Expand Up @@ -188,7 +156,6 @@ public TaskManagerLocation addTaskManager(int numberSlots) {
return taskManagerLocation;
}

@Override
public void releaseTaskManager(ResourceID resourceId) {
try {
supplyInMainThreadExecutor(() -> slotPool.releaseTaskManager(resourceId, null));
Expand All @@ -197,42 +164,26 @@ public void releaseTaskManager(ResourceID resourceId) {
}
}

@Override
public int getNumberOfAvailableSlots() {
return supplyInMainThreadExecutor(() -> slotPool.getAvailableSlotsInformation().size());
}

@Override
public int getNumberOfLocalizedAssignments() {
return numberOfLocalizedAssignments.get();
}

@Override
public int getNumberOfNonLocalizedAssignments() {
return numberOfNonLocalizedAssignments.get();
}

@Override
public int getNumberOfUnconstrainedAssignments() {
return numberOfUnconstrainedAssignments.get();
}

@Override
public int getNumberOfHostLocalizedAssignments() {
return numberOfHostLocalizedAssignments.get();
}

@Override
public int getNumberOfSlots(SlotSharingGroup slotSharingGroup) {
return supplyInMainThreadExecutor(() -> scheduler.getNumberOfSharedSlots(slotSharingGroup.getSlotSharingGroupId()));
}

@Override
public int getNumberOfAvailableSlotsForGroup(SlotSharingGroup slotSharingGroup, JobVertexID jobVertexId) {
return supplyInMainThreadExecutor(() -> scheduler.getNumberOfAvailableSlotsForGroup(slotSharingGroup.getSlotSharingGroupId(), jobVertexId));
}

@Override
public void shutdown() {
runInMainThreadExecutor(() -> slotPool.close());
}
Expand Down Expand Up @@ -268,51 +219,8 @@ public CompletableFuture<LogicalSlot> allocateSlot(

@Override
public void cancelSlotRequest(SlotRequestId slotRequestId, @Nullable SlotSharingGroupId slotSharingGroupId, Throwable cause) {
}
}

/**
* Test implementation of scheduler that offers a bit more introspection.
*/
private static final class TestingScheduler extends SchedulerImpl {

private final Map<SlotSharingGroupId, SlotSharingManager> slotSharingManagersMap;

public TestingScheduler(
@Nonnull Map<SlotSharingGroupId, SlotSharingManager> slotSharingManagersMap,
@Nonnull SlotSelectionStrategy slotSelectionStrategy,
@Nonnull SlotPool slotPoolGateway) {

super(slotSelectionStrategy, slotPoolGateway, slotSharingManagersMap);
this.slotSharingManagersMap = slotSharingManagersMap;
}

public int getNumberOfSharedSlots(SlotSharingGroupId slotSharingGroupId) {
final SlotSharingManager multiTaskSlotManager = slotSharingManagersMap.get(slotSharingGroupId);

if (multiTaskSlotManager != null) {
return multiTaskSlotManager.getResolvedRootSlots().size();
} else {
throw new FlinkRuntimeException("No MultiTaskSlotManager registered under " + slotSharingGroupId + '.');
}
}

public int getNumberOfAvailableSlotsForGroup(SlotSharingGroupId slotSharingGroupId, JobVertexID jobVertexId) {
final SlotSharingManager multiTaskSlotManager = slotSharingManagersMap.get(slotSharingGroupId);

if (multiTaskSlotManager != null) {
int availableSlots = 0;

for (SlotSharingManager.MultiTaskSlot multiTaskSlot : multiTaskSlotManager.getResolvedRootSlots()) {
if (!multiTaskSlot.contains(jobVertexId)) {
availableSlots++;
}
}

return availableSlots;
} else {
throw new FlinkRuntimeException("No MultiTaskSlotmanager registered under " + slotSharingGroupId + '.');
}
}
}

}

0 comments on commit 047ba24

Please sign in to comment.