Skip to content

Commit

Permalink
Run integration test suite on cassandra
Browse files Browse the repository at this point in the history
  • Loading branch information
dain committed Jun 26, 2014
1 parent 9de9c0e commit 4126867
Show file tree
Hide file tree
Showing 6 changed files with 190 additions and 24 deletions.
51 changes: 51 additions & 0 deletions presto-cassandra/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,12 @@
<scope>provided</scope>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>concurrent</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>log</artifactId>
Expand Down Expand Up @@ -218,6 +224,38 @@
</dependency>

<!-- for testing -->
<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-main</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</exclusion>
<exclusion>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>jul-to-slf4j</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-tests</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-tpch</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.cassandraunit</groupId>
<artifactId>cassandra-unit</artifactId>
Expand Down Expand Up @@ -286,6 +324,19 @@

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<!--Integrations tests fail when parallel, due-->
<!--to a bug or configuration error in the embedded-->
<!--cassandra instance. This problem results in either-->
<!--a hang in Thrift calls or broken sockets.-->
<parallel></parallel>
<threadCount>1</threadCount>
</configuration>
</plugin>

<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
*/
package com.facebook.presto.cassandra;

import com.facebook.presto.spi.ConnectorColumnHandle;
import com.facebook.presto.spi.ColumnMetadata;
import com.facebook.presto.spi.Connector;
import com.facebook.presto.spi.ConnectorColumnHandle;
import com.facebook.presto.spi.ConnectorHandleResolver;
import com.facebook.presto.spi.ConnectorMetadata;
import com.facebook.presto.spi.ConnectorPartitionResult;
Expand Down Expand Up @@ -73,11 +73,11 @@
import static com.google.common.base.Preconditions.checkArgument;
import static io.airlift.testing.Assertions.assertInstanceOf;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

