Skip to content

Commit

Permalink
[hotfix] Make SecurityUtils.SecurityConfiguration a toplevel class
Browse files Browse the repository at this point in the history
  • Loading branch information
aljoscha committed Sep 27, 2017
1 parent ec9e6e4 commit b4120c1
Show file tree
Hide file tree
Showing 28 changed files with 225 additions and 188 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import org.apache.flink.runtime.messages.JobManagerMessages.StoppingFailure;
import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint;
import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSuccess;
import org.apache.flink.runtime.security.SecurityConfiguration;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.util.Preconditions;
Expand Down Expand Up @@ -1127,7 +1128,7 @@ public static void main(final String[] args) {

try {
final CliFrontend cli = new CliFrontend();
SecurityUtils.install(new SecurityUtils.SecurityConfiguration(cli.config));
SecurityUtils.install(new SecurityConfiguration(cli.config));
int retCode = SecurityUtils.getInstalledContext()
.runSecured(new Callable<Integer>() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.runtime.security.SecurityConfiguration;
import org.apache.flink.runtime.security.modules.HadoopModule;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.test.util.SecureTestEnvironment;
Expand Down Expand Up @@ -126,8 +126,8 @@ public static void startSecureCluster() throws Exception {
flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL,
SecureTestEnvironment.getHadoopServicePrincipal());

SecurityUtils.SecurityConfiguration ctx =
new SecurityUtils.SecurityConfiguration(
SecurityConfiguration ctx =
new SecurityConfiguration(
flinkConfig,
Collections.singletonList(securityConfig -> new HadoopModule(securityConfig, conf)));
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.flink.mesos.runtime.clusterframework.MesosConfigKeys;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.security.SecurityConfiguration;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
import org.apache.flink.runtime.util.EnvironmentInformation;
Expand Down Expand Up @@ -115,7 +116,7 @@ else if (tmpDirs != null) {
LOG.info("ResourceID assigned for this container: {}", resourceId);

// Run the TM in the security context
SecurityUtils.SecurityConfiguration sc = new SecurityUtils.SecurityConfiguration(configuration);
SecurityConfiguration sc = new SecurityConfiguration(configuration);
SecurityUtils.install(sc);

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.flink.runtime.jobmanager.MemoryArchivist;
import org.apache.flink.runtime.jobmaster.JobMaster;
import org.apache.flink.runtime.process.ProcessReaper;
import org.apache.flink.runtime.security.SecurityConfiguration;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
Expand Down Expand Up @@ -163,7 +164,7 @@ protected int run(final String[] args) {
}

// configure security
SecurityUtils.SecurityConfiguration sc = new SecurityUtils.SecurityConfiguration(config);
SecurityConfiguration sc = new SecurityConfiguration(config);
SecurityUtils.install(sc);

// run the actual work in the installed security context
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.security.SecurityConfiguration;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.runtime.taskmanager.TaskManager;
import org.apache.flink.runtime.util.EnvironmentInformation;
Expand Down Expand Up @@ -115,7 +116,7 @@ else if (tmpDirs != null) {
LOG.info("ResourceID assigned for this container: {}", resourceId);

// Run the TM in the security context
SecurityUtils.SecurityConfiguration sc = new SecurityUtils.SecurityConfiguration(configuration);
SecurityConfiguration sc = new SecurityConfiguration(configuration);
SecurityUtils.install(sc);

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.flink.runtime.net.SSLUtils;
import org.apache.flink.runtime.rest.handler.legacy.DashboardConfigHandler;
import org.apache.flink.runtime.rest.handler.legacy.messages.DashboardConfiguration;
import org.apache.flink.runtime.security.SecurityConfiguration;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
import org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap;
Expand Down Expand Up @@ -103,7 +104,7 @@ public static void main(String[] args) throws Exception {
final Configuration flinkConfig = GlobalConfiguration.loadConfiguration(configDir);

// run the history server
SecurityUtils.install(new SecurityUtils.SecurityConfiguration(flinkConfig));
SecurityUtils.install(new SecurityConfiguration(flinkConfig));

try {
SecurityUtils.getInstalledContext().runSecured(new Callable<Integer>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
import org.apache.flink.runtime.security.SecurityConfiguration;
import org.apache.flink.runtime.security.SecurityContext;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.util.ExceptionUtils;
Expand Down Expand Up @@ -141,7 +142,7 @@ protected void installDefaultFileSystem(Configuration configuration) throws Exce
protected SecurityContext installSecurityContext(Configuration configuration) throws Exception {
LOG.info("Install security context.");

SecurityUtils.install(new SecurityUtils.SecurityConfiguration(configuration));
SecurityUtils.install(new SecurityConfiguration(configuration));

return SecurityUtils.getInstalledContext();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.security;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.runtime.security.modules.HadoopModuleFactory;
import org.apache.flink.runtime.security.modules.JaasModuleFactory;
import org.apache.flink.runtime.security.modules.SecurityModuleFactory;
import org.apache.flink.runtime.security.modules.ZookeeperModuleFactory;

import org.apache.commons.lang3.StringUtils;

import java.io.File;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* The global security configuration.
*
* <p>See {@link SecurityOptions} for corresponding configuration options.
*/
public class SecurityConfiguration {

private static final List<SecurityModuleFactory> DEFAULT_MODULES = Collections.unmodifiableList(
Arrays.asList(new HadoopModuleFactory(), new JaasModuleFactory(), new ZookeeperModuleFactory()));

private final List<SecurityModuleFactory> securityModuleFactories;

private final Configuration flinkConfig;

private final boolean isZkSaslDisable;

private final boolean useTicketCache;

private final String keytab;

private final String principal;

private final List<String> loginContextNames;

private final String zkServiceName;

private final String zkLoginContextName;

/**
* Create a security configuration from the global configuration.
* @param flinkConf the Flink global configuration.
*/
public SecurityConfiguration(Configuration flinkConf) {
this(flinkConf, DEFAULT_MODULES);
}

/**
* Create a security configuration from the global configuration.
* @param flinkConf the Flink global configuration.
* @param securityModuleFactories the security modules to apply.
*/
public SecurityConfiguration(Configuration flinkConf,
List<SecurityModuleFactory> securityModuleFactories) {
this.isZkSaslDisable = flinkConf.getBoolean(SecurityOptions.ZOOKEEPER_SASL_DISABLE);
this.keytab = flinkConf.getString(SecurityOptions.KERBEROS_LOGIN_KEYTAB);
this.principal = flinkConf.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL);
this.useTicketCache = flinkConf.getBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE);
this.loginContextNames = parseList(flinkConf.getString(SecurityOptions.KERBEROS_LOGIN_CONTEXTS));
this.zkServiceName = flinkConf.getString(SecurityOptions.ZOOKEEPER_SASL_SERVICE_NAME);
this.zkLoginContextName = flinkConf.getString(SecurityOptions.ZOOKEEPER_SASL_LOGIN_CONTEXT_NAME);
this.securityModuleFactories = Collections.unmodifiableList(securityModuleFactories);
this.flinkConfig = checkNotNull(flinkConf);
validate();
}

public boolean isZkSaslDisable() {
return isZkSaslDisable;
}

public String getKeytab() {
return keytab;
}

public String getPrincipal() {
return principal;
}

public boolean useTicketCache() {
return useTicketCache;
}

public Configuration getFlinkConfig() {
return flinkConfig;
}

public List<SecurityModuleFactory> getSecurityModuleFactories() {
return securityModuleFactories;
}

public List<String> getLoginContextNames() {
return loginContextNames;
}

public String getZooKeeperServiceName() {
return zkServiceName;
}

public String getZooKeeperLoginContextName() {
return zkLoginContextName;
}

private void validate() {
if (!StringUtils.isBlank(keytab)) {
// principal is required
if (StringUtils.isBlank(principal)) {
throw new IllegalConfigurationException("Kerberos login configuration is invalid; keytab requires a principal.");
}

// check the keytab is readable
File keytabFile = new File(keytab);
if (!keytabFile.exists() || !keytabFile.isFile() || !keytabFile.canRead()) {
throw new IllegalConfigurationException("Kerberos login configuration is invalid; keytab is unreadable");
}
}
}

private static List<String> parseList(String value) {
if (value == null || value.isEmpty()) {
return Collections.emptyList();
}

return Arrays.asList(value
.trim()
.replaceAll("(\\s*,+\\s*)+", ",")
.split(","));
}
}
Loading

0 comments on commit b4120c1

Please sign in to comment.