Skip to content

Commit

Permalink
[FLINK-28417][table] Add interfaces for LookupCache and default imple…
Browse files Browse the repository at this point in the history
…mentation (apache#20196)
  • Loading branch information
PatrickRen committed Aug 3, 2022
1 parent 1c04475 commit ed8870e
Show file tree
Hide file tree
Showing 8 changed files with 1,040 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.metrics.groups;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;

/**
* Pre-defined metrics for cache.
*
* <p>Please note that these methods should only be invoked once. Registering a metric with same
* name for multiple times would lead to an undefined behavior.
*/
@PublicEvolving
public interface CacheMetricGroup extends MetricGroup {
/** The number of cache hits. */
void hitCounter(Counter hitCounter);

/** The number of cache misses. */
void missCounter(Counter missCounter);

/** The number of times to load data into cache from external system. */
void loadCounter(Counter loadCounter);

/** The number of load failures. */
void numLoadFailuresCounter(Counter numLoadFailuresCounter);

/** The time spent for the latest load operation. */
void latestLoadTimeGauge(Gauge<Long> latestLoadTimeGauge);

/** The number of records in cache. */
void numCachedRecordsGauge(Gauge<Long> numCachedRecordsGauge);

/** The number of bytes used by cache. */
void numCachedBytesGauge(Gauge<Long> numCachedBytesGauge);
}
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,10 @@ public static SplitEnumeratorMetricGroup createSplitEnumeratorMetricGroup() {
return new UnregisteredSplitEnumeratorMetricGroup();
}

public static CacheMetricGroup createCacheMetricGroup() {
return new UnregisteredCacheMetricGroup();
}

private static class UnregisteredOperatorMetricGroup extends UnregisteredMetricsGroup
implements OperatorMetricGroup {
@Override
Expand Down Expand Up @@ -178,4 +182,28 @@ public <G extends Gauge<Long>> G setUnassignedSplitsGauge(G unassignedSplitsGaug
return null;
}
}

private static class UnregisteredCacheMetricGroup extends UnregisteredMetricsGroup
implements CacheMetricGroup {
@Override
public void hitCounter(Counter hitCounter) {}

@Override
public void missCounter(Counter missCounter) {}

@Override
public void loadCounter(Counter loadCounter) {}

@Override
public void numLoadFailuresCounter(Counter numLoadFailuresCounter) {}

@Override
public void latestLoadTimeGauge(Gauge<Long> latestLoadTimeGauge) {}

@Override
public void numCachedRecordsGauge(Gauge<Long> numCachedRecordsGauge) {}

@Override
public void numCachedBytesGauge(Gauge<Long> numCachedBytesGauge) {}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,4 +114,13 @@ public static String currentInputWatermarkName(int index) {
public static final String NUM_SLOW_EXECUTION_VERTICES = "numSlowExecutionVertices";
public static final String NUM_EFFECTIVE_SPECULATIVE_EXECUTIONS =
"numEffectiveSpeculativeExecutions";

// FLIP-221 for caches
public static final String HIT_COUNT = "hitCount";
public static final String MISS_COUNT = "missCount";
public static final String LOAD_COUNT = "loadCount";
public static final String NUM_LOAD_FAILURES = "numLoadFailures";
public static final String LATEST_LOAD_TIME = "latestLoadTime";
public static final String NUM_CACHED_RECORDS = "numCachedRecords";
public static final String NUM_CACHED_BYTES = "numCachedBytes";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.metrics.groups;

import org.apache.flink.annotation.Internal;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.CacheMetricGroup;

import static org.apache.flink.runtime.metrics.MetricNames.HIT_COUNT;
import static org.apache.flink.runtime.metrics.MetricNames.LATEST_LOAD_TIME;
import static org.apache.flink.runtime.metrics.MetricNames.LOAD_COUNT;
import static org.apache.flink.runtime.metrics.MetricNames.MISS_COUNT;
import static org.apache.flink.runtime.metrics.MetricNames.NUM_CACHED_BYTES;
import static org.apache.flink.runtime.metrics.MetricNames.NUM_CACHED_RECORDS;
import static org.apache.flink.runtime.metrics.MetricNames.NUM_LOAD_FAILURES;

/**
* A {@link CacheMetricGroup} which register all cache related metrics under a subgroup of the
* parent metric group.
*/
@Internal
public class InternalCacheMetricGroup extends ProxyMetricGroup<MetricGroup>
implements CacheMetricGroup {

/**
* Creates a subgroup with the specified subgroup name under the parent group. Metrics will be
* registered under the new created subgroup.
*
* <p>For example the hit counter will be registered as "root.cache.hitCount", with {@code
* parentMetricGroup = root} and {@code subGroupName = "cache"}.
*
* @param parentMetricGroup parent metric group of the subgroup
* @param subGroupName name of the subgroup
*/
public InternalCacheMetricGroup(MetricGroup parentMetricGroup, String subGroupName) {
super(parentMetricGroup.addGroup(subGroupName));
}

@Override
public void hitCounter(Counter hitCounter) {
counter(HIT_COUNT, hitCounter);
}

@Override
public void missCounter(Counter missCounter) {
counter(MISS_COUNT, missCounter);
}

@Override
public void loadCounter(Counter loadCounter) {
counter(LOAD_COUNT, loadCounter);
}

@Override
public void numLoadFailuresCounter(Counter numLoadFailuresCounter) {
counter(NUM_LOAD_FAILURES, numLoadFailuresCounter);
}

@Override
public void latestLoadTimeGauge(Gauge<Long> latestLoadTimeGauge) {
gauge(LATEST_LOAD_TIME, latestLoadTimeGauge);
}

@Override
public void numCachedRecordsGauge(Gauge<Long> numCachedRecordsGauge) {
gauge(NUM_CACHED_RECORDS, numCachedRecordsGauge);
}

@Override
public void numCachedBytesGauge(Gauge<Long> numCachedBytesGauge) {
gauge(NUM_CACHED_BYTES, numCachedBytesGauge);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.connector.source.lookup;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.description.Description;

import java.time.Duration;

import static org.apache.flink.configuration.description.TextElement.code;

/** Predefined options for lookup table. */
public class LookupOptions {
public static final ConfigOption<LookupCacheType> CACHE_TYPE =
ConfigOptions.key("lookup.cache")
.enumType(LookupCacheType.class)
.defaultValue(LookupCacheType.NONE)
.withDescription(
Description.builder()
.text(
"The caching strategy for this lookup table, including %s, %s and %s",
code(LookupCacheType.NONE.toString()),
code(LookupCacheType.PARTIAL.toString()),
code(LookupCacheType.FULL.toString()))
.build());

public static final ConfigOption<Integer> MAX_RETRIES =
ConfigOptions.key("lookup.max-retries")
.intType()
.defaultValue(3)
.withDescription("The maximum allowed retries if a lookup operation fails");

public static final ConfigOption<Duration> PARTIAL_CACHE_EXPIRE_AFTER_ACCESS =
ConfigOptions.key("lookup.partial-cache.expire-after-access")
.durationType()
.noDefaultValue()
.withDescription("Duration to expire an entry in the cache after accessing");

public static final ConfigOption<Duration> PARTIAL_CACHE_EXPIRE_AFTER_WRITE =
ConfigOptions.key("lookup.partial-cache.expire-after-write")
.durationType()
.noDefaultValue()
.withDescription("Duration to expire an entry in the cache after writing");

public static final ConfigOption<Boolean> PARTIAL_CACHE_CACHE_MISSING_KEY =
ConfigOptions.key("lookup.partial-cache.cache-missing-key")
.booleanType()
.defaultValue(true)
.withDescription(
"Whether to store an empty value into the cache if the lookup key doesn't match any rows in the table");

public static final ConfigOption<Long> PARTIAL_CACHE_MAX_ROWS =
ConfigOptions.key("lookup.partial-cache.max-rows")
.longType()
.noDefaultValue()
.withDescription("The maximum number of rows to store in the cache");

/** Types of the lookup cache. */
public enum LookupCacheType {
NONE,
PARTIAL,
FULL
}
}
Loading

0 comments on commit ed8870e

Please sign in to comment.