Skip to content

Commit

Permalink
[hotfix][runtime][k8s] Renaming methods for better code readability.
Browse files Browse the repository at this point in the history
  • Loading branch information
xintongsong authored and tillrohrmann committed Apr 25, 2020
1 parent 98e8b0e commit 8882b86
Show file tree
Hide file tree
Showing 9 changed files with 21 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ private ClusterClientProvider<String> deployClusterInternal(
new KubernetesJobManagerParameters(flinkConfig, clusterSpecification);

final KubernetesJobManagerSpecification kubernetesJobManagerSpec =
KubernetesJobManagerFactory.createJobManagerComponent(kubernetesJobManagerParameters);
KubernetesJobManagerFactory.buildKubernetesJobManagerSpecification(kubernetesJobManagerParameters);

client.createJobManagerComponent(kubernetesJobManagerSpec);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,17 +210,17 @@ public void onAdded(List<KubernetesPod> pods) {

@Override
public void onModified(List<KubernetesPod> pods) {
runAsync(() -> pods.forEach(this::removePodIfTerminated));
runAsync(() -> pods.forEach(this::removePodAndTryRestartIfRequired));
}

@Override
public void onDeleted(List<KubernetesPod> pods) {
runAsync(() -> pods.forEach(this::removePodIfTerminated));
runAsync(() -> pods.forEach(this::removePodAndTryRestartIfRequired));
}

@Override
public void onError(List<KubernetesPod> pods) {
runAsync(() -> pods.forEach(this::removePodIfTerminated));
runAsync(() -> pods.forEach(this::removePodAndTryRestartIfRequired));
}

@VisibleForTesting
Expand Down Expand Up @@ -257,7 +257,7 @@ private void requestKubernetesPod(WorkerResourceSpec workerResourceSpec) {
pendingWorkerNum);

final KubernetesPod taskManagerPod =
KubernetesTaskManagerFactory.createTaskManagerComponent(parameters);
KubernetesTaskManagerFactory.buildTaskManagerKubernetesPod(parameters);
kubeClient.createTaskManagerPod(taskManagerPod)
.whenCompleteAsync(
(ignore, throwable) -> {
Expand Down Expand Up @@ -314,7 +314,7 @@ private void requestKubernetesPodIfRequired() {
}
}

private void removePodIfTerminated(KubernetesPod pod) {
private void removePodAndTryRestartIfRequired(KubernetesPod pod) {
if (pod.isTerminated()) {
internalStopPod(pod.getName());
requestKubernetesPodIfRequired();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
*/
public class KubernetesJobManagerFactory {

public static KubernetesJobManagerSpecification createJobManagerComponent(
public static KubernetesJobManagerSpecification buildKubernetesJobManagerSpecification(
KubernetesJobManagerParameters kubernetesJobManagerParameters) throws IOException {
FlinkPod flinkPod = new FlinkPod.Builder().build();
List<HasMetadata> accompanyingResources = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
*/
public class KubernetesTaskManagerFactory {

public static KubernetesPod createTaskManagerComponent(KubernetesTaskManagerParameters kubernetesTaskManagerParameters) {
public static KubernetesPod buildTaskManagerKubernetesPod(KubernetesTaskManagerParameters kubernetesTaskManagerParameters) {
FlinkPod flinkPod = new FlinkPod.Builder().build();

final KubernetesStepDecorator[] stepDecorators = new KubernetesStepDecorator[] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public void setup() throws Exception {
final KubernetesJobManagerParameters kubernetesJobManagerParameters =
new KubernetesJobManagerParameters(flinkConfig, clusterSpecification);
this.kubernetesJobManagerSpecification =
KubernetesJobManagerFactory.createJobManagerComponent(kubernetesJobManagerParameters);
KubernetesJobManagerFactory.buildKubernetesJobManagerSpecification(kubernetesJobManagerParameters);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public void setup() throws Exception {
flinkConfig.set(KubernetesConfigOptions.JOB_MANAGER_SERVICE_ACCOUNT, SERVICE_ACCOUNT_NAME);

this.kubernetesJobManagerSpecification =
KubernetesJobManagerFactory.createJobManagerComponent(kubernetesJobManagerParameters);
KubernetesJobManagerFactory.buildKubernetesJobManagerSpecification(kubernetesJobManagerParameters);
}

@Test
Expand Down Expand Up @@ -215,7 +215,7 @@ public void testFlinkConfConfigMap() {
@Test
public void testExistingHadoopConfigMap() throws IOException {
flinkConfig.set(KubernetesConfigOptions.HADOOP_CONF_CONFIG_MAP, EXISTING_HADOOP_CONF_CONFIG_MAP);
kubernetesJobManagerSpecification = KubernetesJobManagerFactory.createJobManagerComponent(kubernetesJobManagerParameters);
kubernetesJobManagerSpecification = KubernetesJobManagerFactory.buildKubernetesJobManagerSpecification(kubernetesJobManagerParameters);

assertFalse(kubernetesJobManagerSpecification.getAccompanyingResources().stream()
.anyMatch(resource -> resource.getMetadata().getName().equals(HadoopConfMountDecorator.getHadoopConfConfigMapName(CLUSTER_ID))));
Expand All @@ -228,7 +228,7 @@ public void testExistingHadoopConfigMap() throws IOException {
public void testHadoopConfConfigMap() throws IOException {
setHadoopConfDirEnv();
generateHadoopConfFileItems();
kubernetesJobManagerSpecification = KubernetesJobManagerFactory.createJobManagerComponent(kubernetesJobManagerParameters);
kubernetesJobManagerSpecification = KubernetesJobManagerFactory.buildKubernetesJobManagerSpecification(kubernetesJobManagerParameters);

final ConfigMap resultConfigMap = (ConfigMap) kubernetesJobManagerSpecification.getAccompanyingResources()
.stream()
Expand All @@ -248,7 +248,7 @@ public void testHadoopConfConfigMap() throws IOException {
@Test
public void testEmptyHadoopConfDirectory() throws IOException {
setHadoopConfDirEnv();
kubernetesJobManagerSpecification = KubernetesJobManagerFactory.createJobManagerComponent(kubernetesJobManagerParameters);
kubernetesJobManagerSpecification = KubernetesJobManagerFactory.buildKubernetesJobManagerSpecification(kubernetesJobManagerParameters);

assertFalse(kubernetesJobManagerSpecification.getAccompanyingResources().stream()
.anyMatch(resource -> resource.getMetadata().getName().equals(HadoopConfMountDecorator.getHadoopConfConfigMapName(CLUSTER_ID))));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public void setup() throws Exception {
generateHadoopConfFileItems();

this.resultPod =
KubernetesTaskManagerFactory.createTaskManagerComponent(kubernetesTaskManagerParameters).getInternalResource();
KubernetesTaskManagerFactory.buildTaskManagerKubernetesPod(kubernetesTaskManagerParameters).getInternalResource();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public abstract class ActiveResourceManager <WorkerType extends ResourceIDRetrie
/** The process environment variables. */
protected final Map<String, String> env;

protected final TaskExecutorProcessSpec taskExecutorProcessSpec;
protected final TaskExecutorProcessSpec defaultTaskExecutorProcessSpec;

protected final int defaultMemoryMB;

Expand Down Expand Up @@ -98,11 +98,11 @@ public ActiveResourceManager(
this.env = env;

double defaultCpus = getCpuCores(flinkConfig);
this.taskExecutorProcessSpec = TaskExecutorProcessUtils
this.defaultTaskExecutorProcessSpec = TaskExecutorProcessUtils
.newProcessSpecBuilder(flinkConfig)
.withCpuCores(defaultCpus)
.build();
this.defaultMemoryMB = taskExecutorProcessSpec.getTotalProcessMemorySize().getMebiBytes();
this.defaultMemoryMB = defaultTaskExecutorProcessSpec.getTotalProcessMemorySize().getMebiBytes();

// Load the flink config uploaded by flink client
this.flinkClientConfig = loadClientConfiguration();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ public YarnResourceManager(
numPendingContainerRequests = 0;

this.webInterfaceUrl = webInterfaceUrl;
this.resource = Resource.newInstance(defaultMemoryMB, taskExecutorProcessSpec.getCpuCores().getValue().intValue());
this.resource = Resource.newInstance(defaultMemoryMB, defaultTaskExecutorProcessSpec.getCpuCores().getValue().intValue());
}

protected AMRMClientAsync<AMRMClient.ContainerRequest> createAndStartResourceManagerClient(
Expand Down Expand Up @@ -293,7 +293,7 @@ protected void internalDeregisterApplication(
public boolean startNewWorker(WorkerResourceSpec workerResourceSpec) {
Preconditions.checkArgument(Objects.equals(
workerResourceSpec,
WorkerResourceSpec.fromTaskExecutorProcessSpec(taskExecutorProcessSpec)));
WorkerResourceSpec.fromTaskExecutorProcessSpec(defaultTaskExecutorProcessSpec)));
requestYarnContainer();
return true;
}
Expand Down Expand Up @@ -566,12 +566,12 @@ private ContainerLaunchContext createTaskExecutorLaunchContext(
final String currDir = env.get(ApplicationConstants.Environment.PWD.key());

final ContaineredTaskManagerParameters taskManagerParameters =
ContaineredTaskManagerParameters.create(flinkConfig, taskExecutorProcessSpec);
ContaineredTaskManagerParameters.create(flinkConfig, defaultTaskExecutorProcessSpec);

log.info("TaskExecutor {} will be started on {} with {}.",
containerId,
host,
taskExecutorProcessSpec);
defaultTaskExecutorProcessSpec);

final Configuration taskManagerConfig = BootstrapTools.cloneConfiguration(flinkConfig);

Expand Down

0 comments on commit 8882b86

Please sign in to comment.