Skip to content

Commit

Permalink
Extract native connector into separate plugin
Browse files Browse the repository at this point in the history
The native connector is now called Raptor and lives in presto-raptor.
  • Loading branch information
electrum committed May 21, 2014
1 parent 1ee856d commit 6b98ecc
Show file tree
Hide file tree
Showing 86 changed files with 1,814 additions and 1,102 deletions.
7 changes: 7 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
<module>presto-hive-cdh5</module>
<module>presto-example-http</module>
<module>presto-tpch</module>
<module>presto-raptor</module>
<module>presto-client</module>
<module>presto-parser</module>
<module>presto-main</module>
Expand Down Expand Up @@ -119,6 +120,12 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-raptor</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-client</artifactId>
Expand Down
21 changes: 5 additions & 16 deletions presto-benchmark/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,6 @@
</properties>

<dependencies>
<dependency>
<groupId>io.airlift</groupId>
<artifactId>dbpool</artifactId>
</dependency>

<dependency>
<groupId>org.jdbi</groupId>
<artifactId>jdbi</artifactId>
</dependency>

<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-spi</artifactId>
Expand All @@ -41,6 +31,11 @@
<artifactId>presto-main</artifactId>
</dependency>

<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-raptor</artifactId>
</dependency>

<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-tpch</artifactId>
Expand Down Expand Up @@ -96,12 +91,6 @@
<artifactId>annotations</artifactId>
</dependency>

<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>runtime</scope>
</dependency>

<!-- for testing -->
<dependency>
<groupId>org.testng</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,47 +13,35 @@
*/
package com.facebook.presto.benchmark;

import com.facebook.presto.connector.NativeConnectorFactory;
import com.facebook.presto.metadata.ColumnMetadataMapper;
import com.facebook.presto.metadata.DatabaseLocalStorageManager;
import com.facebook.presto.metadata.DatabaseLocalStorageManagerConfig;
import com.facebook.presto.metadata.DatabaseShardManager;
import com.facebook.presto.metadata.InMemoryNodeManager;
import com.facebook.presto.metadata.LocalStorageManager;
import com.facebook.presto.metadata.MetadataManager;
import com.facebook.presto.metadata.NativeConnectorId;
import com.facebook.presto.metadata.NativeMetadata;
import com.facebook.presto.metadata.NativeRecordSinkProvider;
import com.facebook.presto.metadata.QualifiedTableName;
import com.facebook.presto.metadata.TableColumnMapper;
import com.facebook.presto.raptor.RaptorConnectorFactory;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.NodeManager;
import com.facebook.presto.split.NativeDataStreamProvider;
import com.facebook.presto.split.NativePartitionKey;
import com.facebook.presto.split.NativeSplitManager;
import com.facebook.presto.spi.block.BlockEncodingSerde;
import com.facebook.presto.spi.type.TypeManager;
import com.facebook.presto.testing.LocalQueryRunner;
import com.facebook.presto.tpch.TpchConnectorFactory;
import com.facebook.presto.tpch.testing.SampledTpchConnectorFactory;
import com.facebook.presto.type.TypeRegistry;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import io.airlift.dbpool.H2EmbeddedDataSource;
import io.airlift.dbpool.H2EmbeddedDataSourceConfig;
import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.IDBI;

import java.io.File;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ExecutorService;

import static com.facebook.presto.spi.type.TimeZoneKey.UTC_KEY;
import static com.facebook.presto.testing.TestingBlockEncodingManager.createTestingBlockEncodingManager;

