Skip to content

Commit

Permalink
Remove pluggable global connectors
Browse files Browse the repository at this point in the history
We only have one connector (the system connector), so get rid of the abstraction.
Also, since the system needs to have specialized logic for dealing with sys tables,
get rid of the system connector. We'll revisit this once we figure out how we
want to model it going forward.
  • Loading branch information
martint committed Sep 22, 2014
1 parent c7b7516 commit db9716a
Show file tree
Hide file tree
Showing 15 changed files with 72 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,7 @@
import com.facebook.presto.split.SplitManager;
import com.google.inject.Inject;

import javax.annotation.Nullable;

import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

Expand Down Expand Up @@ -71,7 +68,6 @@ public ConnectorManager(MetadataManager metadataManager,
RecordSinkManager recordSinkManager,
HandleResolver handleResolver,
Map<String, ConnectorFactory> connectorFactories,
Map<String, Connector> globalConnectors,
NodeManager nodeManager)
{
this.metadataManager = metadataManager;
Expand All @@ -82,11 +78,6 @@ public ConnectorManager(MetadataManager metadataManager,
this.handleResolver = handleResolver;
this.nodeManager = nodeManager;
this.connectorFactories.putAll(connectorFactories);

// add the global connectors
for (Entry<String, Connector> entry : globalConnectors.entrySet()) {
addGlobalConnector(entry.getKey(), entry.getValue());
}
}

public void addConnectorFactory(ConnectorFactory connectorFactory)
Expand Down Expand Up @@ -122,12 +113,7 @@ public synchronized void createConnection(String catalogName, ConnectorFactory c
addConnector(catalogName, connectorId, connector);
}

public void addGlobalConnector(String connectorId, Connector connector)
{
addConnector(null, connectorId, connector);
}

private void addConnector(@Nullable String catalogName, String connectorId, Connector connector)
private void addConnector(String catalogName, String connectorId, Connector connector)
{
ConnectorMetadata connectorMetadata = connector.getMetadata();
checkState(connectorMetadata != null, "Connector %s can not provide metadata", connectorId);
Expand Down Expand Up @@ -177,16 +163,11 @@ private void addConnector(@Nullable String catalogName, String connectorId, Conn
// IMPORTANT: all the instances need to be fetched from the connector *before* we add them to the corresponding managers.
// Otherwise, a broken connector would leave the managers in an inconsistent state with respect to each other

if (catalogName != null) {
metadataManager.addConnectorMetadata(connectorId, catalogName, connectorMetadata);
metadataManager.addConnectorMetadata(connectorId, catalogName, connectorMetadata);

metadataManager.addInformationSchemaMetadata(makeInformationSchemaConnectorId(connectorId), catalogName, new InformationSchemaMetadata(catalogName));
splitManager.addConnectorSplitManager(makeInformationSchemaConnectorId(connectorId), new InformationSchemaSplitManager(nodeManager));
pageSourceManager.addConnectorPageSourceProvider(makeInformationSchemaConnectorId(connectorId), new InformationSchemaPageSourceProvider(metadataManager, splitManager));
}
else {
metadataManager.addGlobalSchemaMetadata(connectorId, connectorMetadata);
}
metadataManager.addInformationSchemaMetadata(makeInformationSchemaConnectorId(connectorId), catalogName, new InformationSchemaMetadata(catalogName));
splitManager.addConnectorSplitManager(makeInformationSchemaConnectorId(connectorId), new InformationSchemaSplitManager(nodeManager));
pageSourceManager.addConnectorPageSourceProvider(makeInformationSchemaConnectorId(connectorId), new InformationSchemaPageSourceProvider(metadataManager, splitManager));

splitManager.addConnectorSplitManager(connectorId, connectorSplitManager);
handleResolver.addHandleResolver(connectorId, connectorHandleResolver);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package com.facebook.presto.connector.informationSchema;

import com.facebook.presto.connector.system.SystemTablesManager;
import com.facebook.presto.spi.ConnectorColumnHandle;
import com.facebook.presto.spi.ConnectorPartition;
import com.facebook.presto.spi.ConnectorPartitionResult;
Expand All @@ -35,7 +36,6 @@
import java.util.Map;
import java.util.Map.Entry;

import static com.facebook.presto.connector.system.SystemSplitManager.SYSTEM_DATASOURCE;
import static com.facebook.presto.util.Types.checkType;
import static com.google.common.base.Preconditions.checkNotNull;

Expand Down Expand Up @@ -69,7 +69,7 @@ public ConnectorSplitSource getPartitionSplits(ConnectorTableHandle table, List<
{
checkNotNull(partitions, "partitions is null");
if (partitions.isEmpty()) {
return new FixedSplitSource(SYSTEM_DATASOURCE, ImmutableList.<ConnectorSplit>of());
return new FixedSplitSource(SystemTablesManager.CONNECTOR_ID, ImmutableList.<ConnectorSplit>of());
}

ConnectorPartition partition = Iterables.getOnlyElement(partitions);
Expand All @@ -85,7 +85,7 @@ public ConnectorSplitSource getPartitionSplits(ConnectorTableHandle table, List<

ConnectorSplit split = new InformationSchemaSplit(informationSchemaPartition.getTable(), filters.build(), localAddress);

return new FixedSplitSource(SYSTEM_DATASOURCE, ImmutableList.of(split));
return new FixedSplitSource(SystemTablesManager.CONNECTOR_ID, ImmutableList.of(split));
}

public static class InformationSchemaPartition
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
public class SystemSplitManager
implements ConnectorSplitManager
{
public static final String SYSTEM_DATASOURCE = "system";
private final NodeManager nodeManager;
private final ConcurrentMap<SchemaTableName, SystemTable> tables = new ConcurrentHashMap<>();

Expand Down Expand Up @@ -76,7 +75,7 @@ public ConnectorSplitSource getPartitionSplits(ConnectorTableHandle table, List<
{
checkNotNull(partitions, "partitions is null");
if (partitions.isEmpty()) {
return new FixedSplitSource(SYSTEM_DATASOURCE, ImmutableList.<ConnectorSplit>of());
return new FixedSplitSource(SystemTablesManager.CONNECTOR_ID, ImmutableList.<ConnectorSplit>of());
}

ConnectorPartition partition = Iterables.getOnlyElement(partitions);
Expand All @@ -90,12 +89,12 @@ public ConnectorSplitSource getPartitionSplits(ConnectorTableHandle table, List<
for (Node node : nodeManager.getActiveNodes()) {
splits.add(new SystemSplit(systemPartition.getTableHandle(), node.getHostAndPort()));
}
return new FixedSplitSource(SYSTEM_DATASOURCE, splits.build());
return new FixedSplitSource(SystemTablesManager.CONNECTOR_ID, splits.build());
}

HostAddress address = nodeManager.getCurrentNode().getHostAndPort();
ConnectorSplit split = new SystemSplit(systemPartition.getTableHandle(), address);
return new FixedSplitSource(SYSTEM_DATASOURCE, ImmutableList.of(split));
return new FixedSplitSource(SystemTablesManager.CONNECTOR_ID, ImmutableList.of(split));
}

public static class SystemPartition
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
//
public class SystemTablesManager
{
public static final String CONNECTOR_ID = "system";
private final SystemTablesMetadata metadata;
private final SystemSplitManager splitManager;
private final SystemRecordSetProvider recordSetProvider;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,26 +13,23 @@
*/
package com.facebook.presto.connector.system;

import com.facebook.presto.spi.Connector;
import com.facebook.presto.spi.SystemTable;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Scopes;
import com.google.inject.multibindings.Multibinder;

import static com.google.inject.multibindings.MapBinder.newMapBinder;

public class SystemTablesModule
implements Module
{
@Override
public void configure(Binder binder)
{
newMapBinder(binder, String.class, Connector.class).addBinding("system").to(SystemConnector.class);
binder.bind(SystemTablesManager.class).in(Scopes.SINGLETON);
binder.bind(SystemTablesMetadata.class).in(Scopes.SINGLETON);
binder.bind(SystemSplitManager.class).in(Scopes.SINGLETON);
binder.bind(SystemRecordSetProvider.class).in(Scopes.SINGLETON);

Multibinder<SystemTable> globalTableBinder = Multibinder.newSetBinder(binder, SystemTable.class);
globalTableBinder.addBinding().to(NodesSystemTable.class).in(Scopes.SINGLETON);
globalTableBinder.addBinding().to(QuerySystemTable.class).in(Scopes.SINGLETON);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package com.facebook.presto.metadata;

import com.facebook.presto.connector.system.SystemTablesManager;
import com.facebook.presto.failureDetector.FailureDetector;
import com.facebook.presto.spi.Node;
import com.facebook.presto.util.IterableTransformer;
Expand All @@ -35,7 +36,6 @@
import java.util.Set;
import java.util.concurrent.TimeUnit;

import static com.facebook.presto.connector.system.SystemSplitManager.SYSTEM_DATASOURCE;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Predicates.in;
Expand Down Expand Up @@ -120,7 +120,7 @@ public synchronized void refreshNodes()
}

// always add system data source
byDataSourceBuilder.put(SYSTEM_DATASOURCE, node);
byDataSourceBuilder.put(SystemTablesManager.CONNECTOR_ID, node);
}
else {
inactiveNodesBuilder.add(node);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
*/
package com.facebook.presto.metadata;

import com.facebook.presto.connector.system.SystemHandleResolver;
import com.facebook.presto.connector.system.SystemTablesManager;
import com.facebook.presto.spi.ConnectorColumnHandle;
import com.facebook.presto.spi.ConnectorHandleResolver;
import com.facebook.presto.spi.ConnectorIndexHandle;
Expand Down Expand Up @@ -43,6 +45,7 @@ public HandleResolver()
public HandleResolver(Map<String, ConnectorHandleResolver> handleIdResolvers)
{
this.handleIdResolvers.putAll(handleIdResolvers);
this.handleIdResolvers.put(SystemTablesManager.CONNECTOR_ID, new SystemHandleResolver());
}

public void addHandleResolver(String id, ConnectorHandleResolver connectorHandleResolver)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
package com.facebook.presto.metadata;

import com.facebook.presto.connector.informationSchema.InformationSchemaMetadata;
import com.facebook.presto.connector.system.SystemTablesManager;
import com.facebook.presto.connector.system.SystemTablesMetadata;
import com.facebook.presto.spi.ColumnMetadata;
import com.facebook.presto.spi.ConnectorColumnHandle;
import com.facebook.presto.spi.ConnectorInsertTableHandle;
Expand All @@ -32,12 +34,12 @@
import com.facebook.presto.type.TypeDeserializer;
import com.facebook.presto.type.TypeRegistry;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import io.airlift.json.JsonCodec;
import io.airlift.json.JsonCodecFactory;
import io.airlift.json.ObjectMapperProvider;
Expand Down Expand Up @@ -70,30 +72,34 @@
public class MetadataManager
implements Metadata
{
private final Set<String> globalConnectors = Sets.newConcurrentHashSet();
private final SystemTablesMetadata systemMetadata;
private final ConcurrentMap<String, ConnectorMetadataEntry> informationSchemasByCatalog = new ConcurrentHashMap<>();
private final ConcurrentMap<String, ConnectorMetadataEntry> connectorsByCatalog = new ConcurrentHashMap<>();
private final ConcurrentMap<String, ConnectorMetadata> connectorsById = new ConcurrentHashMap<>();
private final FunctionRegistry functions;
private final TypeManager typeManager;
private final JsonCodec<ViewDefinition> viewCodec;

@VisibleForTesting
public MetadataManager()
{
this(new FeaturesConfig(), new TypeRegistry());
this(new FeaturesConfig(), new TypeRegistry(), new SystemTablesMetadata());
}

public MetadataManager(FeaturesConfig featuresConfig, TypeManager typeManager)
public MetadataManager(FeaturesConfig featuresConfig, TypeManager typeManager, SystemTablesMetadata systemMetadata)
{
this(featuresConfig, typeManager, createTestingViewCodec());
this(featuresConfig, typeManager, createTestingViewCodec(), systemMetadata);
}

@Inject
public MetadataManager(FeaturesConfig featuresConfig, TypeManager typeManager, JsonCodec<ViewDefinition> viewCodec)
public MetadataManager(FeaturesConfig featuresConfig, TypeManager typeManager, JsonCodec<ViewDefinition> viewCodec, SystemTablesMetadata systemMetadata)
{
functions = new FunctionRegistry(typeManager, featuresConfig.isExperimentalSyntaxEnabled());
this.typeManager = checkNotNull(typeManager, "types is null");
this.viewCodec = checkNotNull(viewCodec, "viewCodec is null");
this.systemMetadata = checkNotNull(systemMetadata, "systemMetadata is null");

connectorsById.put(SystemTablesManager.CONNECTOR_ID, systemMetadata);
}

public synchronized void addConnectorMetadata(String connectorId, String catalogName, ConnectorMetadata connectorMetadata)
Expand Down Expand Up @@ -124,18 +130,6 @@ public synchronized void addInformationSchemaMetadata(String connectorId, String
informationSchemasByCatalog.put(catalogName, new ConnectorMetadataEntry(connectorId, metadata));
}

public synchronized void addGlobalSchemaMetadata(String connectorId, ConnectorMetadata connectorMetadata)
{
checkNotNull(connectorId, "connectorId is null");
checkNotNull(connectorMetadata, "connectorMetadata is null");

checkArgument(!globalConnectors.contains(connectorId), "Global connector '%s' is already registered", connectorId);
checkArgument(!connectorsById.containsKey(connectorId), "Connector '%s' is already registered", connectorId);

connectorsById.put(connectorId, connectorMetadata);
globalConnectors.add(connectorId);
}

@Override
public Type getType(String typeName)
{
Expand Down Expand Up @@ -464,9 +458,7 @@ private List<ConnectorMetadataEntry> allConnectorsFor(String catalogName)
{
ImmutableList.Builder<ConnectorMetadataEntry> builder = ImmutableList.builder();

for (String connectorId : globalConnectors) {
builder.add(new ConnectorMetadataEntry(connectorId, connectorsById.get(connectorId)));
}
builder.add(new ConnectorMetadataEntry(SystemTablesManager.CONNECTOR_ID, systemMetadata));

ConnectorMetadataEntry entry = informationSchemasByCatalog.get(catalogName);
if (entry != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
*/
package com.facebook.presto.split;

import com.facebook.presto.connector.system.SystemRecordSetProvider;
import com.facebook.presto.connector.system.SystemTablesManager;
import com.facebook.presto.metadata.ColumnHandle;
import com.facebook.presto.metadata.Split;
import com.facebook.presto.spi.ConnectorColumnHandle;
Expand All @@ -36,8 +38,9 @@ public class PageSourceManager
private final ConcurrentMap<String, ConnectorPageSourceProvider> pageSourceProviders = new ConcurrentHashMap<>();

@Inject
public PageSourceManager()
public PageSourceManager(SystemRecordSetProvider systemRecordSetProvider)
{
pageSourceProviders.put(SystemTablesManager.CONNECTOR_ID, new RecordPageSourceProvider(systemRecordSetProvider));
}

public void addConnectorPageSourceProvider(String connectorId, ConnectorPageSourceProvider connectorPageSourceProvider)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
*/
package com.facebook.presto.split;

import com.facebook.presto.connector.system.SystemSplitManager;
import com.facebook.presto.connector.system.SystemTablesManager;
import com.facebook.presto.execution.ConnectorAwareSplitSource;
import com.facebook.presto.execution.SplitSource;
import com.facebook.presto.metadata.ColumnHandle;
Expand All @@ -29,18 +31,28 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;

import javax.inject.Inject;

import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import static com.facebook.presto.metadata.Partition.connectorPartitionGetter;
import static com.facebook.presto.metadata.Util.toConnectorDomain;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;

public class SplitManager
{
private final ConcurrentMap<String, ConnectorSplitManager> splitManagers = new ConcurrentHashMap<>();
private final SystemSplitManager systemSplitManager;

@Inject
public SplitManager(SystemSplitManager systemSplitManager)
{
this.systemSplitManager = checkNotNull(systemSplitManager, "systemSplitManager is null");
}

public void addConnectorSplitManager(String connectorId, ConnectorSplitManager connectorSplitManager)
{
Expand Down Expand Up @@ -71,9 +83,14 @@ public SplitSource getPartitionSplits(TableHandle handle, List<Partition> partit

private ConnectorSplitManager getConnectorSplitManager(TableHandle handle)
{
ConnectorSplitManager result = splitManagers.get(handle.getConnectorId());
String connectorId = handle.getConnectorId();

if (connectorId.equals(SystemTablesManager.CONNECTOR_ID)) {
return systemSplitManager;
}

checkArgument(result != null, "No split manager for connector '%s'", handle.getConnectorId());
ConnectorSplitManager result = splitManagers.get(connectorId);
checkArgument(result != null, "No split manager for connector '%s'", connectorId);

return result;
}
Expand Down
Loading

0 comments on commit db9716a

Please sign in to comment.