Skip to content

Commit

Permalink
Support multiple ranges in domain when retrieving partitions
Browse files Browse the repository at this point in the history
  • Loading branch information
alexoss68 authored and dain committed Jun 26, 2014
1 parent 09ee1b9 commit 3fcf0b4
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.concurrent.ExecutorService;

import static com.facebook.presto.cassandra.RetryDriver.retry;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

Expand Down Expand Up @@ -268,17 +269,20 @@ public CassandraTable call()
});
}

public List<CassandraPartition> getPartitions(CassandraTable table, List<Comparable<?>> filterPrefix)
public List<CassandraPartition> getAllPartitions(CassandraTable table)
{
LoadingCache<PartitionListKey, List<CassandraPartition>> cache;
if (filterPrefix.size() == table.getPartitionKeyColumns().size()) {
cache = partitionsCacheFull;
}
else {
cache = partitionsCache;
}
PartitionListKey key = new PartitionListKey(table, filterPrefix);
return getCacheValue(cache, key, RuntimeException.class);
PartitionListKey key = new PartitionListKey(table, ImmutableList.<Comparable<?>>of());
return getCacheValue(partitionsCache, key, RuntimeException.class);
}

public List<CassandraPartition> getPartitions(CassandraTable table, List<Comparable<?>> partitionKeys)
{
checkNotNull(table, "table is null");
checkNotNull(partitionKeys, "partitionKeys is null");
checkArgument(partitionKeys.size() == table.getPartitionKeyColumns().size());

PartitionListKey key = new PartitionListKey(table, partitionKeys);
return getCacheValue(partitionsCacheFull, key, RuntimeException.class);
}

private List<CassandraPartition> loadPartitions(final PartitionListKey key)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import com.google.common.collect.ImmutableList;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -283,17 +284,24 @@ protected Iterable<Row> queryPartitionKeys(CassandraTable table, List<Comparable
Select partitionKeys = CassandraCqlUtils.selectDistinctFrom(tableHandle, partitionKeyColumns);
partitionKeys.limit(limit);
partitionKeys.setFetchSize(fetchSizeForPartitionKeySelect);
addWhereClause(partitionKeys.where(), partitionKeyColumns, filterPrefix);
ResultSetFuture partitionKeyFuture = session.executeAsync(partitionKeys);

if (!fullPartitionKey) {
addWhereClause(partitionKeys.where(), partitionKeyColumns, new ArrayList<Comparable<?>>());
ResultSetFuture partitionKeyFuture = session.executeAsync(partitionKeys);
long count = countFuture.getUninterruptibly().one().getLong(0);
if (count == limitForPartitionKeySelect) {
partitionKeyFuture.cancel(true);
return null; // too much effort to query all partition keys
}
else {
return partitionKeyFuture.getUninterruptibly();
}
}
else {
addWhereClause(partitionKeys.where(), partitionKeyColumns, filterPrefix);
ResultSetFuture partitionKeyFuture = session.executeAsync(partitionKeys);
return partitionKeyFuture.getUninterruptibly();
}
return partitionKeyFuture.getUninterruptibly();
}

private static void addWhereClause(Where where, List<CassandraColumnHandle> partitionKeyColumns, List<Comparable<?>> filterPrefix)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,28 +26,34 @@
import com.facebook.presto.spi.Domain;
import com.facebook.presto.spi.FixedSplitSource;
import com.facebook.presto.spi.HostAddress;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.Range;
import com.facebook.presto.spi.TupleDomain;
import com.google.common.base.Objects;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSet.Builder;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import io.airlift.log.Logger;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;

import static com.facebook.presto.cassandra.util.Types.checkType;
import static com.facebook.presto.spi.StandardErrorCode.EXTERNAL;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Predicates.in;
Expand All @@ -63,52 +69,34 @@ public class CassandraSplitManager
private final CachingCassandraSchemaProvider schemaProvider;
private final int partitionSizeForBatchSelect;
private final CassandraTokenSplitManager tokenSplitMgr;
private final ListeningExecutorService executor;

@Inject
public CassandraSplitManager(CassandraConnectorId connectorId,
CassandraClientConfig cassandraClientConfig,
CassandraSession cassandraSession,
CachingCassandraSchemaProvider schemaProvider,
CassandraTokenSplitManager tokenSplitMgr)
CassandraTokenSplitManager tokenSplitMgr,
@ForCassandra ExecutorService executor)
{
this.connectorId = checkNotNull(connectorId, "connectorId is null").toString();
this.schemaProvider = checkNotNull(schemaProvider, "schemaProvider is null");
this.cassandraSession = checkNotNull(cassandraSession, "cassandraSession is null");
this.partitionSizeForBatchSelect = cassandraClientConfig.getPartitionSizeForBatchSelect();
this.tokenSplitMgr = tokenSplitMgr;
this.executor = MoreExecutors.listeningDecorator(executor);
}

