Skip to content

Commit

Permalink
Pushdown index column predicate to CQL token range query
Browse files Browse the repository at this point in the history
  • Loading branch information
alexoss68 authored and dain committed Jun 26, 2014
1 parent ae71952 commit 115f252
Show file tree
Hide file tree
Showing 9 changed files with 91 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public class CassandraColumnHandle
private final List<CassandraType> typeArguments;
private final boolean partitionKey;
private final boolean clusteringKey;
private final boolean indexed;

@JsonCreator
public CassandraColumnHandle(
Expand All @@ -50,7 +51,8 @@ public CassandraColumnHandle(
@JsonProperty("cassandraType") CassandraType cassandraType,
@Nullable @JsonProperty("typeArguments") List<CassandraType> typeArguments,
@JsonProperty("partitionKey") boolean partitionKey,
@JsonProperty("clusteringKey") boolean clusteringKey)
@JsonProperty("clusteringKey") boolean clusteringKey,
@JsonProperty("indexed") boolean indexed)
{
this.connectorId = checkNotNull(connectorId, "connectorId is null");
this.name = checkNotNull(name, "name is null");
Expand All @@ -68,6 +70,7 @@ public CassandraColumnHandle(
}
this.partitionKey = partitionKey;
this.clusteringKey = clusteringKey;
this.indexed = indexed;
}

@JsonProperty
Expand Down Expand Up @@ -112,6 +115,12 @@ public boolean isClusteringKey()
return clusteringKey;
}

@JsonProperty
public boolean isIndexed()
{
return indexed;
}

public ColumnMetadata getColumnMetadata()
{
return new ColumnMetadata(CassandraCqlUtils.cqlNameToSqlName(name), cassandraType.getNativeType(), ordinalPosition, partitionKey);
Expand All @@ -132,7 +141,8 @@ public int hashCode()
cassandraType,
typeArguments,
partitionKey,
clusteringKey);
clusteringKey,
indexed);
}

@Override
Expand All @@ -151,7 +161,8 @@ public boolean equals(Object obj)
&& Objects.equal(this.cassandraType, other.cassandraType)
&& Objects.equal(this.typeArguments, other.typeArguments)
&& Objects.equal(this.partitionKey, other.partitionKey)
&& Objects.equal(this.clusteringKey, other.clusteringKey);
&& Objects.equal(this.clusteringKey, other.clusteringKey)
&& Objects.equal(this.indexed, other.indexed);
}

@Override
Expand All @@ -168,7 +179,8 @@ public String toString()
}

helper.add("partitionKey", partitionKey)
.add("clusteringKey", clusteringKey);
.add("clusteringKey", clusteringKey)
.add("indexed", indexed);

return helper.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,32 +28,41 @@ public class CassandraPartition
private final String partitionId;
private final byte[] key;
private final TupleDomain<ConnectorColumnHandle> tupleDomain;
private final boolean indexedColumnPredicatePushdown;

private CassandraPartition()
{
partitionId = UNPARTITIONED_ID;
tupleDomain = TupleDomain.all();
key = null;
indexedColumnPredicatePushdown = false;
}

public CassandraPartition(byte[] key, String partitionId, TupleDomain<ConnectorColumnHandle> tupleDomain)
public CassandraPartition(byte[] key, String partitionId, TupleDomain<ConnectorColumnHandle> tupleDomain, boolean indexedColumnPredicatePushdown)
{
this.key = key;
this.partitionId = partitionId;
this.tupleDomain = tupleDomain;
this.indexedColumnPredicatePushdown = indexedColumnPredicatePushdown;
}

public boolean isUnpartitioned()
{
return partitionId.equals(UNPARTITIONED_ID);
}

public boolean isIndexedColumnPredicatePushdown()
{
return indexedColumnPredicatePushdown;
}

@Override
public TupleDomain<ConnectorColumnHandle> getTupleDomain()
{
return tupleDomain;
}

@Override
public String getPartitionId()
{
return partitionId;
Expand All @@ -69,4 +78,9 @@ public ByteBuffer getKeyAsByteBuffer()
{
return ByteBuffer.wrap(key);
}

public byte[] getKey()
{
return key;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,8 @@ private CassandraColumnHandle buildColumnHandle(ColumnMetadata columnMeta, boole
throw new IllegalArgumentException("Invalid type arguments: " + typeArgs);
}
}
return new CassandraColumnHandle(connectorId, columnMeta.getName(), index, cassandraType, typeArguments, partitionKey, clusteringKey);
boolean indexed = columnMeta.getIndex() != null;
return new CassandraColumnHandle(connectorId, columnMeta.getName(), index, cassandraType, typeArguments, partitionKey, clusteringKey, indexed);
}

