Skip to content

Commit

Permalink
[FLINK-28890][table] Fix semantic of latestLoadTime in caching lookup…
Browse files Browse the repository at this point in the history
… function
  • Loading branch information
PatrickRen committed Sep 23, 2022
1 parent a54b2a8 commit 24c685a
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ public CompletableFuture<Collection<RowData>> asyncLookup(RowData keyRow) {
if (cachedValues != null) {
return CompletableFuture.completedFuture(cachedValues);
} else {
long loadStartTime = System.currentTimeMillis();
return delegate.asyncLookup(keyRow)
.whenComplete(
(lookupValues, throwable) -> {
Expand All @@ -99,8 +100,8 @@ public CompletableFuture<Collection<RowData>> asyncLookup(RowData keyRow) {
String.format("Failed to lookup key '%s'", keyRow),
throwable);
}
updateLatestLoadTime(System.currentTimeMillis() - loadStartTime);
loadCounter.inc();
updateLatestLoadTime();
Collection<RowData> cachingValues = lookupValues;
if (lookupValues == null || lookupValues.isEmpty()) {
cachingValues = Collections.emptyList();
Expand All @@ -124,10 +125,10 @@ public LookupCache getCache() {
}

// --------------------------------- Helper functions ----------------------------
private void updateLatestLoadTime() {
private synchronized void updateLatestLoadTime(long loadTime) {
if (latestLoadTime == UNINITIALIZED) {
cacheMetricGroup.latestLoadTimeGauge(() -> latestLoadTime);
}
latestLoadTime = System.currentTimeMillis();
latestLoadTime = loadTime;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,10 @@ private Collection<RowData> lookupByDelegate(RowData keyRow) throws IOException
Preconditions.checkState(
delegate != null,
"User's lookup function can't be null, if there are possible cache misses.");
long loadStart = System.currentTimeMillis();
Collection<RowData> lookupValues = delegate.lookup(keyRow);
updateLatestLoadTime(System.currentTimeMillis() - loadStart);
loadCounter.inc();
updateLatestLoadTime();
return lookupValues;
} catch (Exception e) {
// TODO: Should implement retry on failure logic as proposed in FLIP-234
Expand All @@ -170,7 +171,7 @@ private Collection<RowData> lookupByDelegate(RowData keyRow) throws IOException
}
}

private void updateLatestLoadTime() {
private void updateLatestLoadTime(long loadTime) {
checkNotNull(
cacheMetricGroup,
"Could not register metric '%s' as cache metric group is not initialized",
Expand All @@ -179,6 +180,6 @@ private void updateLatestLoadTime() {
if (latestLoadTime == UNINITIALIZED) {
cacheMetricGroup.latestLoadTimeGauge(() -> latestLoadTime);
}
latestLoadTime = System.currentTimeMillis();
latestLoadTime = loadTime;
}
}

0 comments on commit 24c685a

Please sign in to comment.