Skip to content

Commit

Permalink
[FLINK-26126][metrics] Introduce new counter metrics for sending reco…
Browse files Browse the repository at this point in the history
…rds by SinkWriter.

We found that the new sink v2 interface reports a wrong numRecordsOut metric for the sink writers: we send a fixed number of records to the source, but the numRecordsOut of the sink keeps increasing over time.

The problem is that both the SinkWriterOperator and the KafkaWriter use the same counter metric for counting the outgoing records. The same records that are sent by the SinkWriterOperator to the post topology and written by the KafkaWriter to the downstream system are counted twice in the same counter metric.
  • Loading branch information
JingGe authored and fapaul committed Mar 7, 2022
1 parent fb05fab commit 91c2b8d
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,20 @@ public interface SinkWriterMetricGroup extends OperatorMetricGroup {
/** The total number of records failed to send. */
Counter getNumRecordsOutErrorsCounter();

/**
* The total number of records that have been sent to the downstream system.
*
* <p>Note: this counter will count all records the SinkWriter sent. From the SinkWriter's
* perspective, these records have been sent to the downstream system, but the downstream system
* may have issues performing the persistence action within its scope. Therefore, this count may
* include the number of records that failed to be written by the downstream system, which should
* be counted by {@link #getNumRecordsOutErrorsCounter()}.
*/
Counter getNumRecordsSendCounter();

/** The total number of bytes sent since the task started. */
Counter getNumBytesSendCounter();

/**
* Sets an optional gauge for the time it takes to send the last record.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ public static String currentInputWatermarkName(int index) {
// FLIP-33 sink
public static final String NUM_RECORDS_OUT_ERRORS = "numRecordsOutErrors";
public static final String CURRENT_SEND_TIME = "currentSendTime";
public static final String NUM_RECORDS_SEND = "numRecordsSend";
public static final String NUM_BYTES_SEND = "numBytesSend";

// FLIP-33 source
public static final String NUM_RECORDS_IN_ERRORS = "numRecordsInErrors";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,16 @@ public class InternalSinkWriterMetricGroup extends ProxyMetricGroup<MetricGroup>
implements SinkWriterMetricGroup {

private final Counter numRecordsOutErrors;
private final Counter numRecordsWritten;
private final Counter numBytesWritten;
private final OperatorIOMetricGroup operatorIOMetricGroup;

/**
 * Creates the metric group and registers the sink writer counters on the given parent group.
 *
 * @param parentMetricGroup group the counters are registered on; also passed to the superclass
 * @param operatorIOMetricGroup operator I/O metric group kept by this instance
 */
private InternalSinkWriterMetricGroup(
MetricGroup parentMetricGroup, OperatorIOMetricGroup operatorIOMetricGroup) {
super(parentMetricGroup);
numRecordsOutErrors = parentMetricGroup.counter(MetricNames.NUM_RECORDS_OUT_ERRORS);
// Note: these fields are named "...Written" but are registered under the "...SEND" metric
// names and exposed through the get...SendCounter() accessors.
numRecordsWritten = parentMetricGroup.counter(MetricNames.NUM_RECORDS_SEND);
numBytesWritten = parentMetricGroup.counter(MetricNames.NUM_BYTES_SEND);
this.operatorIOMetricGroup = operatorIOMetricGroup;
}

Expand Down Expand Up @@ -71,6 +75,16 @@ public Counter getNumRecordsOutErrorsCounter() {
return numRecordsOutErrors;
}

@Override
public Counter getNumRecordsSendCounter() {
// Backed by the counter registered under MetricNames.NUM_RECORDS_SEND in the constructor.
return numRecordsWritten;
}

@Override
public Counter getNumBytesSendCounter() {
// Backed by the counter registered under MetricNames.NUM_BYTES_SEND in the constructor.
return numBytesWritten;
}

@Override
public void setCurrentSendTimeGauge(Gauge<Long> currentSendTimeGauge) {
parentMetricGroup.gauge(MetricNames.CURRENT_SEND_TIME, currentSendTimeGauge);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,7 @@ public void testMetrics(
externalContext,
jobClient.getJobID(),
sinkName,
MetricNames.NUM_RECORDS_SEND,
testRecords.size());
} catch (Exception e) {
// skip failed assert try
Expand Down Expand Up @@ -531,16 +532,23 @@ private boolean compareSinkMetrics(
DataStreamSinkExternalContext<T> context,
JobID jobId,
String sinkName,
long allRecordSize)
String metricsName,
long expectedSize)
throws Exception {
double sumNumRecordsOut =
metricQuerier.getAggregatedMetricsByRestAPI(
testEnv.getRestEndpoint(),
jobId,
sinkName,
MetricNames.IO_NUM_RECORDS_OUT,
metricsName,
getSinkMetricFilter(context));
return Precision.equals(allRecordSize, sumNumRecordsOut);

if (Precision.equals(expectedSize, sumNumRecordsOut)) {
return true;
} else {
LOG.info("expected:<{}> but was <{}>({})", expectedSize, sumNumRecordsOut, metricsName);
return false;
}
}

/** Sort the list. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
Expand Down Expand Up @@ -53,7 +52,12 @@

/** Tests whether all provided metrics of a {@link Sink} are of the expected values (FLIP-33). */
public class SinkMetricsITCase extends TestLogger {

private static final String TEST_SINK_NAME = "MetricTestSink";
// please refer to SinkTransformationTranslator#WRITER_NAME
private static final String DEFAULT_WRITER_NAME = "Writer";
private static final int DEFAULT_PARALLELISM = 4;

@Rule public final SharedObjects sharedObjects = SharedObjects.create();
private static final InMemoryReporter reporter = InMemoryReporter.createWithRetainedMetrics();

Expand Down Expand Up @@ -96,8 +100,12 @@ public void testMetrics() throws Exception {
}
return i;
})
.sinkTo(TestSink.newBuilder().setWriter(new MetricWriter()).build())
.name("MetricTestSink");
.sinkTo(
TestSink.newBuilder()
.setDefaultCommitter()
.setWriter(new MetricWriter())
.build())
.name(TEST_SINK_NAME);
JobClient jobClient = env.executeAsync();
final JobID jobId = jobClient.getJobID();

