Skip to content

Commit

Permalink
[hotfix][k8s] Rename the implicit variable pod to podWithoutMainConta…
Browse files Browse the repository at this point in the history
…iner
  • Loading branch information
wangyang0918 authored and tillrohrmann committed Mar 1, 2021
1 parent b4bf3e3 commit 4fcc408
Show file tree
Hide file tree
Showing 20 changed files with 68 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,17 @@
*/
public class FlinkPod {

private final Pod pod;
private final Pod podWithoutMainContainer;

private final Container mainContainer;

public FlinkPod(Pod pod, Container mainContainer) {
this.pod = pod;
public FlinkPod(Pod podWithoutMainContainer, Container mainContainer) {
this.podWithoutMainContainer = podWithoutMainContainer;
this.mainContainer = mainContainer;
}

public Pod getPod() {
return pod;
public Pod getPodWithoutMainContainer() {
return podWithoutMainContainer;
}

public Container getMainContainer() {
Expand All @@ -51,11 +51,11 @@ public Container getMainContainer() {
/** Builder for creating a {@link FlinkPod}. */
public static class Builder {

private Pod pod;
private Pod podWithoutMainContainer;
private Container mainContainer;

public Builder() {
this.pod =
this.podWithoutMainContainer =
new PodBuilder()
.withNewMetadata()
.endMetadata()
Expand All @@ -68,12 +68,12 @@ public Builder() {

public Builder(FlinkPod flinkPod) {
checkNotNull(flinkPod);
this.pod = checkNotNull(flinkPod.getPod());
this.podWithoutMainContainer = checkNotNull(flinkPod.getPodWithoutMainContainer());
this.mainContainer = checkNotNull(flinkPod.getMainContainer());
}

public Builder withPod(Pod pod) {
this.pod = checkNotNull(pod);
this.podWithoutMainContainer = checkNotNull(pod);
return this;
}

Expand All @@ -83,7 +83,7 @@ public Builder withMainContainer(Container mainContainer) {
}

public FlinkPod build() {
return new FlinkPod(this.pod, this.mainContainer);
return new FlinkPod(this.podWithoutMainContainer, this.mainContainer);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public FlinkConfMountDecorator(AbstractKubernetesParameters kubernetesComponentC

@Override
public FlinkPod decorateFlinkPod(FlinkPod flinkPod) {
final Pod mountedPod = decoratePod(flinkPod.getPod());
final Pod mountedPod = decoratePod(flinkPod.getPodWithoutMainContainer());

final Container mountedMainContainer =
new ContainerBuilder(flinkPod.getMainContainer())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public FlinkPod decorateFlinkPod(FlinkPod flinkPod) {
}

final Pod podWithHadoopConf =
new PodBuilder(flinkPod.getPod())
new PodBuilder(flinkPod.getPodWithoutMainContainer())
.editOrNewSpec()
.addNewVolumeLike(hadoopConfVolume)
.endVolume()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public InitJobManagerDecorator(KubernetesJobManagerParameters kubernetesJobManag
@Override
public FlinkPod decorateFlinkPod(FlinkPod flinkPod) {
final Pod basicPod =
new PodBuilder(flinkPod.getPod())
new PodBuilder(flinkPod.getPodWithoutMainContainer())
.withApiVersion(API_VERSION)
.editOrNewMetadata()
.withLabels(kubernetesJobManagerParameters.getLabels())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public InitTaskManagerDecorator(
@Override
public FlinkPod decorateFlinkPod(FlinkPod flinkPod) {
final Pod basicPod =
new PodBuilder(flinkPod.getPod())
new PodBuilder(flinkPod.getPodWithoutMainContainer())
.withApiVersion(Constants.API_VERSION)
.editOrNewMetadata()
.withName(kubernetesTaskManagerParameters.getPodName())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public KerberosMountDecorator(AbstractKubernetesParameters kubernetesParameters)

@Override
public FlinkPod decorateFlinkPod(FlinkPod flinkPod) {
PodBuilder podBuilder = new PodBuilder(flinkPod.getPod());
PodBuilder podBuilder = new PodBuilder(flinkPod.getPodWithoutMainContainer());
ContainerBuilder containerBuilder = new ContainerBuilder(flinkPod.getMainContainer());

if (!StringUtils.isNullOrWhitespaceOnly(securityConfig.getKeytab())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public MountSecretsDecorator(AbstractKubernetesParameters kubernetesComponentCon

@Override
public FlinkPod decorateFlinkPod(FlinkPod flinkPod) {
final Pod podWithMount = decoratePod(flinkPod.getPod());
final Pod podWithMount = decoratePod(flinkPod.getPodWithoutMainContainer());
final Container containerWithMount = decorateMainContainer(flinkPod.getMainContainer());

return new FlinkPod.Builder(flinkPod)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ private static Deployment createJobManagerDeployment(
final Container resolvedMainContainer = flinkPod.getMainContainer();

final Pod resolvedPod =
new PodBuilder(flinkPod.getPod())
new PodBuilder(flinkPod.getPodWithoutMainContainer())
.editOrNewSpec()
.addToContainers(resolvedMainContainer)
.endSpec()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public static KubernetesPod buildTaskManagerKubernetesPod(
}

final Pod resolvedPod =
new PodBuilder(flinkPod.getPod())
new PodBuilder(flinkPod.getPodWithoutMainContainer())
.editOrNewSpec()
.addToContainers(flinkPod.getMainContainer())
.endSpec()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,9 @@ protected void onSetup() throws Exception {
@Test
public void testWhetherPodOrContainerIsDecorated() {
final FlinkPod resultFlinkPod = flinkConfMountDecorator.decorateFlinkPod(baseFlinkPod);
assertNotEquals(baseFlinkPod.getPod(), resultFlinkPod.getPod());
assertNotEquals(
baseFlinkPod.getPodWithoutMainContainer(),
resultFlinkPod.getPodWithoutMainContainer());
assertNotEquals(baseFlinkPod.getMainContainer(), resultFlinkPod.getMainContainer());
}

Expand Down Expand Up @@ -133,7 +135,9 @@ public void testDecoratedFlinkPodWithoutLog4jAndLogback() {
.withItems(expectedKeyToPaths)
.endConfigMap()
.build());
assertEquals(expectedVolumes, resultFlinkPod.getPod().getSpec().getVolumes());
assertEquals(
expectedVolumes,
resultFlinkPod.getPodWithoutMainContainer().getSpec().getVolumes());

final List<VolumeMount> expectedVolumeMounts =
Collections.singletonList(
Expand Down Expand Up @@ -169,7 +173,9 @@ public void testDecoratedFlinkPodWithLog4j() throws IOException {
.withItems(expectedKeyToPaths)
.endConfigMap()
.build());
assertEquals(expectedVolumes, resultFlinkPod.getPod().getSpec().getVolumes());
assertEquals(
expectedVolumes,
resultFlinkPod.getPodWithoutMainContainer().getSpec().getVolumes());
}

@Test
Expand Down Expand Up @@ -197,7 +203,9 @@ public void testDecoratedFlinkPodWithLogback() throws IOException {
.withItems(expectedKeyToPaths)
.endConfigMap()
.build());
assertEquals(expectedVolumes, resultFlinkPod.getPod().getSpec().getVolumes());
assertEquals(
expectedVolumes,
resultFlinkPod.getPodWithoutMainContainer().getSpec().getVolumes());
}

@Test
Expand Down Expand Up @@ -230,7 +238,9 @@ public void testDecoratedFlinkPodWithLog4jAndLogback() throws IOException {
.withItems(expectedKeyToPaths)
.endConfigMap()
.build());
assertEquals(expectedVolumes, resultFlinkPod.getPod().getSpec().getVolumes());
assertEquals(
expectedVolumes,
resultFlinkPod.getPodWithoutMainContainer().getSpec().getVolumes());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ public void testExistingHadoopConfigMap() throws IOException {
assertEquals(0, hadoopConfMountDecorator.buildAccompanyingKubernetesResources().size());

final FlinkPod resultFlinkPod = hadoopConfMountDecorator.decorateFlinkPod(baseFlinkPod);
final List<Volume> volumes = resultFlinkPod.getPod().getSpec().getVolumes();
final List<Volume> volumes =
resultFlinkPod.getPodWithoutMainContainer().getSpec().getVolumes();
assertTrue(
volumes.stream()
.anyMatch(
Expand All @@ -87,7 +88,8 @@ public void testExistingConfigMapPrecedeOverHadoopConfEnv() throws IOException {
assertEquals(0, hadoopConfMountDecorator.buildAccompanyingKubernetesResources().size());

final FlinkPod resultFlinkPod = hadoopConfMountDecorator.decorateFlinkPod(baseFlinkPod);
final List<Volume> volumes = resultFlinkPod.getPod().getSpec().getVolumes();
final List<Volume> volumes =
resultFlinkPod.getPodWithoutMainContainer().getSpec().getVolumes();
assertTrue(
volumes.stream()
.anyMatch(
Expand All @@ -112,7 +114,9 @@ public void testHadoopConfDirectoryUnset() throws IOException {
assertEquals(0, hadoopConfMountDecorator.buildAccompanyingKubernetesResources().size());

final FlinkPod resultFlinkPod = hadoopConfMountDecorator.decorateFlinkPod(baseFlinkPod);
assertEquals(baseFlinkPod.getPod(), resultFlinkPod.getPod());
assertEquals(
baseFlinkPod.getPodWithoutMainContainer(),
resultFlinkPod.getPodWithoutMainContainer());
assertEquals(baseFlinkPod.getMainContainer(), resultFlinkPod.getMainContainer());
}

Expand All @@ -123,7 +127,9 @@ public void testEmptyHadoopConfDirectory() throws IOException {
assertEquals(0, hadoopConfMountDecorator.buildAccompanyingKubernetesResources().size());

final FlinkPod resultFlinkPod = hadoopConfMountDecorator.decorateFlinkPod(baseFlinkPod);
assertEquals(baseFlinkPod.getPod(), resultFlinkPod.getPod());
assertEquals(
baseFlinkPod.getPodWithoutMainContainer(),
resultFlinkPod.getPodWithoutMainContainer());
assertEquals(baseFlinkPod.getMainContainer(), resultFlinkPod.getMainContainer());
}

Expand Down Expand Up @@ -155,7 +161,8 @@ public void testPodWithHadoopConfVolume() throws IOException {
generateHadoopConfFileItems();
final FlinkPod resultFlinkPod = hadoopConfMountDecorator.decorateFlinkPod(baseFlinkPod);

final List<Volume> resultVolumes = resultFlinkPod.getPod().getSpec().getVolumes();
final List<Volume> resultVolumes =
resultFlinkPod.getPodWithoutMainContainer().getSpec().getVolumes();
assertEquals(1, resultVolumes.size());

final Volume resultVolume = resultVolumes.get(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ protected void onSetup() throws Exception {
new InitJobManagerDecorator(this.kubernetesJobManagerParameters);
final FlinkPod resultFlinkPod = initJobManagerDecorator.decorateFlinkPod(this.baseFlinkPod);

this.resultPod = resultFlinkPod.getPod();
this.resultPod = resultFlinkPod.getPodWithoutMainContainer();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ protected void onSetup() throws Exception {
new InitJobManagerDecorator(this.kubernetesJobManagerParameters);
final FlinkPod resultFlinkPod = initJobManagerDecorator.decorateFlinkPod(this.baseFlinkPod);

this.resultPod = resultFlinkPod.getPod();
this.resultPod = resultFlinkPod.getPodWithoutMainContainer();
this.resultMainContainer = resultFlinkPod.getMainContainer();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ protected void onSetup() throws Exception {

final FlinkPod resultFlinkPod =
initTaskManagerDecorator.decorateFlinkPod(this.baseFlinkPod);
this.resultPod = resultFlinkPod.getPod();
this.resultPod = resultFlinkPod.getPodWithoutMainContainer();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ protected void onSetup() throws Exception {

final FlinkPod resultFlinkPod =
initTaskManagerDecorator.decorateFlinkPod(this.baseFlinkPod);
this.resultPod = resultFlinkPod.getPod();
this.resultPod = resultFlinkPod.getPodWithoutMainContainer();
this.resultMainContainer = resultFlinkPod.getMainContainer();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,9 @@ protected void onSetup() throws Exception {
@Test
public void testWhetherContainerOrPodIsReplaced() {
final FlinkPod resultFlinkPod = javaCmdJobManagerDecorator.decorateFlinkPod(baseFlinkPod);
assertEquals(baseFlinkPod.getPod(), resultFlinkPod.getPod());
assertEquals(
baseFlinkPod.getPodWithoutMainContainer(),
resultFlinkPod.getPodWithoutMainContainer());
assertNotEquals(baseFlinkPod.getMainContainer(), resultFlinkPod.getMainContainer());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,9 @@ public void onSetup() throws Exception {
public void testWhetherContainerOrPodIsUpdated() {
final FlinkPod resultFlinkPod =
javaCmdTaskManagerDecorator.decorateFlinkPod(this.baseFlinkPod);
assertEquals(this.baseFlinkPod.getPod(), resultFlinkPod.getPod());
assertEquals(
this.baseFlinkPod.getPodWithoutMainContainer(),
resultFlinkPod.getPodWithoutMainContainer());
assertNotEquals(this.baseFlinkPod.getMainContainer(), resultFlinkPod.getMainContainer());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@ protected void onSetup() throws Exception {
@Test
public void testWhetherPodOrContainerIsDecorated() {
final FlinkPod resultFlinkPod = kerberosMountDecorator.decorateFlinkPod(baseFlinkPod);
assertNotEquals(baseFlinkPod.getPod(), resultFlinkPod.getPod());
assertNotEquals(
baseFlinkPod.getPodWithoutMainContainer(),
resultFlinkPod.getPodWithoutMainContainer());
assertNotEquals(baseFlinkPod.getMainContainer(), resultFlinkPod.getMainContainer());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,12 @@ protected void onSetup() throws Exception {
public void testWhetherPodOrContainerIsDecorated() {
final FlinkPod resultFlinkPod = mountSecretsDecorator.decorateFlinkPod(baseFlinkPod);

assertFalse(VolumeTestUtils.podHasVolume(baseFlinkPod.getPod(), SECRET_NAME + "-volume"));
assertTrue(VolumeTestUtils.podHasVolume(resultFlinkPod.getPod(), SECRET_NAME + "-volume"));
assertFalse(
VolumeTestUtils.podHasVolume(
baseFlinkPod.getPodWithoutMainContainer(), SECRET_NAME + "-volume"));
assertTrue(
VolumeTestUtils.podHasVolume(
resultFlinkPod.getPodWithoutMainContainer(), SECRET_NAME + "-volume"));

assertFalse(
VolumeTestUtils.containerHasVolume(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,10 @@ public void testCallbackHandler() {
FlinkPod pod = new FlinkPod.Builder().build();
final KubernetesPodsWatcher podsWatcher =
new KubernetesPodsWatcher(new TestingCallbackHandler(e -> {}));
podsWatcher.eventReceived(Watcher.Action.ADDED, pod.getPod());
podsWatcher.eventReceived(Watcher.Action.MODIFIED, pod.getPod());
podsWatcher.eventReceived(Watcher.Action.DELETED, pod.getPod());
podsWatcher.eventReceived(Watcher.Action.ERROR, pod.getPod());
podsWatcher.eventReceived(Watcher.Action.ADDED, pod.getPodWithoutMainContainer());
podsWatcher.eventReceived(Watcher.Action.MODIFIED, pod.getPodWithoutMainContainer());
podsWatcher.eventReceived(Watcher.Action.DELETED, pod.getPodWithoutMainContainer());
podsWatcher.eventReceived(Watcher.Action.ERROR, pod.getPodWithoutMainContainer());

assertThat(podAddedList.size(), is(1));
assertThat(podModifiedList.size(), is(1));
Expand Down

0 comments on commit 4fcc408

Please sign in to comment.