Skip to content

Commit

Permalink
[FLINK-30402][security][runtime] Separate token framework generic and…
Browse files Browse the repository at this point in the history
… hadoop specific parts
  • Loading branch information
gaborgsomogyi committed Dec 14, 2022
1 parent b30b12d commit 4ba3366
Show file tree
Hide file tree
Showing 24 changed files with 80 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.runtime.security.contexts.SecurityContext;
import org.apache.flink.runtime.security.token.DelegationTokenManager;
import org.apache.flink.runtime.security.token.KerberosDelegationTokenManagerFactory;
import org.apache.flink.runtime.security.token.hadoop.KerberosDelegationTokenManagerFactory;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.runtime.webmonitor.retriever.impl.RpcMetricQueryServiceRetriever;
import org.apache.flink.util.AutoCloseableAsync;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.runtime.security.token.DelegationTokenManager;
import org.apache.flink.runtime.security.token.KerberosDelegationTokenManagerFactory;
import org.apache.flink.runtime.security.token.hadoop.KerberosDelegationTokenManagerFactory;
import org.apache.flink.runtime.taskexecutor.TaskExecutor;
import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
import org.apache.flink.runtime.webmonitor.retriever.LeaderRetriever;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.hadoop.HadoopUserUtils;
import org.apache.flink.runtime.security.SecurityConfiguration;
import org.apache.flink.runtime.security.token.KerberosLoginProvider;
import org.apache.flink.runtime.security.token.hadoop.KerberosLoginProvider;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.Credentials;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* limitations under the License.
*/

package org.apache.flink.runtime.security.token;
package org.apache.flink.runtime.security.token.hadoop;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.configuration.Configuration;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* limitations under the License.
*/

package org.apache.flink.runtime.security.token;
package org.apache.flink.runtime.security.token.hadoop;

import org.apache.flink.annotation.Internal;

Expand All @@ -29,7 +29,7 @@

