Skip to content

Commit

Permalink
[FLINK-15153][kubernetes] Service selector needs to contain jobmanage…
Browse files Browse the repository at this point in the history
…r component label

The jobmanager label needs to be added to service selector. Otherwise, it may select the wrong backend pods(taskmanager).
  • Loading branch information
wangyang0918 authored and zhijiangW committed Dec 11, 2019
1 parent 09208f9 commit 4286fca
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
* Setup services port.
Expand Down Expand Up @@ -89,7 +90,13 @@ protected Service decorateInternalResource(Service resource, Configuration flink
}

spec.setPorts(servicePorts);
spec.setSelector(resource.getMetadata().getLabels());

final Map<String, String> labels = new LabelBuilder()
.withExist(resource.getMetadata().getLabels())
.withJobManagerComponent()
.toLabels();

spec.setSelector(labels);

resource.setSpec(spec);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ public void testCreateInternalService() throws Exception {

assertEquals(KubernetesConfigOptions.ServiceExposedType.ClusterIP.toString(), service.getSpec().getType());

// The selector labels should contain jobmanager component
labels.put(Constants.LABEL_COMPONENT_KEY, Constants.LABEL_COMPONENT_JOB_MANAGER);
assertEquals(labels, service.getSpec().getSelector());

assertThat(service.getSpec().getPorts().stream().map(ServicePort::getPort).collect(Collectors.toList()),
Expand Down Expand Up @@ -133,6 +135,7 @@ public void testCreateRestService() throws Exception {

assertEquals(KubernetesConfigOptions.ServiceExposedType.LoadBalancer.toString(), service.getSpec().getType());

labels.put(Constants.LABEL_COMPONENT_KEY, Constants.LABEL_COMPONENT_JOB_MANAGER);
assertEquals(labels, service.getSpec().getSelector());

assertThat(service.getSpec().getPorts().stream().map(ServicePort::getPort).collect(Collectors.toList()),
Expand Down

0 comments on commit 4286fca

Please sign in to comment.