@Override
public ConnectorPartitionResult getPartitions(ConnectorTableHandle tableHandle, TupleDomain<ConnectorColumnHandle> tupleDomain)
{
CassandraTableHandle cassandraTableHandle = checkType(tableHandle, CassandraTableHandle.class, "tableHandle");
checkNotNull(tupleDomain, "tupleDomain is null");

CassandraTable table = schemaProvider.getTable(cassandraTableHandle);

List<CassandraColumnHandle> partitionKeys = table.getPartitionKeyColumns();
List<Comparable<?>> filterPrefix = new ArrayList<>();
for (int i = 0; i < partitionKeys.size(); i++) {
CassandraColumnHandle columnHandle = partitionKeys.get(i);

// only add to prefix if all previous keys have a value
if (filterPrefix.size() == i && !tupleDomain.isNone()) {
Domain domain = tupleDomain.getDomains().get(columnHandle);
if (domain != null && domain.getRanges().getRangeCount() == 1) {
// We intentionally ignore whether NULL is in the domain since partition keys can never be NULL
Range range = Iterables.getOnlyElement(domain.getRanges());
if (range.isSingleValue()) {
Comparable<?> value = range.getLow().getValue();
checkArgument(value instanceof Boolean || value instanceof String || value instanceof Double || value instanceof Long,
"Only Boolean, String, Double and Long partition keys are supported");
filterPrefix.add(value);
}
}
}
}

// fetch the partitions
List<CassandraPartition> allPartitions = schemaProvider.getPartitions(table, filterPrefix);
List<CassandraPartition> allPartitions = getCassandraPartitions(table, tupleDomain);
log.debug("%s.%s #partitions: %d", cassandraTableHandle.getSchemaName(), cassandraTableHandle.getTableName(), allPartitions.size());

// do a final pass to filter based on fields that could not be used to build the prefix
Expand Down Expand Up @@ -159,6 +147,83 @@ public ConnectorPartitionResult getPartitions(ConnectorTableHandle tableHandle,
return new ConnectorPartitionResult(partitions, remainingTupleDomain);
}

private List<CassandraPartition> getCassandraPartitions(final CassandraTable table, TupleDomain<ConnectorColumnHandle> tupleDomain)
{
if (tupleDomain.isNone()) {
return ImmutableList.of();
}

Set<List<Comparable<?>>> partitionKeysSet = getPartitionKeysSet(table, tupleDomain);

// empty filter means, all partitions
if (partitionKeysSet.isEmpty()) {
return schemaProvider.getAllPartitions(table);
}

ImmutableList.Builder<ListenableFuture<List<CassandraPartition>>> getPartitionResults = ImmutableList.builder();
for (final List<Comparable<?>> partitionKeys : partitionKeysSet) {
getPartitionResults.add(executor.submit(new Callable<List<CassandraPartition>>()
{
@Override
public List<CassandraPartition> call()
{
return schemaProvider.getPartitions(table, partitionKeys);
}
}));
}

ImmutableList.Builder<CassandraPartition> partitions = ImmutableList.builder();
for (ListenableFuture<List<CassandraPartition>> result : getPartitionResults.build()) {
try {
partitions.addAll(result.get());
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw Throwables.propagate(e);
}
catch (ExecutionException e) {
throw new PrestoException(EXTERNAL.toErrorCode(), "Error fetching cassandra partitions", e);
}
}

return partitions.build();
}

private static Set<List<Comparable<?>>> getPartitionKeysSet(CassandraTable table, TupleDomain<ConnectorColumnHandle> tupleDomain)
{
ImmutableList.Builder<Set<Comparable<?>>> partitionColumnValues = ImmutableList.builder();
for (CassandraColumnHandle columnHandle : table.getPartitionKeyColumns()) {
Domain domain = tupleDomain.getDomains().get(columnHandle);

// if there is no constraint on a partition key, return an empty set
if (domain == null) {
return ImmutableSet.of();
}

// todo does cassandra allow null partition keys?
if (domain.isNullAllowed()) {
return ImmutableSet.of();
}

ImmutableSet.Builder<Comparable<?>> columnValues = ImmutableSet.builder();
for (Range range : domain.getRanges()) {
// if the range is not a single value, we can not perform partition pruning
if (!range.isSingleValue()) {
return ImmutableSet.of();
}
Comparable<?> value = range.getSingleValue();

// todo should we just skip partition pruning instead of throwing an exception?
checkArgument(value instanceof Boolean || value instanceof String || value instanceof Double || value instanceof Long,
"Only Boolean, String, Double and Long partition keys are supported");

columnValues.add(value);
}
partitionColumnValues.add(columnValues.build());
}
return Sets.cartesianProduct(partitionColumnValues.build());
}

@Override
public ConnectorSplitSource getPartitionSplits(ConnectorTableHandle tableHandle, List<ConnectorPartition> partitions)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.util.List;
import java.util.concurrent.TimeUnit;

import static com.facebook.presto.cassandra.MockCassandraSession.BAD_SCHEMA;
Expand Down Expand Up @@ -130,16 +129,15 @@ public void testGetPartitions()

String expectedList = "[column1 = 'testpartition1', column1 = 'testpartition2']";

List<Comparable<?>> empty = ImmutableList.of();
assertEquals(mockSession.getAccessCount(), 1);
assertEquals(expectedList, schemaProvider.getPartitions(table, empty).toString());
assertEquals(expectedList, schemaProvider.getAllPartitions(table).toString());
assertEquals(mockSession.getAccessCount(), 2);
assertEquals(expectedList, schemaProvider.getPartitions(table, empty).toString());
assertEquals(expectedList, schemaProvider.getAllPartitions(table).toString());
assertEquals(mockSession.getAccessCount(), 2);

schemaProvider.flushCache();

assertEquals(expectedList, schemaProvider.getPartitions(table, empty).toString());
assertEquals(expectedList, schemaProvider.getAllPartitions(table).toString());
assertEquals(mockSession.getAccessCount(), 3);
}

Expand Down

0 comments on commit 3fcf0b4

Please sign in to comment.