Skip to content

Commit

Permalink
Add insert support to Raptor connector
Browse files Browse the repository at this point in the history
  • Loading branch information
electrum committed Aug 8, 2014
1 parent c6a56ae commit cfd7f70
Show file tree
Hide file tree
Showing 9 changed files with 205 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -106,4 +106,11 @@ public void testViewMetadata()
{
// Cassandra connector currently does not support views
}

@Override
public void testInsert()
throws Exception
{
// Cassandra connector currently does not support insert
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,4 +112,11 @@ private static ConnectorSession createSession(String schema)
{
return new ConnectorSession("user", "test", "hive", schema, UTC_KEY, ENGLISH, null, null);
}

@Override
public void testInsert()
throws Exception
{
// Hive connector currently does not support insert
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ public boolean canHandle(ConnectorOutputTableHandle tableHandle)
@Override
public boolean canHandle(ConnectorInsertTableHandle tableHandle)
{
return false;
return (tableHandle instanceof RaptorInsertTableHandle) &&
((RaptorInsertTableHandle) tableHandle).getConnectorId().equals(connectorId);
}

@Override
Expand Down Expand Up @@ -107,6 +108,6 @@ public Class<? extends ConnectorIndexHandle> getIndexHandleClass()
@Override
public Class<? extends ConnectorInsertTableHandle> getInsertTableHandleClass()
{
throw new UnsupportedOperationException();
return RaptorInsertTableHandle.class;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.raptor;

import com.facebook.presto.spi.ConnectorInsertTableHandle;
import com.facebook.presto.spi.type.Type;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;

import java.util.List;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;

public class RaptorInsertTableHandle
implements ConnectorInsertTableHandle
{
private final String connectorId;
private final long tableId;
private final List<RaptorColumnHandle> columnHandles;
private final List<Type> columnTypes;

@JsonCreator
public RaptorInsertTableHandle(
@JsonProperty("connectorId") String connectorId,
@JsonProperty("tableId") long tableId,
@JsonProperty("columnHandles") List<RaptorColumnHandle> columnHandles,
@JsonProperty("columnTypes") List<Type> columnTypes)
{
checkArgument(tableId > 0, "tableId must be greater than zero");

this.connectorId = checkNotNull(connectorId, "connectorId is null");
this.tableId = tableId;
this.columnHandles = ImmutableList.copyOf(checkNotNull(columnHandles, "columnHandles is null"));
this.columnTypes = ImmutableList.copyOf(checkNotNull(columnTypes, "columnTypes is null"));
}

@JsonProperty
public String getConnectorId()
{
return connectorId;
}

@JsonProperty
public long getTableId()
{
return tableId;
}

@JsonProperty
public List<RaptorColumnHandle> getColumnHandles()
{
return columnHandles;
}

@JsonProperty
public List<Type> getColumnTypes()
{
return columnTypes;
}

@Override
public String toString()
{
return connectorId + ":" + tableId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import static com.facebook.presto.raptor.metadata.MetadataDaoUtils.createMetadataTablesWithRetry;
import static com.facebook.presto.raptor.metadata.SqlUtils.runIgnoringConstraintViolation;
import static com.facebook.presto.spi.StandardErrorCode.ALREADY_EXISTS;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static com.facebook.presto.spi.type.BigintType.BIGINT;
import static com.facebook.presto.util.Types.checkType;
import static com.google.common.base.Preconditions.checkArgument;
Expand Down Expand Up @@ -359,27 +360,33 @@ public Long inTransaction(Handle dbiHandle, TransactionStatus status)
}
});

ImmutableMap.Builder<UUID, String> shards = ImmutableMap.builder();
for (String fragment : fragments) {
Iterator<String> split = Splitter.on(':').split(fragment).iterator();
String nodeId = split.next();
UUID shardUuid = UUID.fromString(split.next());
shards.put(shardUuid, nodeId);
}

shardManager.commitUnpartitionedTable(tableId, shards.build());
shardManager.commitUnpartitionedTable(tableId, parseFragments(fragments));
}

@Override
public ConnectorInsertTableHandle beginInsert(ConnectorSession session, ConnectorTableHandle tableHandle)
{
throw new UnsupportedOperationException();
if (!shardManager.getAllPartitionKeys(tableHandle).isEmpty()) {
throw new PrestoException(NOT_SUPPORTED.toErrorCode(), "Inserting into partitioned tables is yet not supported");
}

long tableId = checkType(tableHandle, RaptorTableHandle.class, "tableHandle").getTableId();

ImmutableList.Builder<RaptorColumnHandle> columnHandles = ImmutableList.builder();
ImmutableList.Builder<Type> columnTypes = ImmutableList.builder();
for (TableColumn column : dao.getTableColumns(tableId)) {
columnHandles.add(new RaptorColumnHandle(connectorId, column.getColumnName(), column.getColumnId(), column.getDataType()));
columnTypes.add(column.getDataType());
}

return new RaptorInsertTableHandle(connectorId, tableId, columnHandles.build(), columnTypes.build());
}

@Override
public void commitInsert(ConnectorInsertTableHandle insertHandle, Collection<String> fragments)
{
throw new UnsupportedOperationException();
long tableId = checkType(insertHandle, RaptorInsertTableHandle.class, "insertHandle").getTableId();
shardManager.commitUnpartitionedTable(tableId, parseFragments(fragments));
}

@Override
Expand Down Expand Up @@ -449,6 +456,18 @@ private RaptorColumnHandle getRaptorColumnHandle(TableColumn tableColumn)
return new RaptorColumnHandle(connectorId, tableColumn.getColumnName(), tableColumn.getColumnId(), tableColumn.getDataType());
}

