Skip to content

Commit

Permalink
Use Cassandra.Client in a thread safe manner
Browse files Browse the repository at this point in the history
  • Loading branch information
dain committed Jun 26, 2014
1 parent 4126867 commit 66f78c4
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import com.google.inject.Provides;
import com.google.inject.Scopes;
import io.airlift.json.JsonCodec;
import org.apache.cassandra.thrift.Cassandra;

import javax.inject.Singleton;

Expand Down Expand Up @@ -57,6 +56,8 @@ public void configure(Binder binder)

bindConfig(binder).to(CassandraClientConfig.class);

binder.bind(CassandraThriftConnectionFactory.class).in(Scopes.SINGLETON);

binder.bind(CachingCassandraSchemaProvider.class).in(Scopes.SINGLETON);
newExporter(binder).export(CachingCassandraSchemaProvider.class).as(generatedNameOf(CachingCassandraSchemaProvider.class, connectorId));

Expand Down Expand Up @@ -86,12 +87,4 @@ public static CassandraSession createCassandraSession(
CassandraSessionFactory factory = new CassandraSessionFactory(connectorId, config, extraColumnMetadataCodec);
return factory.create();
}

@Singleton
@Provides
public static Cassandra.Client createCassandraThriftConnection(CassandraClientConfig config)
{
CassandraThriftConnectionFactory factory = new CassandraThriftConnectionFactory(config);
return factory.create();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.cassandra;

import com.google.common.collect.Lists;
import org.apache.cassandra.thrift.Cassandra.Client;
import org.apache.cassandra.thrift.CfSplit;
import org.apache.cassandra.thrift.TokenRange;
import org.apache.thrift.TApplicationException;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TTransport;

import java.util.List;

/**
 * Helper for the Cassandra Thrift calls used during split computation.
 * <p>
 * Each public operation asks the {@link CassandraThriftConnectionFactory} for
 * its own {@code Client}, uses it for that single operation and then closes the
 * client's transport, so no {@code Client} instance is held in a field or
 * reused across calls by this class.
 */
public class CassandraThriftClient
{
    private final CassandraThriftConnectionFactory connectionFactory;

    /**
     * @param connectionFactory factory used to obtain a {@code Client} for each operation
     */
    public CassandraThriftClient(CassandraThriftConnectionFactory connectionFactory)
    {
        this.connectionFactory = connectionFactory;
    }

    /**
     * Returns the result of {@code describe_ring} for the given keyspace.
     *
     * @param keyspace keyspace whose token ranges are requested
     * @return the token ranges reported by the server
     * @throws RuntimeException wrapping any {@link TException} raised by the Thrift call
     */
    public List<TokenRange> getRangeMap(String keyspace)
    {
        Client client = connectionFactory.create();
        try {
            return client.describe_ring(keyspace);
        }
        catch (TException e) {
            throw new RuntimeException(e);
        }
        finally {
            closeQuietly(client);
        }
    }

    /**
     * Asks the server to divide a token range of a column family into sub-splits.
     * <p>
     * {@code describe_splits_ex} is tried first; if the server answers with
     * {@link TApplicationException#UNKNOWN_METHOD}, {@code describe_splits} is used
     * instead and every resulting split is given {@code splitSize} as its row count.
     *
     * @param keyspace keyspace containing the column family
     * @param columnFamily column family to split
     * @param range token range to divide
     * @param splitSize requested number of keys per split
     * @return the sub-splits covering {@code range}
     * @throws RuntimeException wrapping any {@link TException} raised by the Thrift calls
     */
    public List<CfSplit> getSubSplits(String keyspace, String columnFamily, TokenRange range, int splitSize)
    {
        Client client = connectionFactory.create();
        try {
            client.set_keyspace(keyspace);
            try {
                return client.describe_splits_ex(columnFamily, range.start_token, range.end_token, splitSize);
            }
            catch (TApplicationException e) {
                // fallback to guessing split size if talking to a server without describe_splits_ex method
                if (e.getType() == TApplicationException.UNKNOWN_METHOD) {
                    List<String> splitPoints = client.describe_splits(columnFamily, range.start_token, range.end_token, splitSize);
                    return tokenListToSplits(splitPoints, splitSize);
                }
                throw e;
            }
        }
        catch (TException e) {
            throw new RuntimeException(e);
        }
        finally {
            closeQuietly(client);
        }
    }

    /**
     * Converts a list of boundary tokens into splits, pairing each token with its
     * successor. A list with fewer than two tokens produces an empty result.
     *
     * @param splitTokens ordered boundary tokens
     * @param splitSize row count recorded on every produced split
     * @return one split per adjacent token pair
     */
    private static List<CfSplit> tokenListToSplits(List<String> splitTokens, int splitSize)
    {
        // N tokens delimit N - 1 splits; clamp at zero because an empty token list would
        // otherwise give -1, which Lists.newArrayListWithExpectedSize rejects with an
        // IllegalArgumentException
        int splitCount = Math.max(0, splitTokens.size() - 1);
        List<CfSplit> splits = Lists.newArrayListWithExpectedSize(splitCount);
        for (int index = 0; index < splitCount; index++) {
            splits.add(new CfSplit(splitTokens.get(index), splitTokens.get(index + 1), splitSize));
        }
        return splits;
    }

    /**
     * Closes the transport behind the client's input protocol, if there is one.
     * Any exception raised while doing so is discarded.
     *
     * @param client client whose transport should be closed
     */
    public static void closeQuietly(Client client)
    {
        try {
            TProtocol inputProtocol = client.getInputProtocol();
            if (inputProtocol == null) {
                return;
            }
            TTransport transport = inputProtocol.getTransport();
            if (transport == null) {
                return;
            }
            transport.close();
        }
        catch (Exception ignored) {
            // best-effort cleanup: a failure to close must not mask the caller's result or exception
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,9 @@
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.CfSplit;
import org.apache.cassandra.thrift.TokenRange;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.thrift.TApplicationException;
import org.apache.thrift.TException;

import java.io.IOException;
import java.util.ArrayList;
Expand All @@ -42,17 +39,17 @@

public class CassandraTokenSplitManager
{
private final Cassandra.Client client;
private final CassandraThriftClient cassandraThriftClient;
private final ExecutorService executor;
private final int splitSize;
private final IPartitioner<?> partitioner;

@Inject
public CassandraTokenSplitManager(Cassandra.Client client, @ForCassandra ExecutorService executor, CassandraClientConfig config)
public CassandraTokenSplitManager(CassandraThriftConnectionFactory connectionFactory, @ForCassandra ExecutorService executor, CassandraClientConfig config)
{
this.client = checkNotNull(client, "client is null");
this.cassandraThriftClient = new CassandraThriftClient(checkNotNull(connectionFactory, "connectionFactory is null"));
this.executor = checkNotNull(executor, "executor is null");
this.splitSize = checkNotNull(config, "config is null").getSplitSize();
this.splitSize = config.getSplitSize();
try {
this.partitioner = FBUtilities.newPartitioner(config.getPartitioner());
}
Expand All @@ -64,14 +61,14 @@ public CassandraTokenSplitManager(Cassandra.Client client, @ForCassandra Executo
public List<TokenSplit> getSplits(String keyspace, String columnFamily)
throws IOException
{
List<TokenRange> masterRangeNodes = getRangeMap(keyspace, client);
List<TokenRange> masterRangeNodes = cassandraThriftClient.getRangeMap(keyspace);

// canonical ranges, split into pieces, fetching the splits in parallel
List<TokenSplit> splits = new ArrayList<>();
List<Future<List<TokenSplit>>> splitFutures = new ArrayList<>();
for (TokenRange range : masterRangeNodes) {
// for each range, pick a live owner and ask it to compute bite-sized splits
splitFutures.add(executor.submit(new SplitCallable<>(range, keyspace, columnFamily, splitSize, client, partitioner)));
splitFutures.add(executor.submit(new SplitCallable<>(range, keyspace, columnFamily, splitSize, cassandraThriftClient, partitioner)));
}

// wait until we have all the results back
Expand All @@ -85,52 +82,11 @@ public List<TokenSplit> getSplits(String keyspace, String columnFamily)
}

checkState(!splits.isEmpty(), "No splits created");
//noinspection SharedThreadLocalRandom
Collections.shuffle(splits, ThreadLocalRandom.current());
return splits;
}

private static List<TokenRange> getRangeMap(String keyspace, Cassandra.Client client)
throws IOException
{
try {
return client.describe_ring(keyspace);
}
catch (TException e) {
throw new RuntimeException(e);
}
}

private static List<CfSplit> getSubSplits(String keyspace, String columnFamily, TokenRange range, int splitSize, Cassandra.Client client)
throws IOException
{
try {
client.set_keyspace(keyspace);
try {
return client.describe_splits_ex(columnFamily, range.start_token, range.end_token, splitSize);
}
catch (TApplicationException e) {
// fallback to guessing split size if talking to a server without describe_splits_ex method
if (e.getType() == TApplicationException.UNKNOWN_METHOD) {
List<String> splitPoints = client.describe_splits(columnFamily, range.start_token, range.end_token, splitSize);
return tokenListToSplits(splitPoints, splitSize);
}
throw e;
}
}
catch (TException e) {
throw new RuntimeException(e);
}
}

private static List<CfSplit> tokenListToSplits(List<String> splitTokens, int splitSize)
{
ImmutableList.Builder<CfSplit> splits = ImmutableList.builder();
for (int index = 0; index < splitTokens.size() - 1; index++) {
splits.add(new CfSplit(splitTokens.get(index), splitTokens.get(index + 1), splitSize));
}
return splits.build();
}

/**
* Gets a token range and splits it up according to the suggested
* size into input splits that Hadoop can use.
Expand All @@ -142,10 +98,10 @@ private class SplitCallable<T extends Token<?>>
private final String keyspace;
private final String columnFamily;
private final int splitSize;
private final Cassandra.Client client;
private final CassandraThriftClient client;
private final IPartitioner<T> partitioner;

public SplitCallable(TokenRange range, String keyspace, String columnFamily, int splitSize, Cassandra.Client client, IPartitioner<T> partitioner)
public SplitCallable(TokenRange range, String keyspace, String columnFamily, int splitSize, CassandraThriftClient client, IPartitioner<T> partitioner)
{
checkArgument(range.rpc_endpoints.size() == range.endpoints.size(), "rpc_endpoints size must match endpoints size");
this.range = range;
Expand All @@ -161,7 +117,7 @@ public List<TokenSplit> call()
throws Exception
{
ArrayList<TokenSplit> splits = new ArrayList<>();
List<CfSplit> subSplits = getSubSplits(keyspace, columnFamily, range, splitSize, client);
List<CfSplit> subSplits = client.getSubSplits(keyspace, columnFamily, range, splitSize);

// turn the sub-ranges into InputSplits
List<String> endpoints = range.endpoints;
Expand Down

0 comments on commit 66f78c4

Please sign in to comment.