Skip to content

Commit

Permalink
Add fast smoke test for distributed Cassandra
Browse files Browse the repository at this point in the history
Disable full Cassandra distrbuted tests in normal build as they are simply too slow
  • Loading branch information
dain committed Jun 27, 2014
1 parent 5e80dfb commit c66fc6d
Show file tree
Hide file tree
Showing 7 changed files with 303 additions and 9 deletions.
29 changes: 29 additions & 0 deletions presto-cassandra/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,12 @@
<artifactId>javax.servlet-api</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.intellij</groupId>
<artifactId>annotations</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand All @@ -334,6 +340,11 @@
<!--a hang in Thrift calls or broken sockets.-->
<parallel></parallel>
<threadCount>1</threadCount>

<!-- integration tests take a very long time so only run them in the CI server -->
<excludes>
<exclude>**/TestCassandraDistributed.java</exclude>
</excludes>
</configuration>
</plugin>

Expand Down Expand Up @@ -387,4 +398,22 @@
</plugin>
</plugins>
</build>

<profiles>
<profile>
<id>test-cassandra</id>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration combine.self="override">
<excludes>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -296,8 +296,8 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con
@Override
public void commitCreateTable(ConnectorOutputTableHandle tableHandle, Collection<String> fragments)
{
checkNotNull(tableHandle, "tableHandle is null");
checkArgument(tableHandle instanceof CassandraOutputTableHandle, "tableHandle is not an instance of CassandraOutputTableHandle");
CassandraOutputTableHandle outputTableHandle = checkType(tableHandle, CassandraOutputTableHandle.class, "tableHandle");
schemaProvider.flushTable(new SchemaTableName(outputTableHandle.getSchemaName(), outputTableHandle.getTableName()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ public void setup()
throws Exception
{
EmbeddedCassandraServerHelper.startEmbeddedCassandra();
EmbeddedCassandraServerHelper.cleanEmbeddedCassandra();

createTestData("Presto_Database", "Presto_Test");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ private static QueryRunner createQueryRunner()
throws Exception
{
EmbeddedCassandraServerHelper.startEmbeddedCassandra();
EmbeddedCassandraServerHelper.cleanEmbeddedCassandra();
TestCassandraConnector.createOrReplaceKeyspace("tpch");
TestCassandraConnector.createOrReplaceKeyspace("tpch_sampled");

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.cassandra;

import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.testing.QueryRunner;
import com.facebook.presto.tests.AbstractTestIntegrationSmokeTest;
import com.facebook.presto.tests.DistributedQueryRunner;
import com.facebook.presto.tpch.TpchMetadata;
import com.facebook.presto.tpch.TpchPlugin;
import com.facebook.presto.tpch.testing.SampledTpchPlugin;
import com.google.common.collect.ImmutableMap;
import io.airlift.log.Logger;
import org.cassandraunit.utils.EmbeddedCassandraServerHelper;
import org.testng.annotations.AfterClass;
import org.testng.annotations.Test;

import static com.facebook.presto.spi.type.TimeZoneKey.UTC_KEY;
import static com.facebook.presto.tests.QueryAssertions.copyTable;
import static io.airlift.units.Duration.nanosSince;
import static java.util.Locale.ENGLISH;
import static java.util.concurrent.TimeUnit.SECONDS;

@Test(singleThreaded = true)
public class TestCassandraIntegrationSmokeTest
extends AbstractTestIntegrationSmokeTest
{
private static final Logger log = Logger.get("TestQueries");
private static final String TPCH_SAMPLED_SCHEMA = "tpch_sampled";

public TestCassandraIntegrationSmokeTest()
throws Exception
{
super(createQueryRunner(), createSession(TPCH_SAMPLED_SCHEMA));
}

@AfterClass(alwaysRun = true)
public void destroy()
{
queryRunner.close();
}

private static QueryRunner createQueryRunner()
throws Exception
{
EmbeddedCassandraServerHelper.startEmbeddedCassandra();
TestCassandraConnector.createOrReplaceKeyspace("tpch");
TestCassandraConnector.createOrReplaceKeyspace("tpch_sampled");

DistributedQueryRunner queryRunner = new DistributedQueryRunner(createSession("tpch"), 4);

queryRunner.installPlugin(new TpchPlugin());
queryRunner.createCatalog("tpch", "tpch");

queryRunner.installPlugin(new SampledTpchPlugin());
queryRunner.createCatalog("tpch_sampled", "tpch_sampled");

queryRunner.installPlugin(new CassandraPlugin());
queryRunner.createCatalog("cassandra", "cassandra", ImmutableMap.of(
"cassandra.contact-points", "localhost",
"cassandra.native-protocol-port", "9142",
"cassandra.allow-drop-table", "true"));

log.info("Loading data...");
long startTime = System.nanoTime();
copyTable(queryRunner, "tpch", TpchMetadata.TINY_SCHEMA_NAME, "orders", createSession("tpch"));
copyTable(queryRunner, "tpch_sampled", TpchMetadata.TINY_SCHEMA_NAME, "orders", createSession(TPCH_SAMPLED_SCHEMA));
log.info("Loading complete in %s", nanosSince(startTime).toString(SECONDS));

return queryRunner;
}

private static ConnectorSession createSession(String schema)
{
return new ConnectorSession("user", "test", "cassandra", schema, UTC_KEY, ENGLISH, null, null);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.tests;

import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.testing.MaterializedResult;
import com.facebook.presto.testing.QueryRunner;
import org.testng.annotations.Test;

import static com.facebook.presto.spi.type.BooleanType.BOOLEAN;
import static com.facebook.presto.spi.type.VarcharType.VARCHAR;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;

public abstract class AbstractTestIntegrationSmokeTest
extends AbstractTestQueryFramework
{
private final ConnectorSession sampledSession;

public AbstractTestIntegrationSmokeTest(QueryRunner queryRunner, ConnectorSession sampledSession)
{
super(queryRunner);
this.sampledSession = checkNotNull(sampledSession, "sampledSession is null");
}

@Test
public void testAggregateSingleColumn()
throws Exception
{
assertQuery("SELECT SUM(orderkey) FROM ORDERS");
assertQuery("SELECT SUM(totalprice) FROM ORDERS");
assertQuery("SELECT MAX(comment) FROM ORDERS");
}

@Test
public void testApproximateQuerySum()
throws Exception
{
assertApproximateQuery(sampledSession, "SELECT SUM(totalprice) FROM orders APPROXIMATE AT 99.999 CONFIDENCE", "SELECT 2 * SUM(totalprice) FROM orders");
}

@Test
public void testColumnsInReverseOrder()
throws Exception
{
assertQuery("SELECT shippriority, clerk, totalprice FROM ORDERS");
}

@Test
public void testCountAll()
throws Exception
{
assertQuery("SELECT COUNT(*) FROM ORDERS");
}

@Test
public void testExactPredicate()
throws Exception
{
assertQuery("SELECT * FROM ORDERS WHERE orderkey = 10");
}

@Test
public void testInListPredicate()
throws Exception
{
assertQuery("SELECT * FROM ORDERS WHERE orderkey IN (10, 11, 20, 21)");
}

@Test
public void testIsNullPredicate()
throws Exception
{
assertQuery("SELECT * FROM ORDERS WHERE orderkey = 10 OR orderkey IS NULL");
}

@Test
public void testMultipleRangesPredicate()
throws Exception
{
assertQuery("SELECT * FROM ORDERS WHERE orderkey BETWEEN 10 AND 50 or orderkey BETWEEN 100 AND 150");
}

@Test
public void testRangePredicate()
throws Exception
{
assertQuery("SELECT * FROM ORDERS WHERE orderkey BETWEEN 10 AND 50");
}

@Test
public void testSelectAll()
throws Exception
{
assertQuery("SELECT * FROM ORDERS");
}

@Test
public void testTableSampleSystem()
throws Exception
{
int total = computeActual("SELECT orderkey FROM orders").getMaterializedRows().size();

boolean sampleSizeFound = false;
for (int i = 0; i < 100; i++) {
int sampleSize = computeActual("SELECT orderkey FROM ORDERS TABLESAMPLE SYSTEM (50)").getMaterializedRows().size();
if (sampleSize > 0 && sampleSize < total) {
sampleSizeFound = true;
break;
}
}
assertTrue(sampleSizeFound, "Table sample returned unexpected number of rows");
}

@Test
public void testShowSchemas()
throws Exception
{
MaterializedResult actualSchemas = computeActual("SHOW SCHEMAS").toJdbcTypes();
MaterializedResult expectedSchemas = MaterializedResult.resultBuilder(queryRunner.getDefaultSession(), VARCHAR)
.row("tpch")
.row("tpch_sampled")
.build();
assertTrue(actualSchemas.getMaterializedRows().containsAll(expectedSchemas.getMaterializedRows()));
}

@Test
public void testShowTables()
throws Exception
{
MaterializedResult actualTables = computeActual("SHOW TABLES").toJdbcTypes();
MaterializedResult expectedTables = MaterializedResult.resultBuilder(queryRunner.getDefaultSession(), VARCHAR)
.row("orders")
.build();
assertEquals(actualTables, expectedTables);
}

@Test
public void testDescribeTable()
throws Exception
{
MaterializedResult actualColumns = computeActual("DESC ORDERS").toJdbcTypes();
MaterializedResult expectedColumns = MaterializedResult.resultBuilder(queryRunner.getDefaultSession(), VARCHAR, VARCHAR, BOOLEAN, BOOLEAN)
.row("orderkey", "bigint", true, false)
.row("custkey", "bigint", true, false)
.row("orderstatus", "varchar", true, false)
.row("totalprice", "double", true, false)
.row("orderdate", "varchar", true, false)
.row("orderpriority", "varchar", true, false)
.row("clerk", "varchar", true, false)
.row("shippriority", "bigint", true, false)
.row("comment", "varchar", true, false)
.build();
assertEquals(actualColumns, expectedColumns);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,22 @@ public static void copyAllTables(QueryRunner queryRunner, String sourceCatalog,
if (table.getTableName().equalsIgnoreCase("dual")) {
continue;
}

log.info("Running import for %s", table.getTableName());
@Language("SQL") String sql = format("CREATE TABLE %s AS SELECT * FROM %s", table.getTableName(), table);
long rows = checkType(queryRunner.execute(session, sql).getMaterializedRows().get(0).getField(0), Long.class, "rows");
log.info("Imported %s rows for %s", rows, table.getTableName());
copyTable(queryRunner, table, session);
}
}

public static void copyTable(QueryRunner queryRunner, String sourceCatalog, String sourceSchema, String sourceTable, ConnectorSession session)
throws Exception
{
QualifiedTableName table = new QualifiedTableName(sourceCatalog, sourceSchema, sourceTable);
copyTable(queryRunner, table, session);
}

public static void copyTable(QueryRunner queryRunner, QualifiedTableName table, ConnectorSession session)
{
log.info("Running import for %s", table.getTableName());
@Language("SQL") String sql = format("CREATE TABLE %s AS SELECT * FROM %s", table.getTableName(), table);
long rows = checkType(queryRunner.execute(session, sql).getMaterializedRows().get(0).getField(0), Long.class, "rows");
log.info("Imported %s rows for %s", rows, table.getTableName());
}
}

0 comments on commit c66fc6d

Please sign in to comment.