Skip to content

Commit

Permalink
Add getCoordinators method to NodeManager
Browse files Browse the repository at this point in the history
  • Loading branch information
nileema committed Feb 12, 2015
1 parent 0d58117 commit 8a0b026
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,12 @@ public Node getCurrentNode()
{
return LOCAL_NODE;
}

@Override
public Set<Node> getCoordinators()
{
return ImmutableSet.of(LOCAL_NODE);
}
}

private static class TestingNode
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ public final class DiscoveryNodeManager
@GuardedBy("this")
private PrestoNode currentNode;

@GuardedBy("this")
private Set<Node> coordinators;

@Inject
public DiscoveryNodeManager(@ServiceType("presto") ServiceSelector serviceSelector, NodeInfo nodeInfo, FailureDetector failureDetector, NodeVersion expectedNodeVersion)
{
Expand Down Expand Up @@ -92,6 +95,7 @@ public synchronized void refreshNodes()

ImmutableSet.Builder<Node> activeNodesBuilder = ImmutableSet.builder();
ImmutableSet.Builder<Node> inactiveNodesBuilder = ImmutableSet.builder();
ImmutableSet.Builder<Node> coordinatorsBuilder = ImmutableSet.builder();
ImmutableSetMultimap.Builder<String, Node> byDataSourceBuilder = ImmutableSetMultimap.builder();

for (ServiceDescriptor service : services) {
Expand All @@ -108,6 +112,9 @@ public synchronized void refreshNodes()

if (isActive(node)) {
activeNodesBuilder.add(node);
if (Boolean.parseBoolean(service.getProperties().get("coordinator"))) {
coordinatorsBuilder.add(node);
}

// record available active nodes organized by data source
String dataSources = service.getProperties().get("datasources");
Expand All @@ -129,6 +136,7 @@ public synchronized void refreshNodes()

allNodes = new AllNodes(activeNodesBuilder.build(), inactiveNodesBuilder.build());
activeNodesByDataSource = byDataSourceBuilder.build();
coordinators = coordinatorsBuilder.build();

checkState(currentNode != null, "INVARIANT: current node not returned from service selector");
}
Expand Down Expand Up @@ -172,6 +180,13 @@ public synchronized Node getCurrentNode()
return currentNode;
}

@Override
public synchronized Set<Node> getCoordinators()
{
refreshIfNecessary();
return coordinators;
}

private static URI getHttpUri(ServiceDescriptor descriptor)
{
// favor https over http
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,13 @@ public Node getCurrentNode()
return localNode;
}

@Override
public Set<Node> getCoordinators()
{
// always use localNode as coordinator
return ImmutableSet.of(localNode);
}

@Override
public void refreshNodes()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ protected void setup(Binder binder)
// presto announcement
discoveryBinder(binder).bindHttpAnnouncement("presto")
.addProperty("node_version", nodeVersion.toString())
.addProperty("coordinator", String.valueOf(serverConfig.isCoordinator()))
.addProperty("datasources", nullToEmpty(serverConfig.getDataSources()));

// statement resource
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@

import com.facebook.presto.server.NoOpFailureDetector;
import com.facebook.presto.spi.Node;
import com.facebook.presto.spi.NodeManager;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import io.airlift.discovery.client.ServiceDescriptor;
import io.airlift.discovery.client.ServiceSelector;
Expand Down Expand Up @@ -43,17 +45,19 @@ public class TestDiscoveryNodeManager
private NodeVersion expectedVersion;
private List<PrestoNode> activeNodes;
private List<PrestoNode> inactiveNodes;
private PrestoNode coordinator;
private ServiceSelector selector;

@BeforeMethod
public void setup()
{
expectedVersion = new NodeVersion("1");
coordinator = new PrestoNode(UUID.randomUUID().toString(), URI.create("https://192.0.2.8"), expectedVersion);
activeNodes = ImmutableList.of(
new PrestoNode(nodeInfo.getNodeId(), URI.create("https://192.0.1.1"), expectedVersion),
new PrestoNode(UUID.randomUUID().toString(), URI.create("https://192.0.2.1:8080"), expectedVersion),
new PrestoNode(UUID.randomUUID().toString(), URI.create("https://192.0.2.3"), expectedVersion),
new PrestoNode(UUID.randomUUID().toString(), URI.create("https://192.0.2.8"), expectedVersion));
coordinator);
inactiveNodes = ImmutableList.of(
new PrestoNode(UUID.randomUUID().toString(), URI.create("https://192.0.3.9"), NodeVersion.UNKNOWN),
new PrestoNode(UUID.randomUUID().toString(), URI.create("https://192.0.4.9"), new NodeVersion("2"))
Expand All @@ -65,6 +69,7 @@ public void setup()
.setNodeId(node.getNodeIdentifier())
.addProperty("http", node.getHttpUri().toString())
.addProperty("node_version", node.getNodeVersion().toString())
.addProperty("coordinator", String.valueOf(node.equals(coordinator)))
.build());
}

Expand Down Expand Up @@ -111,6 +116,14 @@ public void testGetCurrentNode()
assertEquals(manager.getCurrentNode(), expected);
}

@Test
public void testGetCoordinators()
throws Exception
{
NodeManager manager = new DiscoveryNodeManager(selector, nodeInfo, new NoOpFailureDetector(), expectedVersion);
assertEquals(manager.getCoordinators(), ImmutableSet.of(coordinator));
}

@SuppressWarnings("ResultOfObjectAllocationIgnored")
@Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = ".* current node not returned .*")
public void testGetCurrentNodeRequired()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,6 @@ public interface NodeManager
Set<Node> getActiveDatasourceNodes(String datasourceName);

Node getCurrentNode();

Set<Node> getCoordinators();
}

0 comments on commit 8a0b026

Please sign in to comment.