Fix SpannerIO service call metrics and improve tests.
Adds service call metrics to NaiveSpannerRead.
Fixes metric resource identifiers.
Refactors and improves SpannerIOReadTest unit tests to add coverage.
Adds unit tests for NaiveSpannerRead (non-partitioned read).
Fixes metrics for SpannerIO.Write.
nielm committed May 30, 2022
1 parent 9a6f769 commit ef68037
Showing 8 changed files with 1,073 additions and 851 deletions.
org/apache/beam/runners/core/metrics/GcpResourceIdentifiers.java
@@ -52,12 +52,17 @@ public static String datastoreResource(String projectId, String namespace) {
"//bigtable.googleapis.com/projects/%s/namespaces/%s", projectId, namespace);
}

public static String spannerTable(String projectId, String databaseId, String tableId) {
public static String spannerTable(
String projectId, String instanceId, String databaseId, String tableId) {
return String.format(
"//spanner.googleapis.com/projects/%s/topics/%s/tables/%s", projectId, databaseId, tableId);
"//spanner.googleapis.com/projects/%s/instances/%s/databases/%s/tables/%s",
projectId, instanceId, databaseId, tableId);
}

public static String spannerQuery(String projectId, String queryName) {
return String.format("//spanner.googleapis.com/projects/%s/queries/%s", projectId, queryName);
public static String spannerQuery(
String projectId, String instanceId, String databaseId, String queryName) {
return String.format(
"//spanner.googleapis.com/projects/%s/instances/%s/databases/%s/queries/%s",
projectId, instanceId, databaseId, queryName);
}
}
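
With the corrected format strings, these helpers now emit fully-qualified Spanner resource names; the old spannerTable format inserted a stray "topics" segment and omitted the instance entirely, so the RESOURCE metric label never matched a real Spanner table. A quick illustration (all identifiers below are hypothetical):

// Illustration only; "my-project", "my-instance", "my-db", "Users" are made-up IDs.
String table = GcpResourceIdentifiers.spannerTable("my-project", "my-instance", "my-db", "Users");
// -> //spanner.googleapis.com/projects/my-project/instances/my-instance/databases/my-db/tables/Users
String query = GcpResourceIdentifiers.spannerQuery("my-project", "my-instance", "my-db", "UsersQuery");
// -> //spanner.googleapis.com/projects/my-project/instances/my-instance/databases/my-db/queries/UsersQuery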
org/apache/beam/sdk/io/gcp/spanner/BatchSpannerRead.java
@@ -20,26 +20,27 @@
import com.google.auto.value.AutoValue;
import com.google.cloud.spanner.BatchReadOnlyTransaction;
import com.google.cloud.spanner.Options;
import com.google.cloud.spanner.Options.RpcPriority;
import com.google.cloud.spanner.Partition;
import com.google.cloud.spanner.ResultSet;
import com.google.cloud.spanner.SpannerException;
import com.google.cloud.spanner.SpannerOptions;
import com.google.cloud.spanner.Struct;
import com.google.cloud.spanner.TimestampBound;
import java.util.HashMap;
import java.io.Serializable;
import java.util.List;
import org.apache.beam.runners.core.metrics.GcpResourceIdentifiers;
import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
import org.apache.beam.runners.core.metrics.ServiceCallMetric;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO.ReadAll;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheLoader;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LoadingCache;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -69,6 +70,21 @@ public static BatchSpannerRead create(

abstract TimestampBound getTimestampBound();

/**
* Container class to combine a ReadOperation with a Partition so that Metrics are implemented
* properly.
*/
@AutoValue
protected abstract static class PartitionedReadOperation implements Serializable {
abstract ReadOperation getReadOperation();

abstract Partition getPartition();

static PartitionedReadOperation create(ReadOperation readOperation, Partition partition) {
return new AutoValue_BatchSpannerRead_PartitionedReadOperation(readOperation, partition);
}
}
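
Pairing each Partition with the ReadOperation that produced it lets the downstream read step recover the right per-operation metric. A minimal usage sketch, using only names visible in this diff (op and partition stand for a ReadOperation and one of its generated partitions):

PartitionedReadOperation pro = PartitionedReadOperation.create(op, partition);
pro.getReadOperation(); // cache key for the per-operation ServiceCallMetric
pro.getPartition();     // what ReadFromPartitionFn executes against the transaction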

@Override
public PCollection<Struct> expand(PCollection<ReadOperation> input) {
PCollectionView<Transaction> txView = getTxView();
@@ -84,14 +100,14 @@ public PCollection<Struct> expand(PCollection<ReadOperation> input) {
.apply(
"Generate Partitions",
ParDo.of(new GeneratePartitionsFn(getSpannerConfig(), txView)).withSideInputs(txView))
.apply("Shuffle partitions", Reshuffle.<Partition>viaRandomKey())
.apply("Shuffle partitions", Reshuffle.viaRandomKey())
.apply(
"Read from Partitions",
ParDo.of(new ReadFromPartitionFn(getSpannerConfig(), txView)).withSideInputs(txView));
}

@VisibleForTesting
static class GeneratePartitionsFn extends DoFn<ReadOperation, Partition> {
static class GeneratePartitionsFn extends DoFn<ReadOperation, PartitionedReadOperation> {

private final SpannerConfig config;
private final PCollectionView<? extends Transaction> txView;
@@ -102,6 +118,8 @@ public GeneratePartitionsFn(
SpannerConfig config, PCollectionView<? extends Transaction> txView) {
this.config = config;
this.txView = txView;
Preconditions.checkNotNull(config.getRpcPriority());
Preconditions.checkNotNull(config.getRpcPriority().get());
}

@Setup
@@ -117,75 +135,62 @@ public void teardown() throws Exception {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
Transaction tx = c.sideInput(txView);
BatchReadOnlyTransaction context =
BatchReadOnlyTransaction batchTx =
spannerAccessor.getBatchClient().batchReadOnlyTransaction(tx.transactionId());
for (Partition p : execute(c.element(), context)) {
c.output(p);
}
}

private List<Partition> execute(ReadOperation op, BatchReadOnlyTransaction tx) {
if (config.getRpcPriority() != null && config.getRpcPriority().get() != null) {
return executeWithPriority(op, tx, config.getRpcPriority().get());
} else {
return executeWithoutPriority(op, tx);
}
}

private List<Partition> executeWithoutPriority(ReadOperation op, BatchReadOnlyTransaction tx) {
// Query was selected.
if (op.getQuery() != null) {
return tx.partitionQuery(op.getPartitionOptions(), op.getQuery());
}
// Read with index was selected.
if (op.getIndex() != null) {
return tx.partitionReadUsingIndex(
op.getPartitionOptions(),
op.getTable(),
op.getIndex(),
op.getKeySet(),
op.getColumns());
}
// Read from table was selected.
return tx.partitionRead(
op.getPartitionOptions(), op.getTable(), op.getKeySet(), op.getColumns());
}

private List<Partition> executeWithPriority(
ReadOperation op, BatchReadOnlyTransaction tx, RpcPriority rpcPriority) {
// Query was selected.
if (op.getQuery() != null) {
return tx.partitionQuery(
op.getPartitionOptions(), op.getQuery(), Options.priority(rpcPriority));
ReadOperation op = c.element();

// While this creates a ServiceCallMetric for every input element, in reality, the number
// of input elements will either be very few (normally 1!), or they will differ and
// need different metrics.
ServiceCallMetric metric = ReadAll.buildServiceCallMetricForReadOp(config, op);

List<Partition> partitions;
try {
if (op.getQuery() != null) {
// Query was selected.
partitions =
batchTx.partitionQuery(
op.getPartitionOptions(),
op.getQuery(),
Options.priority(config.getRpcPriority().get()));
} else if (op.getIndex() != null) {
// Read with index was selected.
partitions =
batchTx.partitionReadUsingIndex(
op.getPartitionOptions(),
op.getTable(),
op.getIndex(),
op.getKeySet(),
op.getColumns(),
Options.priority(config.getRpcPriority().get()));
} else {
// Read from table was selected.
partitions =
batchTx.partitionRead(
op.getPartitionOptions(),
op.getTable(),
op.getKeySet(),
op.getColumns(),
Options.priority(config.getRpcPriority().get()));
}
metric.call("ok");
} catch (SpannerException e) {
metric.call(e.getErrorCode().getGrpcStatusCode().toString());
throw e;
}
// Read with index was selected.
if (op.getIndex() != null) {
return tx.partitionReadUsingIndex(
op.getPartitionOptions(),
op.getTable(),
op.getIndex(),
op.getKeySet(),
op.getColumns(),
Options.priority(rpcPriority));
for (Partition p : partitions) {
c.output(PartitionedReadOperation.create(op, p));
}
// Read from table was selected.
return tx.partitionRead(
op.getPartitionOptions(),
op.getTable(),
op.getKeySet(),
op.getColumns(),
Options.priority(rpcPriority));
}
}

private static class ReadFromPartitionFn extends DoFn<Partition, Struct> {
private static class ReadFromPartitionFn extends DoFn<PartitionedReadOperation, Struct> {

private final SpannerConfig config;
private final PCollectionView<? extends Transaction> txView;

private transient SpannerAccessor spannerAccessor;
private transient String projectId;
private transient ServiceCallMetric serviceCallMetric;
private transient LoadingCache<ReadOperation, ServiceCallMetric> metricsForReadOperation;

public ReadFromPartitionFn(
SpannerConfig config, PCollectionView<? extends Transaction> txView) {
@@ -196,24 +201,28 @@ public ReadFromPartitionFn(
@Setup
public void setup() throws Exception {
spannerAccessor = SpannerAccessor.getOrCreate(config);
projectId =
this.config.getProjectId() == null
|| this.config.getProjectId().get() == null
|| this.config.getProjectId().get().isEmpty()
? SpannerOptions.getDefaultProjectId()
: this.config.getProjectId().get();

// Use a LoadingCache for metrics as there can be different read operations which result in
// different service call metrics labels. ServiceCallMetric items are created on-demand and
// added to the cache.
metricsForReadOperation =
CacheBuilder.newBuilder()
.maximumSize(SpannerIO.METRICS_CACHE_SIZE)
// Bounds the cache size to limit memory use on the worker.
.build(
new CacheLoader<ReadOperation, ServiceCallMetric>() {
@Override
public ServiceCallMetric load(ReadOperation op) {
return ReadAll.buildServiceCallMetricForReadOp(config, op);
}
});
}

@Teardown
public void teardown() throws Exception {
spannerAccessor.close();
}

@StartBundle
public void startBundle() throws Exception {
serviceCallMetric =
createServiceCallMetric(
projectId, this.config.getDatabaseId().get(), this.config.getInstanceId().get());
metricsForReadOperation.invalidateAll();
metricsForReadOperation.cleanUp();
}

@ProcessElement
@@ -223,8 +232,9 @@ public void processElement(ProcessContext c) throws Exception {
BatchReadOnlyTransaction batchTx =
spannerAccessor.getBatchClient().batchReadOnlyTransaction(tx.transactionId());

Partition p = c.element();
try (ResultSet resultSet = batchTx.execute(p)) {
PartitionedReadOperation op = c.element();
ServiceCallMetric serviceCallMetric = metricsForReadOperation.get(op.getReadOperation());
try (ResultSet resultSet = batchTx.execute(op.getPartition())) {
while (resultSet.next()) {
Struct s = resultSet.getCurrentRowAsStruct();
c.output(s);
Expand All @@ -236,22 +246,5 @@ public void processElement(ProcessContext c) throws Exception {
}
serviceCallMetric.call("ok");
}

private ServiceCallMetric createServiceCallMetric(
String projectId, String databaseId, String tableId) {
HashMap<String, String> baseLabels = new HashMap<>();
baseLabels.put(MonitoringInfoConstants.Labels.PTRANSFORM, "");
baseLabels.put(MonitoringInfoConstants.Labels.SERVICE, "Spanner");
baseLabels.put(MonitoringInfoConstants.Labels.METHOD, "Read");
baseLabels.put(
MonitoringInfoConstants.Labels.RESOURCE,
GcpResourceIdentifiers.spannerTable(projectId, databaseId, tableId));
baseLabels.put(MonitoringInfoConstants.Labels.SPANNER_PROJECT_ID, projectId);
baseLabels.put(MonitoringInfoConstants.Labels.SPANNER_DATABASE_ID, databaseId);
baseLabels.put(MonitoringInfoConstants.Labels.SPANNER_INSTANCE_ID, tableId);
ServiceCallMetric serviceCallMetric =
new ServiceCallMetric(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, baseLabels);
return serviceCallMetric;
}
}
}
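
The per-DoFn createServiceCallMetric above is removed in favour of the shared SpannerIO.ReadAll.buildServiceCallMetricForReadOp helper, whose body does not appear in this diff. A plausible sketch of the table-read case, assuming the helper mirrors the removed method with the labels corrected (at the old call site, the tableId parameter actually received the instance ID):

// Sketch only: assumes the shared helper keeps the removed method's shape, with
// projectId, instanceId and databaseId resolved from SpannerConfig.
HashMap<String, String> baseLabels = new HashMap<>();
baseLabels.put(MonitoringInfoConstants.Labels.PTRANSFORM, "");
baseLabels.put(MonitoringInfoConstants.Labels.SERVICE, "Spanner");
baseLabels.put(MonitoringInfoConstants.Labels.METHOD, "Read");
baseLabels.put(MonitoringInfoConstants.Labels.SPANNER_PROJECT_ID, projectId);
baseLabels.put(MonitoringInfoConstants.Labels.SPANNER_INSTANCE_ID, instanceId);
baseLabels.put(MonitoringInfoConstants.Labels.SPANNER_DATABASE_ID, databaseId);
baseLabels.put(
    MonitoringInfoConstants.Labels.RESOURCE,
    GcpResourceIdentifiers.spannerTable(projectId, instanceId, databaseId, op.getTable()));
ServiceCallMetric metric =
    new ServiceCallMetric(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, baseLabels);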
org/apache/beam/sdk/io/gcp/spanner/NaiveSpannerRead.java
@@ -22,8 +22,10 @@
import com.google.cloud.spanner.Options;
import com.google.cloud.spanner.Options.RpcPriority;
import com.google.cloud.spanner.ResultSet;
import com.google.cloud.spanner.SpannerException;
import com.google.cloud.spanner.Struct;
import com.google.cloud.spanner.TimestampBound;
import org.apache.beam.runners.core.metrics.ServiceCallMetric;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
@@ -97,37 +99,26 @@ public void teardown() throws Exception {
public void processElement(ProcessContext c) throws Exception {
Transaction tx = c.sideInput(txView);
ReadOperation op = c.element();
ServiceCallMetric serviceCallMetric =
SpannerIO.ReadAll.buildServiceCallMetricForReadOp(config, op);
BatchReadOnlyTransaction context =
spannerAccessor.getBatchClient().batchReadOnlyTransaction(tx.transactionId());
try (ResultSet resultSet = execute(op, context)) {
while (resultSet.next()) {
c.output(resultSet.getCurrentRowAsStruct());
}
} catch (SpannerException e) {
serviceCallMetric.call(e.getErrorCode().getGrpcStatusCode().toString());
throw (e);
}
serviceCallMetric.call("ok");
}

private ResultSet execute(ReadOperation op, BatchReadOnlyTransaction readOnlyTransaction) {
RpcPriority rpcPriority = SpannerConfig.DEFAULT_RPC_PRIORITY;
if (config.getRpcPriority() != null && config.getRpcPriority().get() != null) {
return executeWithPriority(op, readOnlyTransaction, config.getRpcPriority().get());
} else {
return executeWithoutPriority(op, readOnlyTransaction);
rpcPriority = config.getRpcPriority().get();
}
}

private ResultSet executeWithoutPriority(
ReadOperation op, BatchReadOnlyTransaction readOnlyTransaction) {
if (op.getQuery() != null) {
return readOnlyTransaction.executeQuery(op.getQuery());
}
if (op.getIndex() != null) {
return readOnlyTransaction.readUsingIndex(
op.getTable(), op.getIndex(), op.getKeySet(), op.getColumns());
}
return readOnlyTransaction.read(op.getTable(), op.getKeySet(), op.getColumns());
}

private ResultSet executeWithPriority(
ReadOperation op, BatchReadOnlyTransaction readOnlyTransaction, RpcPriority rpcPriority) {
if (op.getQuery() != null) {
return readOnlyTransaction.executeQuery(op.getQuery(), Options.priority(rpcPriority));
}
[Diff truncated: the remainder of this file and the five remaining changed files did not load.]
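
From the removed executeWithPriority/executeWithoutPriority pair and the rpcPriority default added above, the consolidated execute presumably passes the resolved priority on every path. A sketch under that assumption (Options.priority(...) is accepted as a read/query option by all three call forms):

if (op.getQuery() != null) {
  return readOnlyTransaction.executeQuery(op.getQuery(), Options.priority(rpcPriority));
}
if (op.getIndex() != null) {
  return readOnlyTransaction.readUsingIndex(
      op.getTable(), op.getIndex(), op.getKeySet(), op.getColumns(),
      Options.priority(rpcPriority));
}
return readOnlyTransaction.read(
    op.getTable(), op.getKeySet(), op.getColumns(), Options.priority(rpcPriority));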