@Test(singleThreaded = true)
public class TestCassandraConnector
{
private static final ConnectorSession SESSION = new ConnectorSession("user", "test", "catalog", "test", UTC_KEY, Locale.ENGLISH, null, null);
Expand All @@ -91,6 +91,8 @@ public class TestCassandraConnector
protected SchemaTableName table;
protected SchemaTableName tableUnpartitioned;
protected SchemaTableName invalidTable;
private static final String CLUSTER_NAME = "TestCluster";
private static final String HOST = "localhost:9160";

@BeforeClass
public void setup()
Expand All @@ -99,7 +101,7 @@ public void setup()
EmbeddedCassandraServerHelper.startEmbeddedCassandra();
EmbeddedCassandraServerHelper.cleanEmbeddedCassandra();

createTestData();
createTestData("Presto_Database", "Presto_Test");

String connectorId = "cassandra-test";
CassandraConnectorFactory connectorFactory = new CassandraConnectorFactory(
Expand Down Expand Up @@ -294,18 +296,15 @@ private static ImmutableMap<String, Integer> indexColumns(List<ConnectorColumnHa
return index.build();
}

public static void createTestData()
public static Keyspace createOrReplaceKeyspace(String keyspaceName)
{
String clusterName = "TestCluster";
String host = "localhost:9160";
return createOrReplaceKeyspace(keyspaceName, ImmutableList.<ColumnFamilyDefinition>of());
}

Cluster cluster = HFactory.getOrCreateCluster(clusterName, host);
Keyspace keyspace = HFactory.createKeyspace("beautifulKeyspaceName", cluster);
assertNotNull(keyspace);
public static Keyspace createOrReplaceKeyspace(String keyspaceName, List<ColumnFamilyDefinition> columnFamilyDefinitions)
{
Cluster cluster = getOrCreateCluster();

String keyspaceName = "Presto_Database";
String columnFamilyName = "Presto_Test";
List<ColumnFamilyDefinition> columnFamilyDefinitions = createColumnFamilyDefinitions(keyspaceName, columnFamilyName);
KeyspaceDefinition keyspaceDefinition = HFactory.createKeyspaceDefinition(
keyspaceName,
StrategyModel.SIMPLE_STRATEGY.value(),
Expand All @@ -316,7 +315,14 @@ public static void createTestData()
cluster.dropKeyspace(keyspaceName, true);
}
cluster.addKeyspace(keyspaceDefinition, true);
keyspace = HFactory.createKeyspace(keyspaceName, cluster);
return HFactory.createKeyspace(keyspaceName, cluster);
}

public static void createTestData(String keyspaceName, String columnFamilyName)
{
List<ColumnFamilyDefinition> columnFamilyDefinitions = createColumnFamilyDefinitions(keyspaceName, columnFamilyName);
Keyspace keyspace = createOrReplaceKeyspace(keyspaceName, columnFamilyDefinitions);

Mutator<String> mutator = HFactory.createMutator(keyspace, StringSerializer.get());

long timestamp = System.currentTimeMillis();
Expand All @@ -326,6 +332,11 @@ public static void createTestData()
mutator.execute();
}

private static Cluster getOrCreateCluster()
{
return HFactory.getOrCreateCluster(CLUSTER_NAME, HOST);
}

private static void addRow(String columnFamilyName, Mutator<String> mutator, long timestamp, int rowNumber)
{
String key = String.format("key %04d", rowNumber);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.cassandra;

import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.testing.QueryRunner;
import com.facebook.presto.tests.AbstractTestDistributedQueries;
import com.facebook.presto.tests.DistributedQueryRunner;
import com.facebook.presto.tpch.TpchMetadata;
import com.facebook.presto.tpch.TpchPlugin;
import com.facebook.presto.tpch.testing.SampledTpchPlugin;
import com.google.common.collect.ImmutableMap;
import io.airlift.log.Logger;
import org.cassandraunit.utils.EmbeddedCassandraServerHelper;
import org.testng.annotations.AfterClass;
import org.testng.annotations.Test;

import static com.facebook.presto.spi.type.TimeZoneKey.UTC_KEY;
import static io.airlift.units.Duration.nanosSince;
import static java.util.Locale.ENGLISH;
import static java.util.concurrent.TimeUnit.SECONDS;

@Test(singleThreaded = true)
public class TestCassandraDistributed
extends AbstractTestDistributedQueries
{
private static final Logger log = Logger.get("TestQueries");
private static final String TPCH_SAMPLED_SCHEMA = "tpch_sampled";

public TestCassandraDistributed()
throws Exception
{
super(createQueryRunner(), createSession(TPCH_SAMPLED_SCHEMA));
}

@AfterClass(alwaysRun = true)
public void destroy()
{
queryRunner.close();
}

private static QueryRunner createQueryRunner()
throws Exception
{
EmbeddedCassandraServerHelper.startEmbeddedCassandra();
EmbeddedCassandraServerHelper.cleanEmbeddedCassandra();
TestCassandraConnector.createOrReplaceKeyspace("tpch");
TestCassandraConnector.createOrReplaceKeyspace("tpch_sampled");

DistributedQueryRunner queryRunner = new DistributedQueryRunner(createSession("tpch"), 4);

queryRunner.installPlugin(new TpchPlugin());
queryRunner.createCatalog("tpch", "tpch");

queryRunner.installPlugin(new SampledTpchPlugin());
queryRunner.createCatalog("tpch_sampled", "tpch_sampled");

queryRunner.installPlugin(new CassandraPlugin());
queryRunner.createCatalog("cassandra", "cassandra", ImmutableMap.of(
"cassandra.contact-points", "localhost",
"cassandra.native-protocol-port", "9142",
"cassandra.allow-drop-table", "true"));

log.info("Loading data...");
long startTime = System.nanoTime();
copyAllTables(queryRunner, "tpch", TpchMetadata.TINY_SCHEMA_NAME, createSession("tpch"));
copyAllTables(queryRunner, "tpch_sampled", TpchMetadata.TINY_SCHEMA_NAME, createSession(TPCH_SAMPLED_SCHEMA));
log.info("Loading complete in %s", nanosSince(startTime).toString(SECONDS));

return queryRunner;
}

private static ConnectorSession createSession(String schema)
{
return new ConnectorSession("user", "test", "cassandra", schema, UTC_KEY, ENGLISH, null, null);
}

@Override
public void testView()
throws Exception
{
// Cassandra connector currently does not support views
}

@Override
public void testViewMetadata()
throws Exception
{
// Cassandra connector currently does not support views
}
}
12 changes: 6 additions & 6 deletions presto-cassandra/src/test/resources/cu-cassandra.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -410,20 +410,20 @@ compaction_preheat_key_cache: true
# stream_throughput_outbound_megabits_per_sec: 200

# How long the coordinator should wait for read operations to complete
read_request_timeout_in_ms: 5000
read_request_timeout_in_ms: 30000
# How long the coordinator should wait for seq or index scans to complete
range_request_timeout_in_ms: 10000
range_request_timeout_in_ms: 30000
# How long the coordinator should wait for writes to complete
write_request_timeout_in_ms: 2000
write_request_timeout_in_ms: 30000
# How long a coordinator should continue to retry a CAS operation
# that contends with other proposals for the same row
cas_contention_timeout_in_ms: 1000
cas_contention_timeout_in_ms: 30000
# How long the coordinator should wait for truncates to complete
# (This can be much longer, because unless auto_snapshot is disabled
# we need to flush first so we can snapshot before removing the data.)
truncate_request_timeout_in_ms: 60000
# The default timeout for other, miscellaneous operations
request_timeout_in_ms: 10000
request_timeout_in_ms: 30000

# Enable operation timeout information exchange between nodes to accurately
# measure request timeouts. If disabled, replicas will assume that requests
Expand Down Expand Up @@ -583,4 +583,4 @@ encryption_options:
# protocol: TLS
# algorithm: SunX509
# store_type: JKS
# cipher_suites: [TLS_RSA_WITH_AES_128_CBC_SHA,TLS_RSA_WITH_AES_256_CBC_SHA]
# cipher_suites: [TLS_RSA_WITH_AES_128_CBC_SHA,TLS_RSA_WITH_AES_256_CBC_SHA]
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
Expand Down Expand Up @@ -129,10 +130,16 @@ public TestingPrestoServer(boolean coordinator, Map<String, String> properties,

Bootstrap app = new Bootstrap(modules.build());

Map<String, String> optionalProperties = new HashMap<>();
if (environment != null) {
optionalProperties.put("node.environment", environment);
}

Injector injector = app
.strictConfig()
.doNotInitializeLogging()
.setRequiredConfigurationProperties(serverProperties.build())
.setOptionalConfigurationProperties(optionalProperties)
.initialize();

injector.getInstance(Announcer.class).start();
Expand Down
5 changes: 0 additions & 5 deletions presto-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,6 @@
<artifactId>log</artifactId>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>log-manager</artifactId>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>node</artifactId>
Expand Down

0 comments on commit 4126867

Please sign in to comment.