private static Map<UUID, String> parseFragments(Collection<String> fragments)
{
ImmutableMap.Builder<UUID, String> shards = ImmutableMap.builder();
for (String fragment : fragments) {
Iterator<String> split = Splitter.on(':').split(fragment).iterator();
String nodeId = split.next();
UUID shardUuid = UUID.fromString(split.next());
shards.put(shardUuid, nodeId);
}
return shards.build();
}

private static Predicate<ColumnMetadata> isSampleWeightColumn()
{
return new Predicate<ColumnMetadata>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,11 @@ public RecordSink getRecordSink(ConnectorOutputTableHandle tableHandle)
@Override
public RecordSink getRecordSink(ConnectorInsertTableHandle tableHandle)
{
throw new UnsupportedOperationException();
RaptorInsertTableHandle handle = checkType(tableHandle, RaptorInsertTableHandle.class, "tableHandle");

ColumnFileHandle fileHandle = createStagingFileHandle(handle.getColumnHandles());

return new RaptorRecordSink(nodeId, fileHandle, storageManager, handle.getColumnTypes(), null);
}

private ColumnFileHandle createStagingFileHandle(List<RaptorColumnHandle> columnHandles)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,15 @@

import com.facebook.presto.raptor.RaptorTableHandle;
import com.facebook.presto.spi.ConnectorTableHandle;
import com.facebook.presto.spi.PrestoException;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
import com.google.common.collect.Ordering;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.TransactionStatus;
Expand All @@ -29,6 +32,9 @@
import javax.annotation.Nullable;
import javax.inject.Inject;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
Expand All @@ -38,6 +44,7 @@
import static com.facebook.presto.raptor.metadata.PartitionKey.partitionNameGetter;
import static com.facebook.presto.raptor.metadata.ShardManagerDaoUtils.createShardTablesWithRetry;
import static com.facebook.presto.raptor.metadata.SqlUtils.runIgnoringConstraintViolation;
import static com.facebook.presto.spi.StandardErrorCode.INTERNAL_ERROR;
import static com.facebook.presto.util.Types.checkType;
import static com.google.common.collect.Maps.immutableEntry;

Expand All @@ -59,20 +66,16 @@ public DatabaseShardManager(@ForShardManager IDBI dbi)
}

