Skip to content

Commit

Permalink
[FLINK-15816][k8s] Limit the value of kubernetes.cluster-id to have n…
Browse files Browse the repository at this point in the history
…o more than 45 characters

This closes apache#11708.
  • Loading branch information
zhengcanbin authored and tillrohrmann committed Apr 20, 2020
1 parent fabeb9f commit fa816c5
Show file tree
Hide file tree
Showing 6 changed files with 112 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
<td><h5>kubernetes.cluster-id</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>The cluster id used for identifying the unique flink cluster. If it's not set, the client will generate a random UUID name.</td>
<td>The cluster-id, which should be no more than 45 characters, is used for identifying a unique Flink cluster. If not set, the client will automatically generate it with a random ID.</td>
</tr>
<tr>
<td><h5>kubernetes.config.file</h5></td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.executors.KubernetesSessionClusterExecutor;
import org.apache.flink.kubernetes.kubeclient.KubeClientFactory;
import org.apache.flink.kubernetes.utils.Constants;
import org.apache.flink.util.AbstractID;

import javax.annotation.Nullable;

import java.util.UUID;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
Expand Down Expand Up @@ -66,6 +66,7 @@ public String getClusterId(Configuration configuration) {
}

private String generateClusterId() {
return CLUSTER_ID_PREFIX + UUID.randomUUID();
final String randomID = new AbstractID().toString();
return (CLUSTER_ID_PREFIX + randomID).substring(0, Constants.MAXIMUM_CHARACTERS_OF_CLUSTER_ID);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,8 @@ public class KubernetesConfigOptions {
key("kubernetes.cluster-id")
.stringType()
.noDefaultValue()
.withDescription("The cluster id used for identifying the unique flink cluster. If it's not set, " +
"the client will generate a random UUID name.");
.withDescription("The cluster-id, which should be no more than 45 characters, is used for identifying " +
"a unique Flink cluster. If not set, the client will automatically generate it with a random ID.");

public static final ConfigOption<String> CONTAINER_IMAGE =
key("kubernetes.container.image")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,13 @@ public Configuration getFlinkConfiguration() {
@Override
public String getClusterId() {
final String clusterId = flinkConfig.getString(KubernetesConfigOptions.CLUSTER_ID);
checkNotNull(clusterId, "ClusterId must be specified.");

if (StringUtils.isBlank(clusterId)) {
throw new IllegalArgumentException(KubernetesConfigOptions.CLUSTER_ID.key() + " must not be blank.");
} else if (clusterId.length() > Constants.MAXIMUM_CHARACTERS_OF_CLUSTER_ID) {
throw new IllegalArgumentException(KubernetesConfigOptions.CLUSTER_ID.key() + " must be no more than " +
Constants.MAXIMUM_CHARACTERS_OF_CLUSTER_ID + " characters.");
}

return clusterId;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,6 @@ public class Constants {
public static final String POD_IP_FIELD_PATH = "status.podIP";

public static final String HEADLESS_SERVICE_CLUSTER_IP = "None";

public static final int MAXIMUM_CHARACTERS_OF_CLUSTER_ID = 45;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.kubernetes.kubeclient.parameters;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.utils.Constants;
import org.apache.flink.util.StringUtils;
import org.apache.flink.util.TestLogger;

import org.junit.Test;

import java.util.List;
import java.util.Map;
import java.util.Random;

import static org.apache.flink.core.testutils.CommonTestUtils.assertThrows;

/**
* General tests for the {@link AbstractKubernetesParameters}.
*/
public class AbstractKubernetesParametersTest extends TestLogger {

private final Configuration flinkConfig = new Configuration();
private final TestingKubernetesParameters testingKubernetesParameters = new TestingKubernetesParameters(flinkConfig);

@Test
public void testClusterIdMustNotBeBlank() {
flinkConfig.set(KubernetesConfigOptions.CLUSTER_ID, " ");
assertThrows(
"must not be blank",
IllegalArgumentException.class,
testingKubernetesParameters::getClusterId
);
}

@Test
public void testClusterIdLengthLimitation() {
final String stringWithIllegalLength =
StringUtils.generateRandomAlphanumericString(new Random(), Constants.MAXIMUM_CHARACTERS_OF_CLUSTER_ID + 1);
flinkConfig.set(KubernetesConfigOptions.CLUSTER_ID, stringWithIllegalLength);
assertThrows(
"must be no more than " + Constants.MAXIMUM_CHARACTERS_OF_CLUSTER_ID + " characters",
IllegalArgumentException.class,
testingKubernetesParameters::getClusterId
);
}

private class TestingKubernetesParameters extends AbstractKubernetesParameters {

public TestingKubernetesParameters(Configuration flinkConfig) {
super(flinkConfig);
}

@Override
public Map<String, String> getLabels() {
throw new UnsupportedOperationException("NOT supported");
}

@Override
public Map<String, String> getNodeSelector() {
throw new UnsupportedOperationException("NOT supported");
}

@Override
public Map<String, String> getEnvironments() {
throw new UnsupportedOperationException("NOT supported");
}

@Override
public Map<String, String> getAnnotations() {
throw new UnsupportedOperationException("NOT supported");
}

@Override
public List<Map<String, String>> getTolerations() {
throw new UnsupportedOperationException("NOT supported");
}
}
}

0 comments on commit fa816c5

Please sign in to comment.