Skip to content

Commit

Permalink
Store column order and hidden flag in table comment
Browse files Browse the repository at this point in the history
Cassandra does not preserve column order in created tables.  Column order is important
for correctness of SELECT * queries.  This change stores the column order and the hidden flag
in the comment field of the table.
  • Loading branch information
dain committed Jun 26, 2014
1 parent 54893f1 commit 9de9c0e
Show file tree
Hide file tree
Showing 15 changed files with 314 additions and 106 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,17 @@
import com.google.inject.Module;
import com.google.inject.Provides;
import com.google.inject.Scopes;
import io.airlift.json.JsonCodec;
import org.apache.cassandra.thrift.Cassandra;

import javax.inject.Singleton;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static io.airlift.configuration.ConfigurationModule.bindConfig;
import static io.airlift.json.JsonCodecBinder.jsonCodecBinder;
import static org.weakref.jmx.ObjectNames.generatedNameOf;
import static org.weakref.jmx.guice.ExportBinder.newExporter;

Expand Down Expand Up @@ -58,6 +61,8 @@ public void configure(Binder binder)
newExporter(binder).export(CachingCassandraSchemaProvider.class).as(generatedNameOf(CachingCassandraSchemaProvider.class, connectorId));

binder.bind(CassandraSessionFactory.class).in(Scopes.SINGLETON);

jsonCodecBinder(binder).bindListJsonCodec(ExtraColumnMetadata.class);
}

@ForCassandra
Expand All @@ -73,9 +78,12 @@ public static ExecutorService createCachingCassandraSchemaExecutor(CassandraConn

@Singleton
@Provides
public static CassandraSession createCassandraSession(CassandraConnectorId connectorId, CassandraClientConfig config)
public static CassandraSession createCassandraSession(
CassandraConnectorId connectorId,
CassandraClientConfig config,
JsonCodec<List<ExtraColumnMetadata>> extraColumnMetadataCodec)
{
CassandraSessionFactory factory = new CassandraSessionFactory(connectorId, config);
CassandraSessionFactory factory = new CassandraSessionFactory(connectorId, config, extraColumnMetadataCodec);
return factory.create();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,15 @@
package com.facebook.presto.cassandra;

import com.facebook.presto.cassandra.util.CassandraCqlUtils;
import com.facebook.presto.spi.ConnectorColumnHandle;
import com.facebook.presto.spi.ColumnMetadata;
import com.facebook.presto.spi.ConnectorColumnHandle;
import com.facebook.presto.spi.type.Type;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Function;
import com.google.common.base.Objects;
import com.google.common.base.Objects.ToStringHelper;
import com.google.common.base.Predicate;

import javax.annotation.Nullable;

Expand All @@ -44,6 +45,7 @@ public class CassandraColumnHandle
private final boolean partitionKey;
private final boolean clusteringKey;
private final boolean indexed;
private final boolean hidden;

@JsonCreator
public CassandraColumnHandle(
Expand All @@ -54,7 +56,8 @@ public CassandraColumnHandle(
@Nullable @JsonProperty("typeArguments") List<CassandraType> typeArguments,
@JsonProperty("partitionKey") boolean partitionKey,
@JsonProperty("clusteringKey") boolean clusteringKey,
@JsonProperty("indexed") boolean indexed)
@JsonProperty("indexed") boolean indexed,
@JsonProperty("hidden") boolean hidden)
{
this.connectorId = checkNotNull(connectorId, "connectorId is null");
this.name = checkNotNull(name, "name is null");
Expand All @@ -73,6 +76,7 @@ public CassandraColumnHandle(
this.partitionKey = partitionKey;
this.clusteringKey = clusteringKey;
this.indexed = indexed;
this.hidden = hidden;
}

@JsonProperty
Expand Down Expand Up @@ -123,9 +127,15 @@ public boolean isIndexed()
return indexed;
}

/**
 * Returns the {@code hidden} flag this column handle was constructed with.
 * Annotated with {@code @JsonProperty} so the flag is included when the
 * handle is serialized, matching the {@code "hidden"} constructor property.
 *
 * @return {@code true} if this column is flagged as hidden
 */
@JsonProperty
public boolean isHidden()
{
return hidden;
}

public ColumnMetadata getColumnMetadata()
{
return new ColumnMetadata(CassandraCqlUtils.cqlNameToSqlName(name), cassandraType.getNativeType(), ordinalPosition, partitionKey);
return new ColumnMetadata(CassandraCqlUtils.cqlNameToSqlName(name), cassandraType.getNativeType(), ordinalPosition, partitionKey, hidden);
}

public Type getType()
Expand All @@ -144,7 +154,8 @@ public int hashCode()
typeArguments,
partitionKey,
clusteringKey,
indexed);
indexed,
hidden);
}