public List<CassandraPartition> getPartitions(CassandraTable table, List<Comparable<?>> filterPrefix)
Expand Down Expand Up @@ -256,7 +257,7 @@ public List<CassandraPartition> getPartitions(CassandraTable table, List<Compara
TupleDomain<ConnectorColumnHandle> tupleDomain = TupleDomain.withFixedValues(map);
String partitionId = stringBuilder.toString();
if (uniquePartitionIds.add(partitionId)) {
partitions.add(new CassandraPartition(key, partitionId, tupleDomain));
partitions.add(new CassandraPartition(key, partitionId, tupleDomain, false));
}
}
return partitions.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package com.facebook.presto.cassandra;

import com.datastax.driver.core.Host;
import com.facebook.presto.cassandra.util.CassandraCqlUtils;
import com.facebook.presto.cassandra.util.HostAddressFactory;
import com.facebook.presto.spi.ConnectorColumnHandle;
import com.facebook.presto.spi.ConnectorPartition;
Expand All @@ -34,6 +35,7 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSet.Builder;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
Expand Down Expand Up @@ -123,6 +125,32 @@ public ConnectorPartitionResult getPartitions(ConnectorTableHandle tableHandle,
remainingTupleDomain = TupleDomain.withColumnDomains(Maps.filterKeys(tupleDomain.getDomains(), not(in(partitionColumns))));
}

// push down indexed column fixed value predicates only for unpartitioned partition which uses token range query
if (partitions.size() == 1 && ((CassandraPartition) partitions.get(0)).isUnpartitioned()) {
Map<ConnectorColumnHandle, Domain> domains = tupleDomain.getDomains();
List<ConnectorColumnHandle> indexedColumns = Lists.newArrayList();
// compose partitionId by using indexed column
StringBuilder sb = new StringBuilder();
for (Map.Entry<ConnectorColumnHandle, Domain> entry : domains.entrySet()) {
CassandraColumnHandle column = (CassandraColumnHandle) entry.getKey();
Domain domain = entry.getValue();
if (column.isIndexed() && domain.isSingleValue()) {
sb.append(CassandraCqlUtils.validColumnName(column.getName()))
.append(" = ")
.append(CassandraCqlUtils.cqlValue(entry.getValue().getSingleValue().toString(), column.getCassandraType()));
indexedColumns.add(column);
// Only one indexed column predicate can be pushed down.
break;
}
}
if (sb.length() > 0) {
CassandraPartition partition = (CassandraPartition) partitions.get(0);
TupleDomain<ConnectorColumnHandle> filterIndexedColumn = TupleDomain.withColumnDomains(Maps.filterKeys(remainingTupleDomain.getDomains(), not(in(indexedColumns))));
partitions = Lists.newArrayList();
partitions.add(new CassandraPartition(partition.getKey(), sb.toString(), filterIndexedColumn, true));
return new ConnectorPartitionResult(partitions, filterIndexedColumn);
}
}
return new ConnectorPartitionResult(partitions, remainingTupleDomain);
}

