Skip to content

Commit

Permalink
Adding kerberos & delegation token support
Browse files Browse the repository at this point in the history
  • Loading branch information
sunchao committed Apr 29, 2016
1 parent e041ea4 commit 7d9421a
Show file tree
Hide file tree
Showing 6 changed files with 170 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
*/
package com.facebook.presto.recordservice;

import com.cloudera.recordservice.core.DelegationToken;
import com.cloudera.recordservice.core.PlanRequestResult;
import com.cloudera.recordservice.core.RecordServiceException;
import com.cloudera.recordservice.core.RecordServicePlannerClient;
import com.cloudera.recordservice.core.Request;
import com.cloudera.recordservice.core.Schema;
Expand All @@ -23,13 +23,11 @@

import io.airlift.log.Logger;

import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.Random;

import javax.inject.Inject;

Expand All @@ -49,8 +47,7 @@ public List<String> getDatabases()
{
try {
HostAddress plannerAddr = getPlannerHostAddress();
return new RecordServicePlannerClient.Builder()
.getDatabases(plannerAddr.getHostText(), plannerAddr.getPort());
return getPlannerBuilder().getDatabases(plannerAddr.getHostText(), plannerAddr.getPort());
} catch (Exception e) {
throw new PrestoException(RecordServiceErrorCode.CATALOG_ERROR, e);
}
Expand All @@ -60,7 +57,7 @@ public List<String> getTables(String db)
{
try {
HostAddress plannerAddr = getPlannerHostAddress();
return new RecordServicePlannerClient.Builder()
return getPlannerBuilder()
.getTables(plannerAddr.getHostText(), plannerAddr.getPort(), db);
} catch (Exception e) {
throw new PrestoException(RecordServiceErrorCode.CATALOG_ERROR, e);
Expand All @@ -71,46 +68,91 @@ public Schema getSchema(String db, String table) {
try {
HostAddress plannerAddr = getPlannerHostAddress();
Request request = Request.createTableScanRequest(db + "." + table);
return new RecordServicePlannerClient.Builder()
return getPlannerBuilder()
.getSchema(plannerAddr.getHostText(), plannerAddr.getPort(), request).schema;
} catch (Exception e) {
throw new PrestoException(RecordServiceErrorCode.CATALOG_ERROR, e);
throw new PrestoException(RecordServiceErrorCode.PLAN_ERROR, e);
}
}

public PlanRequestResult getPlanResult(Request request) throws IOException, RecordServiceException
public RecordServicePlanResult getPlanResult(Request request)
{
HostAddress plannerAddr = getPlannerHostAddress();
LOG.info("Get planResult from " + plannerAddr);
return new RecordServicePlannerClient.Builder().planRequest(plannerAddr.getHostText(), plannerAddr.getPort(), request);
}

private HostAddress getPlannerHostAddress()
{
return config.getPlanners().iterator().next();
RecordServicePlannerClient plannerClient = null;
try {
HostAddress plannerAddr = getPlannerHostAddress();
LOG.info("Get planResult from " + plannerAddr);
RecordServicePlannerClient.Builder builder = getPlannerBuilder();
plannerClient = builder.connect(plannerAddr.getHostText(), plannerAddr.getPort());
PlanRequestResult planRequestResult = plannerClient.planRequest(request);
DelegationToken delegationToken = null;
if (plannerClient.isKerberosAuthenticated()) {
delegationToken = plannerClient.getDelegationToken("");
}
return new RecordServicePlanResult(planRequestResult, delegationToken);
} catch (Exception e) {
throw new PrestoException(RecordServiceErrorCode.PLAN_ERROR, e);
} finally {
if (plannerClient != null) {
plannerClient.close();
}
}
}

public static HostAddress getWorkerHostAddress(List<HostAddress> addresses)
public static HostAddress getWorkerHostAddress(List<HostAddress> addresses, List<HostAddress> globalAddresses)
{
String localHost = null;
String localHost;
try {
localHost = InetAddress.getLocalHost().getHostName();
}
catch (UnknownHostException e) {
LOG.error("Failed to get the local host.", e);
throw new PrestoException(RecordServiceErrorCode.TASK_ERROR, "Failed to get the local host.", e);
}

HostAddress address = null;

// 1. If the data is available on this node, schedule the task locally.
for (HostAddress add : addresses) {
if (localHost.equals(add.getHostText())) {
LOG.info("Both data and RecordServiceWorker are available locally for task.");
return add;
address = add;
break;
}
}

// 2. Otherwise, randomly pick a node.
Collections.shuffle(addresses);
// 2. Check if there's a RecordServiceWorker running locally. If so, pick that node.
if (address == null) {
for (HostAddress loc : globalAddresses) {
if (localHost.equals(loc.getHostText())) {
address = loc;
LOG.info("RecordServiceWorker is available locally for task");
break;
}
}
}

// 3. Finally, we don't have RecordServiceWorker running locally. Randomly pick
// a node from the global membership.
if (address == null) {
Random rand = new Random();
address = globalAddresses.get(rand.nextInt(globalAddresses.size()));
LOG.info("Neither RecordServiceWorker nor data is available locally for task {}." +
" Randomly selected host {} to execute it", address);
}

return addresses.get(0);
return address;
}

private HostAddress getPlannerHostAddress()
{
return config.getPlanners().iterator().next();
}

private RecordServicePlannerClient.Builder getPlannerBuilder()
{
RecordServicePlannerClient.Builder result = new RecordServicePlannerClient.Builder();
if (config.getKerberosPrincipal() != null) {
result.setKerberosPrincipal(config.getKerberosPrincipal());
}
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,33 @@ public class RecordServiceConnectorConfig {
* RecordService planner host
*/
private Set<HostAddress> planners = ImmutableSet.of();
private String kerberosPrincipal = null;

@Size(min = 1)
public Set<HostAddress> getPlanners()
{
return planners;
}

public String getKerberosPrincipal()
{
return kerberosPrincipal;
}

@Config("recordservice.planner.hostports")
public RecordServiceConnectorConfig setPlanners(String plannerHostPorts)
{
this.planners = (plannerHostPorts == null) ? null : parsePlannerHostPorts(plannerHostPorts);
return this;
}

@Config("recordservice.kerberos.principal")
public RecordServiceConnectorConfig setKerberosPrincipal(String kerberosPrincipal)
{
this.kerberosPrincipal = kerberosPrincipal;
return this;
}

public static ImmutableSet<HostAddress> parsePlannerHostPorts(String plannerHostPorts)
{
Splitter splitter = Splitter.on(',').omitEmptyStrings().trimResults();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.recordservice;

import com.cloudera.recordservice.core.DelegationToken;
import com.cloudera.recordservice.core.PlanRequestResult;

public class RecordServicePlanResult {
public final PlanRequestResult planRequestResult;
public final DelegationToken delegationToken;

RecordServicePlanResult(PlanRequestResult planRequestResult, DelegationToken delegationToken)
{
this.planRequestResult = planRequestResult;
this.delegationToken = delegationToken;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package com.facebook.presto.recordservice;

import com.cloudera.recordservice.core.DelegationToken;
import com.cloudera.recordservice.core.RecordServiceException;
import com.cloudera.recordservice.core.RecordServiceWorkerClient;
import com.cloudera.recordservice.core.Records;
Expand All @@ -39,13 +40,20 @@ public class RecordServiceRecordSet implements RecordSet, Closeable

public RecordServiceRecordSet(RecordServiceSplit split)
{
HostAddress host = RecordServiceClient.getWorkerHostAddress(split.getAddresses());
HostAddress host = RecordServiceClient.getWorkerHostAddress(split.getAddresses(), split.getGlobalAddresses());

RecordServiceWorkerClient workerClient = null;
Records records = null;
DelegationToken delegationToken = null;
if (split.getIdentifier() != null) {
delegationToken = new DelegationToken(split.getIdentifier(), split.getPassword(), split.getToken());
}
try {
workerClient =
new RecordServiceWorkerClient.Builder().connect(host.getHostText(), host.getPort());
RecordServiceWorkerClient.Builder workerBuilder = new RecordServiceWorkerClient.Builder();
if (delegationToken != null) {
workerBuilder.setDelegationToken(delegationToken);
}
workerClient = workerBuilder.connect(host.getHostText(), host.getPort());
records = workerClient.execAndFetch(fromSplit(split));
records.setCloseWorker(true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.facebook.presto.spi.HostAddress;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;

Expand All @@ -34,6 +35,12 @@ public class RecordServiceSplit implements ConnectorSplit
private final long lo;
private final boolean resultOrdered;
private final List<HostAddress> addresses;
private final List<HostAddress> globalAddresses;

// Parts for DelegationToken. If no delegation token, these are all null.
private final String identifier;
private final String password;
private final byte[] token;

@JsonCreator
public RecordServiceSplit(
Expand All @@ -43,7 +50,11 @@ public RecordServiceSplit(
@JsonProperty("hi") long hi,
@JsonProperty("lo") long lo,
@JsonProperty("resultOrdered") boolean resultOrdered,
@JsonProperty("addresses") List<HostAddress> addresses)
@JsonProperty("addresses") List<HostAddress> addresses,
@JsonProperty("globalAddresses") List<HostAddress> globalAddresses,
@JsonProperty("identifier") String identifier,
@JsonProperty("password") String password,
@JsonProperty("token") byte[] token)
{
this.connectorId = requireNonNull(connectorId, "connectorId is null");
this.task = requireNonNull(task, "task is null");
Expand All @@ -52,6 +63,12 @@ public RecordServiceSplit(
this.lo = lo;
this.resultOrdered = resultOrdered;
this.addresses = ImmutableList.copyOf(requireNonNull(addresses, "addresses is null"));
this.globalAddresses = ImmutableList.copyOf(requireNonNull(globalAddresses, "globalAddresses is null"));

// These are nullable
this.identifier = identifier;
this.password = password;
this.token = token;
}

@Override
Expand Down Expand Up @@ -126,4 +143,28 @@ public boolean getResultOrdered()
{
return resultOrdered;
}

@JsonProperty
public List<HostAddress> getGlobalAddresses()
{
return globalAddresses;
}

@JsonProperty
public String getIdentifier()
{
return identifier;
}

@JsonProperty
public String getPassword()
{
return password;
}

@JsonProperty
public byte[] getToken()
{
return token;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package com.facebook.presto.recordservice;

import com.cloudera.recordservice.core.DelegationToken;
import com.cloudera.recordservice.core.NetworkAddress;
import com.cloudera.recordservice.core.PlanRequestResult;
import com.cloudera.recordservice.core.Request;
Expand Down Expand Up @@ -56,11 +57,17 @@ public ConnectorSplitSource getSplits(ConnectorTransactionHandle handle,
Request request = Request.createSqlRequest(layoutHandle.getQuery());

try {
PlanRequestResult planRequestResult = client.getPlanResult(request);
RecordServicePlanResult planResult = client.getPlanResult(request);
PlanRequestResult planRequestResult = planResult.planRequestResult;
DelegationToken delegationToken = planResult.delegationToken;
List<ConnectorSplit> splits = planRequestResult.tasks.stream()
.map(t -> new RecordServiceSplit(
connectorId, t.task, t.taskSize, t.taskId.hi,
t.taskId.lo, t.resultsOrdered, toHostAddress(planRequestResult.hosts)))
t.taskId.lo, t.resultsOrdered, toHostAddress(t.localHosts),
toHostAddress(planRequestResult.hosts),
delegationToken == null ? null : delegationToken.identifier,
delegationToken == null ? null : delegationToken.password,
delegationToken == null ? null : delegationToken.token))
.collect(Collectors.toList());
Collections.shuffle(splits);

Expand Down

0 comments on commit 7d9421a

Please sign in to comment.