Skip to content

Commit

Permalink
[FLINK-16611][metrics][datadog] Send report in chunks
Browse files Browse the repository at this point in the history
  • Loading branch information
swhelan091 committed May 20, 2020
1 parent e991de1 commit df16e42
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 9 deletions.
2 changes: 2 additions & 0 deletions docs/monitoring/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -770,6 +770,7 @@ Parameters:
- `proxyHost` - (optional) The proxy host to use when sending to Datadog.
- `proxyPort` - (optional) The proxy port to use when sending to Datadog, defaults to 8080.
- `dataCenter` - (optional) The data center (`EU`/`US`) to connect to, defaults to `US`.
- `maxMetricsPerRequest` - (optional) The maximum number of metrics to include in each request, defaults to 2000.

Example configuration:

Expand All @@ -781,6 +782,7 @@ metrics.reporter.dghttp.tags: myflinkapp,prod
metrics.reporter.dghttp.proxyHost: my.web.proxy.com
metrics.reporter.dghttp.proxyPort: 8080
metrics.reporter.dhhttp.dataCenter: US
metrics.reporter.dhhttp.maxMetricsPerRequest: 2000

{% endhighlight %}

Expand Down
2 changes: 2 additions & 0 deletions docs/monitoring/metrics.zh.md
Original file line number Diff line number Diff line change
Expand Up @@ -770,6 +770,7 @@ Parameters:
- `proxyHost` - (optional) The proxy host to use when sending to Datadog.
- `proxyPort` - (optional) The proxy port to use when sending to Datadog, defaults to 8080.
- `dataCenter` - (optional) The data center (`EU`/`US`) to connect to, defaults to `US`.
- `maxMetricsPerRequest` - (optional) The maximum number of metrics to include in each request, defaults to 2000.

Example configuration:

Expand All @@ -781,6 +782,7 @@ metrics.reporter.dghttp.tags: myflinkapp,prod
metrics.reporter.dghttp.proxyHost: my.web.proxy.com
metrics.reporter.dghttp.proxyPort: 8080
metrics.reporter.dhhttp.dataCenter: US
metrics.reporter.dhhttp.maxMetricsPerRequest: 2000

{% endhighlight %}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public Number getMetricValue() {
return difference;
}

@Override
public void ackReport() {
lastReportCount = currentReportCount;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,7 @@ public List<List<Number>> getPoints() {

@JsonIgnore
public abstract Number getMetricValue();

public void ackReport() {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ public DSeries() {
series = new ArrayList<>();
}

public DSeries(List<DMetric> series) {
this.series = series;
}

public void addGauge(DGauge gauge) {
series.add(gauge);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public class DatadogHttpReporter implements MetricReporter, Scheduled {

private DatadogHttpClient client;
private List<String> configTags;
private int maxMetricsPerRequestValue;

private final Clock clock = () -> System.currentTimeMillis() / 1000L;

Expand All @@ -64,6 +65,7 @@ public class DatadogHttpReporter implements MetricReporter, Scheduled {
public static final String PROXY_PORT = "proxyPort";
public static final String DATA_CENTER = "dataCenter";
public static final String TAGS = "tags";
public static final String MAX_METRICS_PER_REQUEST = "maxMetricsPerRequest";

@Override
public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
Expand Down Expand Up @@ -113,14 +115,15 @@ public void open(MetricConfig config) {
String proxyHost = config.getString(PROXY_HOST, null);
Integer proxyPort = config.getInteger(PROXY_PORT, 8080);
String rawDataCenter = config.getString(DATA_CENTER, "US");
maxMetricsPerRequestValue = config.getInteger(MAX_METRICS_PER_REQUEST, 2000);
DataCenter dataCenter = DataCenter.valueOf(rawDataCenter);
String tags = config.getString(TAGS, "");

client = new DatadogHttpClient(apiKey, proxyHost, proxyPort, dataCenter, true);

configTags = getTagsFromConfig(tags);

LOGGER.info("Configured DatadogHttpReporter with {tags={}, proxyHost={}, proxyPort={}, dataCenter={}", tags, proxyHost, proxyPort, dataCenter);
LOGGER.info("Configured DatadogHttpReporter with {tags={}, proxyHost={}, proxyPort={}, dataCenter={}, maxMetricsPerRequest={}", tags, proxyHost, proxyPort, dataCenter, maxMetricsPerRequestValue);
}

@Override
Expand All @@ -137,14 +140,21 @@ public void report() {
counters.values().forEach(request::addCounter);
meters.values().forEach(request::addMeter);

try {
client.send(request);
counters.values().forEach(DCounter::ackReport);
LOGGER.debug("Reported series with size {}.", request.getSeries().size());
} catch (SocketTimeoutException e) {
LOGGER.warn("Failed reporting metrics to Datadog because of socket timeout: {}", e.getMessage());
} catch (Exception e) {
LOGGER.warn("Failed reporting metrics to Datadog.", e);
int totalMetrics = request.getSeries().size();
int fromIndex = 0;
while (fromIndex < totalMetrics) {
int toIndex = Math.min(fromIndex + maxMetricsPerRequestValue, totalMetrics);
try {
DSeries chunk = new DSeries(request.getSeries().subList(fromIndex, toIndex));
client.send(chunk);
chunk.getSeries().forEach(DMetric::ackReport);
LOGGER.debug("Reported series with size {}.", chunk.getSeries().size());
} catch (SocketTimeoutException e) {
LOGGER.warn("Failed reporting metrics to Datadog because of socket timeout: {}", e.getMessage());
} catch (Exception e) {
LOGGER.warn("Failed reporting metrics to Datadog.", e);
}
fromIndex = toIndex;
}
}

Expand Down

0 comments on commit df16e42

Please sign in to comment.