Expand All @@ -142,7 +170,7 @@ public ConnectorSplitSource getPartitionSplits(ConnectorTableHandle tableHandle,
ConnectorPartition partition = partitions.get(0);
CassandraPartition cassandraPartition = checkType(partition, CassandraPartition.class, "partition");

if (cassandraPartition.isUnpartitioned()) {
if (cassandraPartition.isUnpartitioned() || cassandraPartition.isIndexedColumnPredicatePushdown()) {
CassandraTable table = schemaProvider.getTable(cassandraTableHandle);
List<ConnectorSplit> splits = getSplitsByTokenRange(table, cassandraPartition.getPartitionId());
return new FixedSplitSource(connectorId, splits);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.facebook.presto.cassandra.CassandraTableHandle;
import com.facebook.presto.spi.ConnectorColumnHandle;
import com.fasterxml.jackson.core.io.JsonStringEncoder;
import com.facebook.presto.cassandra.CassandraType;

import java.util.Arrays;
import java.util.HashSet;
Expand Down Expand Up @@ -159,4 +160,19 @@ public static Select selectCountAllFrom(CassandraTableHandle tableHandle)
String table = validTableName(tableHandle.getTableName());
return QueryBuilder.select().countAll().from(schema, table);
}

public static String cqlValue(String value, CassandraType cassandraType)
{
switch (cassandraType) {
case ASCII:
case TEXT:
case VARCHAR:
return quoteStringLiteral(value);
case INET:
// remove '/' in the string. e.g. /127.0.0.1
return quoteStringLiteral(value.substring(1));
default:
return value;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,8 @@ public CassandraTable getTable(SchemaTableName tableName)
return new CassandraTable(
new CassandraTableHandle(connectorId, TEST_SCHEMA, TEST_TABLE),
ImmutableList.of(
new CassandraColumnHandle(connectorId, TEST_COLUMN1, 0, CassandraType.VARCHAR, null, true, false),
new CassandraColumnHandle(connectorId, TEST_COLUMN2, 0, CassandraType.INT, null, false, false)));
new CassandraColumnHandle(connectorId, TEST_COLUMN1, 0, CassandraType.VARCHAR, null, true, false, false),
new CassandraColumnHandle(connectorId, TEST_COLUMN2, 0, CassandraType.INT, null, false, false, false)));
}
throw new TableNotFoundException(tableName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public class TestCassandraColumnHandle
@Test
public void testRoundTrip()
{
CassandraColumnHandle expected = new CassandraColumnHandle("connector", "name", 42, CassandraType.FLOAT, null, true, false);
CassandraColumnHandle expected = new CassandraColumnHandle("connector", "name", 42, CassandraType.FLOAT, null, true, false, false);

String json = codec.toJson(expected);
CassandraColumnHandle actual = codec.fromJson(json);
Expand All @@ -50,7 +50,8 @@ public void testRoundTrip2()
CassandraType.MAP,
ImmutableList.of(CassandraType.VARCHAR, CassandraType.UUID),
false,
true);
true,
false);

String json = codec.toJson(expected);
CassandraColumnHandle actual = codec.fromJson(json);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public class TestJsonCassandraHandles
.put("cassandraType", "BIGINT")
.put("partitionKey", false)
.put("clusteringKey", true)
.put("indexed", false)
.build();

private static final Map<String, Object> COLUMN2_HANDLE_AS_MAP = ImmutableMap.<String, Object>builder()
Expand All @@ -52,6 +53,7 @@ public class TestJsonCassandraHandles
.put("typeArguments", ImmutableList.of("INT"))
.put("partitionKey", false)
.put("clusteringKey", false)
.put("indexed", false)
.build();

private final ObjectMapper objectMapper = new ObjectMapperProvider().get();
Expand Down Expand Up @@ -85,7 +87,7 @@ public void testTableHandleDeserialize()
public void testColumnHandleSerialize()
throws Exception
{
CassandraColumnHandle columnHandle = new CassandraColumnHandle("cassandra", "column", 42, CassandraType.BIGINT, null, false, true);
CassandraColumnHandle columnHandle = new CassandraColumnHandle("cassandra", "column", 42, CassandraType.BIGINT, null, false, true, false);

assertTrue(objectMapper.canSerialize(CassandraColumnHandle.class));
String json = objectMapper.writeValueAsString(columnHandle);
Expand All @@ -103,6 +105,7 @@ public void testColumn2HandleSerialize()
CassandraType.SET,
ImmutableList.of(CassandraType.INT),
false,
false,
false);

assertTrue(objectMapper.canSerialize(CassandraColumnHandle.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,9 @@ public void testQuoteJson()
public void testAppendSelectColumns()
{
List<CassandraColumnHandle> columns = ImmutableList.of(
new CassandraColumnHandle("", "foo", 0, CassandraType.VARCHAR, null, false, false),
new CassandraColumnHandle("", "bar", 0, CassandraType.VARCHAR, null, false, false),
new CassandraColumnHandle("", "table", 0, CassandraType.VARCHAR, null, false, false));
new CassandraColumnHandle("", "foo", 0, CassandraType.VARCHAR, null, false, false, false),
new CassandraColumnHandle("", "bar", 0, CassandraType.VARCHAR, null, false, false, false),
new CassandraColumnHandle("", "table", 0, CassandraType.VARCHAR, null, false, false, false));

StringBuilder sb = new StringBuilder();
CassandraCqlUtils.appendSelectColumns(sb, columns);
Expand Down

0 comments on commit 115f252

Please sign in to comment.