public final class BenchmarkQueryRunner
{
private BenchmarkQueryRunner()
{
}
private static final String TPCH_SAMPLED_CACHE_DIR = System.getProperty("tpchSampledCacheDir", "/tmp/presto_tpch/sampled_data_cache");
private static final String TPCH_CACHE_DIR = System.getProperty("tpchCacheDir", "/tmp/presto_tpch/data_cache");

private BenchmarkQueryRunner() {}

public static LocalQueryRunner createLocalSampledQueryRunner(ExecutorService executor)
{
Expand All @@ -64,12 +52,9 @@ public static LocalQueryRunner createLocalSampledQueryRunner(ExecutorService exe
InMemoryNodeManager nodeManager = localQueryRunner.getNodeManager();
localQueryRunner.createCatalog("tpch_sampled", new SampledTpchConnectorFactory(nodeManager, 1, 2), ImmutableMap.<String, String>of());

// add native
NativeConnectorFactory nativeConnectorFactory = createNativeConnectorFactory(
nodeManager,
localQueryRunner.getTypeManager(),
System.getProperty("tpchSampledCacheDir", "/tmp/presto_tpch/sampled_data_cache"));
localQueryRunner.createCatalog("default", nativeConnectorFactory, ImmutableMap.<String, String>of());
// add raptor
RaptorConnectorFactory raptorConnectorFactory = createRaptorConnectorFactory(TPCH_SAMPLED_CACHE_DIR, nodeManager);
localQueryRunner.createCatalog("default", raptorConnectorFactory, ImmutableMap.<String, String>of());

MetadataManager metadata = localQueryRunner.getMetadata();
if (!metadata.getTableHandle(session, new QualifiedTableName("default", "default", "orders")).isPresent()) {
Expand All @@ -90,12 +75,9 @@ public static LocalQueryRunner createLocalQueryRunner(ExecutorService executor)
InMemoryNodeManager nodeManager = localQueryRunner.getNodeManager();
localQueryRunner.createCatalog("tpch", new TpchConnectorFactory(nodeManager, 1), ImmutableMap.<String, String>of());

// add native
NativeConnectorFactory nativeConnectorFactory = createNativeConnectorFactory(
nodeManager,
localQueryRunner.getTypeManager(),
System.getProperty("tpchCacheDir", "/tmp/presto_tpch/data_cache"));
localQueryRunner.createCatalog("default", nativeConnectorFactory, ImmutableMap.<String, String>of());
// add raptor
RaptorConnectorFactory raptorConnectorFactory = createRaptorConnectorFactory(TPCH_CACHE_DIR, nodeManager);
localQueryRunner.createCatalog("default", raptorConnectorFactory, ImmutableMap.<String, String>of());

MetadataManager metadata = localQueryRunner.getMetadata();
if (!metadata.getTableHandle(session, new QualifiedTableName("default", "default", "orders")).isPresent()) {
Expand All @@ -107,51 +89,25 @@ public static LocalQueryRunner createLocalQueryRunner(ExecutorService executor)
return localQueryRunner;
}

private static NativeConnectorFactory createNativeConnectorFactory(NodeManager nodeManager, TypeRegistry typeRegistry, String cacheDir)
private static RaptorConnectorFactory createRaptorConnectorFactory(String cacheDir, NodeManager nodeManager)
{
try {
File dataDir = new File(cacheDir);
File databaseDir = new File(dataDir, "db");

IDBI localStorageManagerDbi = createDataSource(typeRegistry, databaseDir, "StorageManager");

DatabaseLocalStorageManagerConfig storageManagerConfig = new DatabaseLocalStorageManagerConfig()
.setCompressed(false)
.setDataDirectory(new File(dataDir, "data"));
LocalStorageManager localStorageManager = new DatabaseLocalStorageManager(localStorageManagerDbi, createTestingBlockEncodingManager(), storageManagerConfig);

NativeDataStreamProvider nativeDataStreamProvider = new NativeDataStreamProvider(localStorageManager);

NativeRecordSinkProvider nativeRecordSinkProvider = new NativeRecordSinkProvider(localStorageManager, nodeManager.getCurrentNode().getNodeIdentifier());
Map<String, String> config = ImmutableMap.<String, String>builder()
.put("metadata.db.type", "h2")
.put("metadata.db.filename", databaseDir.getAbsolutePath())
.put("storage.data-directory", dataDir.getAbsolutePath())
.build();

IDBI metadataDbi = createDataSource(typeRegistry, databaseDir, "Metastore");
DatabaseShardManager shardManager = new DatabaseShardManager(metadataDbi);
NativeMetadata nativeMetadata = new NativeMetadata(new NativeConnectorId("default"), metadataDbi, shardManager);
NativeSplitManager nativeSplitManager = new NativeSplitManager(nodeManager, shardManager, nativeMetadata);
NativeConnectorFactory nativeConnectorFactory = new NativeConnectorFactory(nativeMetadata, nativeSplitManager, nativeDataStreamProvider, nativeRecordSinkProvider);
BlockEncodingSerde blockEncodingSerde = createTestingBlockEncodingManager();
TypeManager typeManager = new TypeRegistry();

return nativeConnectorFactory;
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw Throwables.propagate(e);
return new RaptorConnectorFactory(config, nodeManager, blockEncodingSerde, typeManager);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}

private static IDBI createDataSource(TypeRegistry typeRegistry, File databaseDir, String name)
throws Exception
{
H2EmbeddedDataSourceConfig dataSourceConfig = new H2EmbeddedDataSourceConfig();
dataSourceConfig.setFilename(new File(databaseDir, name).getAbsolutePath());
H2EmbeddedDataSource dataSource = new H2EmbeddedDataSource(dataSourceConfig);
DBI dbi = new DBI(dataSource);

dbi.registerMapper(new TableColumnMapper(typeRegistry));
dbi.registerMapper(new ColumnMetadataMapper(typeRegistry));
dbi.registerMapper(new NativePartitionKey.Mapper(typeRegistry));
return dbi;
}
}
11 changes: 0 additions & 11 deletions presto-docs/src/main/sphinx/installation/deployment.rst
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,6 @@ The following is a minimal configuration for the coordinator:
coordinator=true
datasources=jmx
http-server.http.port=8080
presto-metastore.db.type=h2
presto-metastore.db.filename=var/db/MetaStore
task.max-memory=1GB
discovery-server.enabled=true
discovery.uri=http:https://example.net:8080
Expand All @@ -130,8 +128,6 @@ And this is a minimal configuration for the workers:
coordinator=false
datasources=jmx,hive
http-server.http.port=8080
presto-metastore.db.type=h2
presto-metastore.db.filename=var/db/MetaStore
task.max-memory=1GB
discovery.uri=http:https://example.net:8080
Expand All @@ -149,13 +145,6 @@ These properties require some explanation:
Specifies the port for the HTTP server. Presto uses HTTP for all
communication, internal and external.

* ``presto-metastore.db.filename``:
The location of the local H2 database used for storing metadata.
Currently, this is mainly used by features that are still in
development and thus a local database suffices.
Also, this should only be needed by the coordinator, but currently
it is also required for workers.

* ``task.max-memory=1GB``:
The maximum amount of memory used by a single task
(a fragment of a query plan running on a specific node).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import java.io.ObjectInputStream;
import java.util.Set;

import static com.google.common.base.Preconditions.checkNotNull;

/**
* MBeanServer wrapper that a ignores calls to registerMBean when there is already
* a MBean registered with the specified object name.
Expand All @@ -56,7 +58,7 @@ public class RebindSafeMBeanServer

public RebindSafeMBeanServer(MBeanServer mbeanServer)
{
this.mbeanServer = mbeanServer;
this.mbeanServer = checkNotNull(mbeanServer, "mbeanServer is null");
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package com.facebook.presto.jdbc;

import com.facebook.presto.server.testing.TestingPrestoServer;
import com.facebook.presto.tpch.TpchPlugin;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import io.airlift.log.Logging;
Expand Down Expand Up @@ -60,6 +61,8 @@ public void setup()
{
Logging.initialize();
server = new TestingPrestoServer();
server.installPlugin(new TpchPlugin());
server.createConnection("default", "tpch"); // TODO: change catalog name
}

@AfterClass
Expand Down Expand Up @@ -197,15 +200,15 @@ public void testGetSchemas()
{
try (Connection connection = createConnection()) {
try (ResultSet rs = connection.getMetaData().getSchemas()) {
assertGetSchemasResult(rs, 2);
assertGetSchemasResult(rs, 11);
}

try (ResultSet rs = connection.getMetaData().getSchemas(null, null)) {
assertGetSchemasResult(rs, 2);
assertGetSchemasResult(rs, 11);
}

try (ResultSet rs = connection.getMetaData().getSchemas(TEST_CATALOG, null)) {
assertGetSchemasResult(rs, 2);
assertGetSchemasResult(rs, 11);
}

try (ResultSet rs = connection.getMetaData().getSchemas("", null)) {
Expand All @@ -225,8 +228,8 @@ public void testGetSchemas()
assertGetSchemasResult(rs, 1);
}

try (ResultSet rs = connection.getMetaData().getSchemas(null, "%s%")) {
assertGetSchemasResult(rs, 2);
try (ResultSet rs = connection.getMetaData().getSchemas(null, "sf%")) {
assertGetSchemasResult(rs, 8);
}

try (ResultSet rs = connection.getMetaData().getSchemas("unknown", null)) {
Expand Down
19 changes: 10 additions & 9 deletions presto-main/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,6 @@
<artifactId>discovery</artifactId>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>dbpool</artifactId>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>event</artifactId>
Expand Down Expand Up @@ -179,13 +174,13 @@
</dependency>

<dependency>
<groupId>org.jdbi</groupId>
<artifactId>jdbi</artifactId>
<groupId>org.iq80.snappy</groupId>
<artifactId>snappy</artifactId>
</dependency>

<dependency>
<groupId>org.iq80.snappy</groupId>
<artifactId>snappy</artifactId>
<groupId>com.intellij</groupId>
<artifactId>annotations</artifactId>
</dependency>

<dependency>
Expand All @@ -206,6 +201,12 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.jdbi</groupId>
<artifactId>jdbi</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>bootstrap</artifactId>
Expand Down
Loading

0 comments on commit 6b98ecc

Please sign in to comment.