@Override
Expand All @@ -164,7 +175,8 @@ public boolean equals(Object obj)
&& Objects.equal(this.typeArguments, other.typeArguments)
&& Objects.equal(this.partitionKey, other.partitionKey)
&& Objects.equal(this.clusteringKey, other.clusteringKey)
&& Objects.equal(this.indexed, other.indexed);
&& Objects.equal(this.indexed, other.indexed)
&& Objects.equal(this.hidden, other.hidden);
}

@Override
Expand All @@ -182,7 +194,8 @@ public String toString()

helper.add("partitionKey", partitionKey)
.add("clusteringKey", clusteringKey)
.add("indexed", indexed);
.add("indexed", indexed)
.add("hidden", hidden);

return helper.toString();
}
Expand Down Expand Up @@ -242,4 +255,16 @@ public FullCassandraType apply(CassandraColumnHandle input)
}
};
}

/**
 * Creates a predicate over Cassandra column handles that accepts a handle
 * exactly when its {@code isPartitionKey()} method returns {@code true}.
 *
 * @return a new predicate instance delegating to {@link CassandraColumnHandle#isPartitionKey()}
 */
public static Predicate<CassandraColumnHandle> partitionKeyPredicate()
{
    Predicate<CassandraColumnHandle> isPartitionKey = new Predicate<CassandraColumnHandle>()
    {
        @Override
        public boolean apply(CassandraColumnHandle handle)
        {
            // the decision is delegated entirely to the handle itself
            return handle.isPartitionKey();
        }
    };
    return isPartitionKey;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;

public class CassandraConnectorRecordSinkProvider implements ConnectorRecordSinkProvider
public class CassandraConnectorRecordSinkProvider
implements ConnectorRecordSinkProvider
{
private final CassandraSession cassandraSession;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,18 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import io.airlift.json.JsonCodec;

import java.util.Collection;
import java.util.List;
import java.util.Map;

import static com.facebook.presto.cassandra.CassandraColumnHandle.SAMPLE_WEIGHT_COLUMN_NAME;
import static com.facebook.presto.cassandra.CassandraColumnHandle.columnMetadataGetter;
import static com.facebook.presto.cassandra.CassandraType.BIGINT;
import static com.facebook.presto.cassandra.CassandraType.toCassandraType;
import static com.facebook.presto.cassandra.util.Types.checkType;
import static com.facebook.presto.spi.StandardErrorCode.CANNOT_DROP_TABLE;
import static com.facebook.presto.spi.type.BigintType.BIGINT;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Strings.isNullOrEmpty;
Expand All @@ -50,20 +51,27 @@
import static java.util.Collections.emptyMap;

public class CassandraMetadata
implements ConnectorMetadata
implements ConnectorMetadata
{
private final String connectorId;
private final CachingCassandraSchemaProvider schemaProvider;
private final CassandraSession cassandraSession;
private final boolean allowDropTable;

private final JsonCodec<List<ExtraColumnMetadata>> extraColumnMetadataCodec;

@Inject
public CassandraMetadata(CassandraConnectorId connectorId, CachingCassandraSchemaProvider schemaProvider, CassandraSession cassandraSession, CassandraClientConfig config)
public CassandraMetadata(CassandraConnectorId connectorId,
CachingCassandraSchemaProvider schemaProvider,
CassandraSession cassandraSession,
JsonCodec<List<ExtraColumnMetadata>> extraColumnMetadataCodec,
CassandraClientConfig config)
{
this.connectorId = checkNotNull(connectorId, "connectorId is null").toString();
this.schemaProvider = checkNotNull(schemaProvider, "schemaProvider is null");
this.cassandraSession = checkNotNull(cassandraSession, "cassandraSession is null");
allowDropTable = checkNotNull(config, "config is null").getAllowDropTable();
this.allowDropTable = checkNotNull(config, "config is null").getAllowDropTable();
this.extraColumnMetadataCodec = checkNotNull(extraColumnMetadataCodec, "extraColumnMetadataCodec is null");
}

@Override
Expand Down Expand Up @@ -137,19 +145,13 @@ public ConnectorColumnHandle getColumnHandle(ConnectorTableHandle tableHandle, S
{
checkNotNull(tableHandle, "tableHandle is null");
checkNotNull(columnName, "columnName is null");
return getColumnHandles(tableHandle).get(columnName);
return getColumnHandles(tableHandle, false).get(columnName);
}

@Override
public ConnectorColumnHandle getSampleWeightColumnHandle(ConnectorTableHandle tableHandle)
{
for (ConnectorColumnHandle handle : getColumnHandles(tableHandle, true).values()) {
CassandraColumnHandle columnHandle = (CassandraColumnHandle) handle;
if (columnHandle.getName().equals(SAMPLE_WEIGHT_COLUMN_NAME)) {
return columnHandle;
}
}
return null;
return getColumnHandles(tableHandle, true).get(SAMPLE_WEIGHT_COLUMN_NAME);
}

@Override
Expand Down Expand Up @@ -246,13 +248,12 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con

ImmutableList.Builder<String> columnNames = ImmutableList.builder();
ImmutableList.Builder<Type> columnTypes = ImmutableList.builder();
ImmutableList.Builder<ExtraColumnMetadata> columnExtra = ImmutableList.builder();
columnExtra.add(new ExtraColumnMetadata("id", true));
for (ColumnMetadata column : tableMetadata.getColumns()) {
columnNames.add(column.getName());
columnTypes.add(column.getType());
}
if (tableMetadata.isSampled()) {
columnNames.add(SAMPLE_WEIGHT_COLUMN_NAME);
columnTypes.add(BIGINT);
columnExtra.add(new ExtraColumnMetadata(column.getName(), column.isHidden()));
}

// get the root directory for the database
Expand All @@ -262,17 +263,23 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con
List<String> columns = columnNames.build();
List<Type> types = columnTypes.build();
StringBuilder queryBuilder = new StringBuilder(String.format("CREATE TABLE \"%s\".\"%s\"(id uuid primary key", schemaName, tableName));
if (tableMetadata.isSampled()) {
queryBuilder.append(", ").append(SAMPLE_WEIGHT_COLUMN_NAME).append(" ").append(BIGINT.name().toLowerCase());
columnExtra.add(new ExtraColumnMetadata(SAMPLE_WEIGHT_COLUMN_NAME, true));
}
for (int i = 0; i < columns.size(); i++) {
String name = columns.get(i);
Type type = types.get(i);
if (!name.equals(SAMPLE_WEIGHT_COLUMN_NAME)) {
queryBuilder.append(", ")
.append(name)
.append(" ")
.append(toCassandraType(type).name().toLowerCase());
}
queryBuilder.append(", ")
.append(name)
.append(" ")
.append(toCassandraType(type).name().toLowerCase());
}
queryBuilder.append(")");
queryBuilder.append(") ");

// encode column ordering in the cassandra table comment field since there is no better place to store this
String columnMetadata = extraColumnMetadataCodec.toJson(columnExtra.build());
queryBuilder.append("WITH comment='").append(CassandraSession.PRESTO_COMMENT_METADATA).append(" ").append(columnMetadata).append("'");

// We need to create the Cassandra table before commit because records need to be written to the table.
cassandraSession.executeQuery(queryBuilder.toString());
Expand All @@ -282,6 +289,7 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con
tableName,
columnNames.build(),
columnTypes.build(),
tableMetadata.isSampled(),
tableMetadata.getOwner());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public class CassandraOutputTableHandle
private final String tableName;
private final List<String> columnNames;
private final List<Type> columnTypes;
private final boolean sampled;
private final String tableOwner;

@JsonCreator
Expand All @@ -41,8 +42,10 @@ public CassandraOutputTableHandle(
@JsonProperty("tableName") String tableName,
@JsonProperty("columnNames") List<String> columnNames,
@JsonProperty("columnTypes") List<Type> columnTypes,
@JsonProperty("sampled") boolean sampled,
@JsonProperty("tableOwner") String tableOwner)
{
this.sampled = sampled;
this.connectorId = checkNotNull(connectorId, "clientId is null");
this.schemaName = checkNotNull(schemaName, "schemaName is null");
this.tableName = checkNotNull(tableName, "tableName is null");
Expand All @@ -67,12 +70,6 @@ public String getSchemaName()
return schemaName;
}

@JsonProperty
public String getTableOwner()
{
return tableOwner;
}

@JsonProperty
public String getTableName()
{
Expand All @@ -91,6 +88,18 @@ public List<Type> getColumnTypes()
return columnTypes;
}

/**
 * Returns the {@code sampled} flag passed to this output table handle's
 * constructor. Annotated with {@code @JsonProperty} so the flag is included
 * when the handle is serialized, matching the {@code "sampled"} constructor property.
 *
 * @return the value of the {@code sampled} flag
 */
@JsonProperty
public boolean isSampled()
{
return sampled;
}

/**
 * Returns the table owner held by this output table handle. Annotated with
 * {@code @JsonProperty} so the value is included when the handle is serialized.
 *
 * @return the {@code tableOwner} field
 */
@JsonProperty
public String getTableOwner()
{
return tableOwner;
}

@Override
public String toString()
{
Expand Down
Loading

0 comments on commit 9de9c0e

Please sign in to comment.