/** Delegation token serializer and deserializer functionality. */
@Internal
public class DelegationTokenConverter {
public class HadoopDelegationTokenConverter {
/** Serializes delegation tokens. */
public static byte[] serialize(Credentials credentials) throws IOException {
try (DataOutputBuffer dob = new DataOutputBuffer()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@
* limitations under the License.
*/

package org.apache.flink.runtime.security.token;
package org.apache.flink.runtime.security.token.hadoop;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.security.token.DelegationTokenManager;

import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* limitations under the License.
*/

package org.apache.flink.runtime.security.token;
package org.apache.flink.runtime.security.token.hadoop;

import org.apache.flink.annotation.Internal;

Expand All @@ -29,18 +29,18 @@

/** Delegation token updater functionality. */
@Internal
public final class DelegationTokenUpdater {
public final class HadoopDelegationTokenUpdater {

private static final Logger LOG = LoggerFactory.getLogger(DelegationTokenUpdater.class);
private static final Logger LOG = LoggerFactory.getLogger(HadoopDelegationTokenUpdater.class);

private DelegationTokenUpdater() {}
private HadoopDelegationTokenUpdater() {}

/** Updates delegation tokens for the current user. */
public static void addCurrentUserCredentials(byte[] credentialsBytes) throws IOException {
if (credentialsBytes == null || credentialsBytes.length == 0) {
throw new IllegalArgumentException("Illegal credentials tried to be set");
}
Credentials credentials = DelegationTokenConverter.deserialize(credentialsBytes);
Credentials credentials = HadoopDelegationTokenConverter.deserialize(credentialsBytes);
LOG.info("Updating delegation tokens for current user");
dumpAllTokens(credentials);
UserGroupInformation.getCurrentUser().addCredentials(credentials);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* limitations under the License.
*/

package org.apache.flink.runtime.security.token;
package org.apache.flink.runtime.security.token.hadoop;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.VisibleForTesting;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@
* limitations under the License.
*/

package org.apache.flink.runtime.security.token;
package org.apache.flink.runtime.security.token.hadoop;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.security.token.DelegationTokenListener;
import org.apache.flink.runtime.security.token.DelegationTokenManager;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.concurrent.ScheduledExecutor;

Expand Down Expand Up @@ -189,7 +191,7 @@ protected Optional<Long> obtainDelegationTokensAndGetNextRenewal(Credentials cre
.flatMap(nr -> nr.map(Stream::of).orElseGet(Stream::empty))
.min(Long::compare);

DelegationTokenUpdater.dumpAllTokens(credentials);
HadoopDelegationTokenUpdater.dumpAllTokens(credentials);

return nextRenewal;
}
Expand Down Expand Up @@ -269,11 +271,12 @@ void startTokensUpdate() {
Optional<Long> nextRenewal = obtainDelegationTokensAndGetNextRenewal(credentials);

if (credentials.numberOfTokens() > 0) {
byte[] credentialsBytes = DelegationTokenConverter.serialize(credentials);
byte[] credentialsBytes = HadoopDelegationTokenConverter.serialize(credentials);

DelegationTokenUpdater.addCurrentUserCredentials(credentialsBytes);
HadoopDelegationTokenUpdater.addCurrentUserCredentials(credentialsBytes);

LOG.info("Notifying listener about new tokens");
checkNotNull(delegationTokenListener, "Listener must not be null");
delegationTokenListener.onNewTokensObtained(credentialsBytes);
LOG.info("Listener notified successfully");
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@
* limitations under the License.
*/

package org.apache.flink.runtime.security.token;
package org.apache.flink.runtime.security.token.hadoop;

import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.runtime.hadoop.HadoopDependency;
import org.apache.flink.runtime.security.token.DelegationTokenManager;
import org.apache.flink.runtime.security.token.NoOpDelegationTokenManager;
import org.apache.flink.util.concurrent.ScheduledExecutor;

import org.slf4j.Logger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* limitations under the License.
*/

package org.apache.flink.runtime.security.token;
package org.apache.flink.runtime.security.token.hadoop;

import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.Configuration;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcServiceUtils;
import org.apache.flink.runtime.security.token.DelegationTokenUpdater;
import org.apache.flink.runtime.security.token.hadoop.HadoopDelegationTokenUpdater;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
Expand Down Expand Up @@ -1340,7 +1340,7 @@ public CompletableFuture<Acknowledge> updateDelegationTokens(
}

try {
DelegationTokenUpdater.addCurrentUserCredentials(tokens);
HadoopDelegationTokenUpdater.addCurrentUserCredentials(tokens);
return CompletableFuture.completedFuture(Acknowledge.get());
} catch (Throwable t) {
log.error("Could not update delegation tokens.", t);
Expand Down Expand Up @@ -2379,7 +2379,8 @@ public void onRegistrationSuccess(
if (success.getInitialTokens() != null) {
try {
log.info("Receive initial delegation tokens from resource manager");
DelegationTokenUpdater.addCurrentUserCredentials(success.getInitialTokens());
HadoopDelegationTokenUpdater.addCurrentUserCredentials(
success.getInitialTokens());
} catch (Throwable t) {
log.error("Could not update delegation tokens.", t);
ExceptionUtils.rethrowIfFatalError(t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@
# See the License for the specific language governing permissions and
# limitations under the License.

org.apache.flink.runtime.security.token.HadoopFSDelegationTokenProvider
org.apache.flink.runtime.security.token.HBaseDelegationTokenProvider
org.apache.flink.runtime.security.token.hadoop.HadoopFSDelegationTokenProvider
org.apache.flink.runtime.security.token.hadoop.HBaseDelegationTokenProvider
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* limitations under the License.
*/

package org.apache.flink.runtime.security.token;
package org.apache.flink.runtime.security.token.hadoop;

import org.apache.flink.configuration.Configuration;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* limitations under the License.
*/

package org.apache.flink.runtime.security.token;
package org.apache.flink.runtime.security.token.hadoop;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
Expand All @@ -28,8 +28,8 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;

/** Test for {@link DelegationTokenConverter}. */
public class DelegationTokenConverterTest {
/** Test for {@link HadoopDelegationTokenConverter}. */
public class HadoopDelegationTokenConverterTest {

@Test
public void testRoundTrip() throws IOException {
Expand All @@ -39,9 +39,9 @@ public void testRoundTrip() throws IOException {
credentials.addToken(
tokenService, new Token<>(new byte[4], new byte[4], tokenKind, tokenService));

byte[] credentialsBytes = DelegationTokenConverter.serialize(credentials);
byte[] credentialsBytes = HadoopDelegationTokenConverter.serialize(credentials);
Credentials deserializedCredentials =
DelegationTokenConverter.deserialize(credentialsBytes);
HadoopDelegationTokenConverter.deserialize(credentialsBytes);

assertEquals(1, credentials.getAllTokens().size());
assertEquals(1, deserializedCredentials.getAllTokens().size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* limitations under the License.
*/

package org.apache.flink.runtime.security.token;
package org.apache.flink.runtime.security.token.hadoop;

import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.io.Text;
Expand All @@ -36,8 +36,8 @@
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

/** Test for {@link DelegationTokenConverter}. */
public class DelegationTokenUpdaterITCase {
/** Test for {@link HadoopDelegationTokenConverter}. */
public class HadoopDelegationTokenUpdaterITCase {

@Test
public void addCurrentUserCredentialsShouldThrowExceptionWhenNullCredentials() {
Expand All @@ -58,7 +58,7 @@ private void addCurrentUserCredentialsShouldThrowException(byte[] credentialsByt
assertThrows(
IllegalArgumentException.class,
() ->
DelegationTokenUpdater.addCurrentUserCredentials(
HadoopDelegationTokenUpdater.addCurrentUserCredentials(
credentialsBytes));
assertTrue(e.getMessage().contains("Illegal credentials"));
}
Expand All @@ -72,13 +72,13 @@ public void addCurrentUserCredentialsShouldOverwriteCredentials() throws IOExcep
credentials.addToken(
tokenService, new Token<>(new byte[4], new byte[4], tokenKind, tokenService));

byte[] credentialsBytes = DelegationTokenConverter.serialize(credentials);
byte[] credentialsBytes = HadoopDelegationTokenConverter.serialize(credentials);

try (MockedStatic<UserGroupInformation> ugi = mockStatic(UserGroupInformation.class)) {
UserGroupInformation userGroupInformation = mock(UserGroupInformation.class);
ugi.when(UserGroupInformation::getCurrentUser).thenReturn(userGroupInformation);

DelegationTokenUpdater.addCurrentUserCredentials(credentialsBytes);
HadoopDelegationTokenUpdater.addCurrentUserCredentials(credentialsBytes);
ArgumentCaptor<Credentials> argumentCaptor = ArgumentCaptor.forClass(Credentials.class);
verify(userGroupInformation, times(1)).addCredentials(argumentCaptor.capture());
assertTrue(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* limitations under the License.
*/

package org.apache.flink.runtime.security.token;
package org.apache.flink.runtime.security.token.hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
Expand Down Expand Up @@ -47,17 +47,20 @@ class HadoopFSDelegationTokenProviderITCase {
final Text tokenService1 = new Text("TEST_TOKEN_SERVICE1");
final Text tokenService2 = new Text("TEST_TOKEN_SERVICE2");

private class TestDelegationToken extends Token<TestDelegationTokenIdentifier> {
private class TestDelegationToken extends Token<TestHadoopDelegationTokenIdentifier> {

private long newExpiration;

public TestDelegationToken(
Text tokenService, TestDelegationTokenIdentifier identifier, long newExpiration) {
Text tokenService,
TestHadoopDelegationTokenIdentifier identifier,
long newExpiration) {
super(identifier.getBytes(), new byte[4], identifier.getKind(), tokenService);
this.newExpiration = newExpiration;
}

public TestDelegationToken(Text tokenService, TestDelegationTokenIdentifier identifier) {
public TestDelegationToken(
Text tokenService, TestHadoopDelegationTokenIdentifier identifier) {
this(tokenService, identifier, 0L);
}

Expand Down Expand Up @@ -112,14 +115,14 @@ protected void obtainDelegationTokens(
String renewer,
Set<FileSystem> fileSystemsToAccess,
Credentials credentials) {
TestDelegationTokenIdentifier tokenIdentifier1 =
new TestDelegationTokenIdentifier(NOW);
TestHadoopDelegationTokenIdentifier tokenIdentifier1 =
new TestHadoopDelegationTokenIdentifier(NOW);
credentials.addToken(
tokenService1,
new TestDelegationToken(tokenService1, tokenIdentifier1, NOW + 1));

TestDelegationTokenIdentifier tokenIdentifier2 =
new TestDelegationTokenIdentifier(NOW);
TestHadoopDelegationTokenIdentifier tokenIdentifier2 =
new TestHadoopDelegationTokenIdentifier(NOW);
credentials.addToken(
tokenService2,
new TestDelegationToken(tokenService2, tokenIdentifier2, NOW + 2));
Expand Down Expand Up @@ -155,10 +158,12 @@ public void getTokenRenewalDateShouldReturnMinWhenMultipleTokens() {
HadoopFSDelegationTokenProvider provider = new HadoopFSDelegationTokenProvider();
Clock constantClock = Clock.fixed(ofEpochMilli(NOW), ZoneId.systemDefault());
Credentials credentials = new Credentials();
TestDelegationTokenIdentifier tokenIdentifier1 = new TestDelegationTokenIdentifier(NOW);
TestHadoopDelegationTokenIdentifier tokenIdentifier1 =
new TestHadoopDelegationTokenIdentifier(NOW);
credentials.addToken(
tokenService1, new TestDelegationToken(tokenService1, tokenIdentifier1));
TestDelegationTokenIdentifier tokenIdentifier2 = new TestDelegationTokenIdentifier(NOW + 1);
TestHadoopDelegationTokenIdentifier tokenIdentifier2 =
new TestHadoopDelegationTokenIdentifier(NOW + 1);
credentials.addToken(
tokenService2, new TestDelegationToken(tokenService2, tokenIdentifier2));

Expand All @@ -173,7 +178,7 @@ public void getIssueDateShouldReturnIssueDateWithFutureToken() {
Clock constantClock = Clock.fixed(ofEpochMilli(NOW), ZoneId.systemDefault());
long issueDate = NOW + 1;
AbstractDelegationTokenIdentifier tokenIdentifier =
new TestDelegationTokenIdentifier(issueDate);
new TestHadoopDelegationTokenIdentifier(issueDate);

assertEquals(
issueDate,
Expand All @@ -188,7 +193,7 @@ public void getIssueDateShouldReturnIssueDateWithPastToken() {
Clock constantClock = Clock.fixed(ofEpochMilli(NOW), ZoneId.systemDefault());
long issueDate = NOW - 1;
AbstractDelegationTokenIdentifier tokenIdentifier =
new TestDelegationTokenIdentifier(issueDate);
new TestHadoopDelegationTokenIdentifier(issueDate);

assertEquals(
issueDate,
Expand All @@ -203,7 +208,7 @@ public void getIssueDateShouldReturnNowWithInvalidToken() {
Clock constantClock = Clock.fixed(ofEpochMilli(NOW), ZoneId.systemDefault());
long issueDate = -1;
AbstractDelegationTokenIdentifier tokenIdentifier =
new TestDelegationTokenIdentifier(issueDate);
new TestHadoopDelegationTokenIdentifier(issueDate);

assertEquals(
NOW,
Expand Down
Loading

0 comments on commit 4ba3366

Please sign in to comment.