@Override
public void commitPartition(final long tableId, final String partition, final List<PartitionKey> partitionKeys, final Map<UUID, String> shards)
public void commitPartition(final long tableId, String partition, List<PartitionKey> partitionKeys, final Map<UUID, String> shards)
{
final long partitionId = getOrCreatePartitionId(tableId, partition, partitionKeys);

dbi.inTransaction(new VoidTransactionCallback()
{
@Override
protected void execute(Handle handle, TransactionStatus status)
{
ShardManagerDao dao = handle.attach(ShardManagerDao.class);
long partitionId = dao.insertPartition(tableId, partition);

for (PartitionKey partitionKey : partitionKeys) {
dao.insertPartitionKey(tableId, partition, partitionKey.getName(), partitionKey.getType().getName(), partitionKey.getValue());
}

for (Map.Entry<UUID, String> entry : shards.entrySet()) {
long nodeId = getOrCreateNodeId(entry.getValue());
UUID shardUuid = entry.getKey();
Expand Down Expand Up @@ -206,8 +209,49 @@ public void run()

id = dao.getNodeId(nodeIdentifier);
if (id == null) {
throw new IllegalStateException("node does not exist after insert");
throw new PrestoException(INTERNAL_ERROR.toErrorCode(), "node does not exist after insert");
}
return id;
}

private long getOrCreatePartitionId(final long tableId, final String partition, final List<PartitionKey> partitionKeys)
{
Long id = dao.getPartitionId(tableId, partition);
if (id != null) {
return id;
}

// creating a partition is idempotent
runIgnoringConstraintViolation(new Runnable()
{
@Override
public void run()
{
dao.insertPartition(tableId, partition);

// use consistent insertion ordering to avoid deadlocks
for (PartitionKey key : sorted(partitionKeys, partitionNameGetter())) {
dao.insertPartitionKey(
tableId,
partition,
key.getName(),
key.getType().getName(),
key.getValue());
}
}
});

id = dao.getPartitionId(tableId, partition);
if (id == null) {
throw new PrestoException(INTERNAL_ERROR.toErrorCode(), "partition does not exist after insert");
}
return id;
}

private static <F, T extends Comparable<T>> List<F> sorted(Collection<F> collection, Function<F, T> function)
{
List<F> list = new ArrayList<>(collection);
Collections.sort(list, Ordering.natural().onResultOf(function));
return list;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,12 @@ void insertPartitionShard(
@SqlQuery("SELECT node_id FROM nodes WHERE node_identifier = :nodeIdentifier")
Long getNodeId(@Bind("nodeIdentifier") String nodeIdentifier);

@SqlQuery("SELECT partition_id\n" +
"FROM table_partitions\n" +
"WHERE table_id = :tableId\n" +
" AND partition_name = :partitionName")
Long getPartitionId(@Bind("tableId") long tableId, @Bind("partitionName") String partitionName);

@SqlQuery("SELECT partition_name, key_name, key_type, key_value\n" +
" FROM partition_keys\n" +
" WHERE table_id = :tableId")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,22 @@ public void testRenameTable()
assertFalse(queryRunner.tableExists(getSession(), "test_rename_new"));
}

@Test
public void testInsert()
throws Exception
{
@Language("SQL") String query = "SELECT orderdate, orderkey FROM orders";

assertQuery("CREATE TABLE test_insert AS " + query, "SELECT count(*) FROM orders");
assertQuery("SELECT * FROM test_insert", query);

assertQuery("INSERT INTO test_insert " + query, "SELECT count(*) FROM orders");

assertQuery("SELECT * FROM test_insert", query + " UNION ALL " + query);

assertQueryTrue("DROP TABLE test_insert");
}

@Test
public void testView()
throws Exception
Expand Down

0 comments on commit cfd7f70

Please sign in to comment.