Expand All @@ -115,23 +123,24 @@ public void testMetrics() throws Exception {
private void assertSinkMetrics(
JobID jobId, long processedRecordsPerSubtask, int parallelism, int numSplits) {
List<OperatorMetricGroup> groups =
reporter.findOperatorMetricGroups(jobId, "MetricTestSink");
reporter.findOperatorMetricGroups(
jobId, TEST_SINK_NAME + ": " + DEFAULT_WRITER_NAME);
assertThat(groups, hasSize(parallelism));

int subtaskWithMetrics = 0;
for (OperatorMetricGroup group : groups) {
Map<String, Metric> metrics = reporter.getMetricsByGroup(group);
// there are only 2 splits assigned; so two groups will not update metrics
if (group.getIOMetricGroup().getNumRecordsOutCounter().getCount() == 0) {
if (group.getIOMetricGroup().getNumRecordsOutCounter().getCount() != 0) {
continue;
}
subtaskWithMetrics++;
// I/O metrics
// SinkWriterMetricGroup metrics
assertThat(
group.getIOMetricGroup().getNumRecordsOutCounter(),
metrics.get(MetricNames.NUM_RECORDS_SEND),
isCounter(equalTo(processedRecordsPerSubtask)));
assertThat(
group.getIOMetricGroup().getNumBytesOutCounter(),
metrics.get(MetricNames.NUM_BYTES_SEND),
isCounter(
equalTo(
processedRecordsPerSubtask
Expand All @@ -156,24 +165,22 @@ private static class MetricWriter extends TestSink.DefaultSinkWriter<Long> {
static final long RECORD_SIZE_IN_BYTES = 10;
private SinkWriterMetricGroup metricGroup;
private long sendTime;
private Counter recordsOutCounter;

@Override
public void init(Sink.InitContext context) {
this.metricGroup = context.metricGroup();
this.recordsOutCounter = metricGroup.getIOMetricGroup().getNumRecordsOutCounter();
metricGroup.setCurrentSendTimeGauge(() -> sendTime);
}

@Override
public void write(Long element, Context context) {
super.write(element, context);
sendTime = element * BASE_SEND_TIME;
recordsOutCounter.inc();
metricGroup.getNumRecordsSendCounter().inc();
if (element % 2 == 0) {
metricGroup.getNumRecordsOutErrorsCounter().inc();
}
metricGroup.getIOMetricGroup().getNumBytesOutCounter().inc(RECORD_SIZE_IN_BYTES);
metricGroup.getNumBytesSendCounter().inc(RECORD_SIZE_IN_BYTES);
}
}
}

0 comments on commit 91c2b8d

Please sign in to comment.