Skip to content

Commit

Permalink
[FLINK-28948][table] Increase test coverage for lookup full caching +…
Browse files Browse the repository at this point in the history
… fix metrics
  • Loading branch information
SmirAlex authored and PatrickRen committed Sep 1, 2022
1 parent 655184c commit 20e00fd
Show file tree
Hide file tree
Showing 11 changed files with 590 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
public class InternalCacheMetricGroup extends ProxyMetricGroup<MetricGroup>
implements CacheMetricGroup {

public static final long UNINITIALIZED = -1;

/**
* Creates a subgroup with the specified subgroup name under the parent group. Metrics will be
* registered under the new created subgroup.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@

import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.groups.CacheMetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.table.connector.source.lookup.LookupOptions;
Expand Down Expand Up @@ -371,51 +369,4 @@ private CompletableFuture<Void> runAsync(Runnable runnable, ExecutorService exec
}),
executor);
}

// -------------------------- Helper classes ------------------------
private static class InterceptingCacheMetricGroup extends UnregisteredMetricsGroup
implements CacheMetricGroup {
private Counter hitCounter;
private Counter missCounter;
private Counter loadCounter;
private Counter numLoadFailuresCounter;
private Gauge<Long> latestLoadTimeGauge;
private Gauge<Long> numCachedRecordsGauge;
private Gauge<Long> numCachedBytesGauge;

@Override
public void hitCounter(Counter hitCounter) {
this.hitCounter = hitCounter;
}

@Override
public void missCounter(Counter missCounter) {
this.missCounter = missCounter;
}

@Override
public void loadCounter(Counter loadCounter) {
this.loadCounter = loadCounter;
}

@Override
public void numLoadFailuresCounter(Counter numLoadFailuresCounter) {
this.numLoadFailuresCounter = numLoadFailuresCounter;
}

@Override
public void latestLoadTimeGauge(Gauge<Long> latestLoadTimeGauge) {
this.latestLoadTimeGauge = latestLoadTimeGauge;
}

@Override
public void numCachedRecordsGauge(Gauge<Long> numCachedRecordsGauge) {
this.numCachedRecordsGauge = numCachedRecordsGauge;
}

@Override
public void numCachedBytesGauge(Gauge<Long> numCachedBytesGauge) {
this.numCachedBytesGauge = numCachedBytesGauge;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.connector.source.lookup.cache;

import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.groups.CacheMetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;

/** {@link CacheMetricGroup} that intercepts all registered metrics. */
public class InterceptingCacheMetricGroup extends UnregisteredMetricsGroup
implements CacheMetricGroup {
public Counter hitCounter;
public Counter missCounter;
public Counter loadCounter;
public Counter numLoadFailuresCounter;
public Gauge<Long> latestLoadTimeGauge;
public Gauge<Long> numCachedRecordsGauge;
public Gauge<Long> numCachedBytesGauge;

@Override
public void hitCounter(Counter hitCounter) {
this.hitCounter = hitCounter;
}

@Override
public void missCounter(Counter missCounter) {
this.missCounter = missCounter;
}

@Override
public void loadCounter(Counter loadCounter) {
this.loadCounter = loadCounter;
}

@Override
public void numLoadFailuresCounter(Counter numLoadFailuresCounter) {
this.numLoadFailuresCounter = numLoadFailuresCounter;
}

@Override
public void latestLoadTimeGauge(Gauge<Long> latestLoadTimeGauge) {
this.latestLoadTimeGauge = latestLoadTimeGauge;
}

@Override
public void numCachedRecordsGauge(Gauge<Long> numCachedRecordsGauge) {
this.numCachedRecordsGauge = numCachedRecordsGauge;
}

@Override
public void numCachedBytesGauge(Gauge<Long> numCachedBytesGauge) {
this.numCachedBytesGauge = numCachedBytesGauge;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@
import org.apache.flink.table.planner.runtime.utils.FailingCollectionSource;
import org.apache.flink.table.planner.utils.FilterUtils;
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.runtime.functions.table.fullcache.FullCacheTestInputFormat;
import org.apache.flink.table.runtime.functions.table.fullcache.inputformat.FullCacheTestInputFormat;
import org.apache.flink.table.runtime.generated.GeneratedProjection;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.BigIntType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.Collection;
import java.util.Collections;

import static org.apache.flink.runtime.metrics.groups.InternalCacheMetricGroup.UNINITIALIZED;
import static org.apache.flink.util.Preconditions.checkNotNull;

/**
Expand All @@ -54,7 +55,6 @@ public class CachingLookupFunction extends LookupFunction {

// Constants
public static final String LOOKUP_CACHE_METRIC_GROUP_NAME = "cache";
private static final long UNINITIALIZED = -1;

// The actual user-provided lookup function
@Nullable private final LookupFunction delegate;
Expand Down Expand Up @@ -101,11 +101,12 @@ public void open(FunctionContext context) throws Exception {
cacheMetricGroup =
new InternalCacheMetricGroup(
context.getMetricGroup(), LOOKUP_CACHE_METRIC_GROUP_NAME);
loadCounter = new SimpleCounter();
cacheMetricGroup.loadCounter(loadCounter);
numLoadFailuresCounter = new SimpleCounter();
cacheMetricGroup.numLoadFailuresCounter(numLoadFailuresCounter);

if (!(cache instanceof LookupFullCache)) {
loadCounter = new SimpleCounter();
cacheMetricGroup.loadCounter(loadCounter);
numLoadFailuresCounter = new SimpleCounter();
cacheMetricGroup.numLoadFailuresCounter(numLoadFailuresCounter);
}
// Initialize cache and the delegating function
cache.open(cacheMetricGroup);
if (cache instanceof LookupFullCache) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;

import static org.apache.flink.runtime.metrics.groups.InternalCacheMetricGroup.UNINITIALIZED;

/**
* Abstract task that loads data in Full cache from source provided by {@link ScanRuntimeProvider}.
*/
Expand All @@ -53,23 +55,31 @@ public abstract class CacheLoader extends AbstractRichFunction implements Runnab
// Cache metrics
private transient Counter loadCounter;
private transient Counter loadFailuresCounter;
private transient volatile long latestLoadTimeMs;
private transient volatile long latestLoadTimeMs = UNINITIALIZED;

protected abstract void reloadCache() throws Exception;

@Override
public void open(Configuration parameters) throws Exception {
firstLoadLatch = new CountDownLatch(1);
loadCounter = new ThreadSafeSimpleCounter();
loadFailuresCounter = new ThreadSafeSimpleCounter();
}

public void open(CacheMetricGroup cacheMetricGroup) {
if (loadCounter == null) {
loadCounter = new ThreadSafeSimpleCounter();
}
if (loadFailuresCounter == null) {
loadFailuresCounter = new ThreadSafeSimpleCounter();
}
if (cache == null) {
cache = new ConcurrentHashMap<>();
}
// Register metrics
cacheMetricGroup.loadCounter(loadCounter);
cacheMetricGroup.numLoadFailuresCounter(loadFailuresCounter);
cacheMetricGroup.numCachedRecordsGauge(() -> (long) cache.size());
cacheMetricGroup.latestLoadTimeGauge(() -> latestLoadTimeMs);
// TODO support metric numCachedBytesGauge
}

public ConcurrentHashMap<RowData, Collection<RowData>> getCache() {
Expand Down Expand Up @@ -111,6 +121,8 @@ public void run() {

@Override
public void close() throws Exception {
cache.clear();
if (cache != null) {
cache.clear();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
package org.apache.flink.table.runtime.functions.table.lookup.fullcache;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.metrics.ThreadSafeSimpleCounter;
import org.apache.flink.metrics.groups.CacheMetricGroup;
import org.apache.flink.table.connector.source.lookup.LookupOptions.LookupCacheType;
import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
Expand All @@ -39,13 +42,21 @@ public class LookupFullCache implements LookupCache {
private transient volatile ReloadTriggerContext reloadTriggerContext;
private transient volatile Throwable reloadFailCause;

// Cache metrics
private transient Counter hitCounter; // equals to number of requests

public LookupFullCache(CacheLoader cacheLoader, CacheReloadTrigger reloadTrigger) {
this.cacheLoader = Preconditions.checkNotNull(cacheLoader);
this.reloadTrigger = Preconditions.checkNotNull(reloadTrigger);
}

@Override
public synchronized void open(CacheMetricGroup metricGroup) {
if (hitCounter == null) {
hitCounter = new ThreadSafeSimpleCounter();
}
metricGroup.hitCounter(hitCounter);
metricGroup.missCounter(new SimpleCounter()); // always zero
cacheLoader.open(metricGroup);
}

Expand Down Expand Up @@ -73,7 +84,10 @@ public Collection<RowData> getIfPresent(RowData key) {
if (reloadFailCause != null) {
throw new RuntimeException(reloadFailCause);
}
return cacheLoader.getCache().getOrDefault(key, Collections.emptyList());
Collection<RowData> result =
cacheLoader.getCache().getOrDefault(key, Collections.emptyList());
hitCounter.inc();
return result;
}

@Override
Expand Down
Loading

0 comments on commit 20e00fd

Please sign in to comment.