Skip to content

Commit

Permalink
[BEAM-7667] report GCS throttling time to Dataflow autoscaler
Browse files Browse the repository at this point in the history
  • Loading branch information
ihji committed Aug 5, 2019
1 parent f7cbf88 commit 04119cc
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,11 @@ public class BatchModeExecutionContext

private final MetricsContainerRegistry<MetricsContainerImpl> containerRegistry;

// TODO: Move throttle time Metric to a dedicated namespace.
// TODO(BEAM-7863): Move throttle time Metric to a dedicated namespace.
protected static final String DATASTORE_THROTTLE_TIME_NAMESPACE =
"org.apache.beam.sdk.io.gcp.datastore.DatastoreV1$DatastoreWriterFn";
protected static final String HTTP_CLIENT_API_THROTTLE_TIME_NAMESPACE =
"org.apache.beam.sdk.extensions.gcp.util.RetryHttpRequestInitializer$LoggingHttpBackOffHandler";

private BatchModeExecutionContext(
CounterFactory counterFactory,
Expand Down Expand Up @@ -498,15 +500,25 @@ public Iterable<CounterUpdate> extractMsecCounters(boolean isFinalUpdate) {
public Long extractThrottleTime() {
Long totalThrottleTime = 0L;
for (MetricsContainerImpl container : containerRegistry.getContainers()) {
// TODO: Update Datastore to use generic throttling-msecs metric.
CounterCell throttleTime =
// TODO(BEAM-7863): Update throttling counters to use generic throttling-msecs metric.
CounterCell dataStoreThrottlingTime =
container.tryGetCounter(
MetricName.named(
BatchModeExecutionContext.DATASTORE_THROTTLE_TIME_NAMESPACE,
"cumulativeThrottlingSeconds"));
if (throttleTime != null) {
totalThrottleTime += throttleTime.getCumulative();
if (dataStoreThrottlingTime != null) {
totalThrottleTime += dataStoreThrottlingTime.getCumulative();
}

CounterCell httpClientApiThrottlingTime =
container.tryGetCounter(
MetricName.named(
BatchModeExecutionContext.HTTP_CLIENT_API_THROTTLE_TIME_NAMESPACE,
"cumulativeThrottlingSeconds"));
if (httpClientApiThrottlingTime != null) {
totalThrottleTime += httpClientApiThrottlingTime.getCumulative();
}

CounterCell throttlingMsecs =
container.tryGetCounter(DataflowSystemMetrics.THROTTLING_MSECS_METRIC_NAME);
if (throttlingMsecs != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
*/
package org.apache.beam.sdk.extensions.gcp.util;

import static com.google.api.client.util.BackOffUtils.next;

import com.google.api.client.http.HttpIOExceptionHandler;
import com.google.api.client.http.HttpRequest;
import com.google.api.client.http.HttpRequestInitializer;
Expand All @@ -36,6 +34,8 @@
import java.util.HashSet;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -68,6 +68,9 @@ private static class LoggingHttpBackOffHandler
private final BackOff ioExceptionBackOff;
private final BackOff unsuccessfulResponseBackOff;
private final Set<Integer> ignoredResponseCodes;
// aggregate the total time spent in exponential backoff
private final Counter throttlingSeconds =
Metrics.counter(LoggingHttpBackOffHandler.class, "cumulativeThrottlingSeconds");
private int ioExceptionRetries;
private int unsuccessfulResponseRetries;
@Nullable private CustomHttpErrors customHttpErrors;
Expand Down Expand Up @@ -172,7 +175,13 @@ && retryOnStatusCode(response.getStatusCode())
/** Returns true iff performing the backoff was successful. */
private boolean backOffWasSuccessful(BackOff backOff) {
try {
return next(sleeper, backOff);
long backOffTime = backOff.nextBackOffMillis();
if (backOffTime == BackOff.STOP) {
return false;
}
throttlingSeconds.inc(backOffTime / 1000);
sleeper.sleep(backOffTime);
return true;
} catch (InterruptedException | IOException e) {
return false;
}
Expand Down
51 changes: 51 additions & 0 deletions sdks/python/apache_beam/io/gcp/gcsio_overrides.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from __future__ import absolute_import

import logging
import math
import time

from apache_beam.metrics.metric import Metrics
from apitools.base.py import exceptions
from apitools.base.py import http_wrapper
from apitools.base.py import util


class GcsIOOverrides(object):
  """Functions for overriding Google Cloud Storage I/O client."""

  # Counter that accumulates the seconds slept before retrying a request.
  _THROTTLED_SECS = Metrics.counter('StorageV1', "cumulativeThrottlingSeconds")

  @classmethod
  def retry_func(cls, retry_args):
    """Retry handler that records the time spent waiting on GCS quota errors.

    A ``BadStatusCodeError`` whose status code is
    ``http_wrapper.TOO_MANY_REQUESTS`` is handled here: the HTTP connections
    are rebuilt, the wait computed by ``util.CalculateWaitForRetry`` is added
    to the throttling counter (rounded up to whole seconds) and the call
    sleeps for that long. Every other exception is delegated to
    ``http_wrapper.HandleExceptionsAndRebuildHttpConnections``.

    Args:
      retry_args: retry state; this function reads its ``exc``, ``http``,
        ``http_request``, ``num_retries`` and ``max_retry_wait`` attributes.

    Returns:
      The result of the delegated handler for exceptions that are not quota
      errors, otherwise None.
    """
    # handling GCS download throttling errors (BEAM-7424)
    is_quota_error = (
        isinstance(retry_args.exc, exceptions.BadStatusCodeError) and
        retry_args.exc.status_code == http_wrapper.TOO_MANY_REQUESTS)
    if not is_quota_error:
      return http_wrapper.HandleExceptionsAndRebuildHttpConnections(retry_args)

    logging.debug(
        'Caught GCS quota error (%s), retrying.', retry_args.exc.status_code)
    http_wrapper.RebuildHttpConnections(retry_args.http)
    logging.debug('Retrying request to url %s after exception %s',
                  retry_args.http_request.url, retry_args.exc)
    sleep_seconds = util.CalculateWaitForRetry(
        retry_args.num_retries, max_wait=retry_args.max_retry_wait)
    # math.ceil returns a float on Python 2; convert so the counter is always
    # incremented by an integer number of seconds.
    cls._THROTTLED_SECS.inc(int(math.ceil(sleep_seconds)))
    time.sleep(sleep_seconds)
10 changes: 10 additions & 0 deletions sdks/python/apache_beam/io/gcp/gcsio_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,16 @@ def setUp(self):
self.client = FakeGcsClient()
self.gcs = gcsio.GcsIO(self.client)

def test_num_retries(self):
  """The client of a default-constructed GcsIO is expected to use 20 retries."""
  # BEAM-7424: update num_retries accordingly if storage_client is
  # regenerated.
  storage_client = gcsio.GcsIO().client
  self.assertEqual(storage_client.num_retries, 20)

def test_retry_func(self):
  """The client of a default-constructed GcsIO must have a retry_func set."""
  # BEAM-7667: update retry_func accordingly if storage_client is
  # regenerated.
  storage_client = gcsio.GcsIO().client
  self.assertIsNotNone(storage_client.retry_func)

def test_exists(self):
file_name = 'gs:https://gcsio-test/dummy_file'
file_size = 1234
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

from apitools.base.py import base_api

from apache_beam.io.gcp.gcsio_overrides import GcsIOOverrides
from apache_beam.io.gcp.internal.clients.storage import \
storage_v1_messages as messages

Expand Down Expand Up @@ -56,6 +57,7 @@ def __init__(self, url='', credentials=None,
credentials_args=credentials_args,
default_global_params=default_global_params,
additional_http_headers=additional_http_headers,
retry_func=GcsIOOverrides.retry_func,
response_encoding=response_encoding)
self.bucketAccessControls = self.BucketAccessControlsService(self)
self.buckets = self.BucketsService(self)
Expand Down

0 comments on commit 04119cc

Please sign in to comment.