Skip to content

Commit

Permalink
Create scanner after quota check
Browse files Browse the repository at this point in the history
  • Loading branch information
Hernan Gelaf-Romer committed Jul 8, 2024
1 parent 610af48 commit 9d17135
Showing 1 changed file with 41 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,18 @@ public void run() {
}
}

static class QuotaAndRegionScannerHolder {
private final String scannerName;
private final RegionScannerHolder rsh;
private final OperationQuota quota;

QuotaAndRegionScannerHolder(String scannerName, RegionScannerHolder rsh, OperationQuota quota) {
this.scannerName = scannerName;
this.rsh = rsh;
this.quota = quota;
}
}

/**
* Holder class which holds the RegionScanner, nextCallSeq and RpcCallbacks together.
*/
Expand Down Expand Up @@ -3135,9 +3147,8 @@ private RegionScannerHolder getRegionScanner(ScanRequest request) throws IOExcep
* @return Pair with scannerName key to use with this new Scanner and its RegionScannerHolder
* value.
*/
private Pair<String, RegionScannerHolder> newRegionScanner(ScanRequest request,
private Pair<String, RegionScannerHolder> newRegionScanner(ScanRequest request, HRegion region,
ScanResponse.Builder builder) throws IOException {
HRegion region = getRegion(request.getRegion());
rejectIfInStandByState(region);
ClientProtos.Scan protoScan = request.getScan();
boolean isLoadingCfsOnDemandSet = protoScan.hasLoadColumnFamiliesOnDemand();
Expand Down Expand Up @@ -3551,24 +3562,13 @@ public ScanResponse scan(final RpcController controller, final ScanRequest reque
}
throw new ServiceException(e);
}

requestCount.increment();
rpcScanRequestCount.increment();
RegionScannerHolder rsh;
QuotaAndRegionScannerHolder qrsh;
ScanResponse.Builder builder = ScanResponse.newBuilder();
String scannerName;
try {
if (request.hasScannerId()) {
// The downstream projects such as AsyncHBase in OpenTSDB need this value. See HBASE-18000
// for more details.
long scannerId = request.getScannerId();
builder.setScannerId(scannerId);
scannerName = toScannerName(scannerId);
rsh = getRegionScanner(request);
} else {
Pair<String, RegionScannerHolder> scannerNameAndRSH = newRegionScanner(request, builder);
scannerName = scannerNameAndRSH.getFirst();
rsh = scannerNameAndRSH.getSecond();
}
qrsh = checkQuotaAndGetRegionScannerHolder(request, builder);
} catch (IOException e) {
if (e == SCANNER_ALREADY_CLOSED) {
// Now we will close scanner automatically if there are no more results for this region but
Expand All @@ -3577,6 +3577,9 @@ public ScanResponse scan(final RpcController controller, final ScanRequest reque
}
throw new ServiceException(e);
}
RegionScannerHolder rsh = qrsh.rsh;
String scannerName = qrsh.scannerName;
OperationQuota quota = qrsh.quota;
if (rsh.fullRegionScan) {
rpcFullScanRequestCount.increment();
}
Expand All @@ -3599,14 +3602,6 @@ public ScanResponse scan(final RpcController controller, final ScanRequest reque
}
return builder.build();
}
OperationQuota quota;
try {
quota = getRpcQuotaManager().checkScanQuota(region, request, maxScannerResultSize,
rsh.getMaxBlockBytesScanned(), rsh.getPrevBlockBytesScannedDifference());
} catch (IOException e) {
addScannerLeaseBack(lease);
throw new ServiceException(e);
}
try {
checkScanNextCallSeq(request, rsh);
} catch (OutOfOrderScannerNextException e) {
Expand Down Expand Up @@ -3985,4 +3980,26 @@ public GetCachedFilesListResponse getCachedFilesList(RpcController controller,
});
return responseBuilder.addAllCachedFiles(fullyCachedFiles).build();
}

private QuotaAndRegionScannerHolder checkQuotaAndGetRegionScannerHolder(ScanRequest request,
ScanResponse.Builder builder) throws IOException {
if (request.hasScannerId()) {
// The downstream projects such as AsyncHBase in OpenTSDB need this value. See HBASE-18000
// for more details.
long scannerId = request.getScannerId();
RegionScannerHolder rsh = getRegionScanner(request);
String scannerName = toScannerName(scannerId);
builder.setScannerId(scannerId);
OperationQuota quota =
getRpcQuotaManager().checkScanQuota(rsh.r, request, maxScannerResultSize,
rsh.getMaxBlockBytesScanned(), rsh.getPrevBlockBytesScannedDifference());
return new QuotaAndRegionScannerHolder(scannerName, rsh, quota);
}

HRegion region = getRegion(request.getRegion());
OperationQuota quota =
getRpcQuotaManager().checkScanQuota(region, request, maxScannerResultSize, 0L, 0L);
Pair<String, RegionScannerHolder> pair = newRegionScanner(request, region, builder);
return new QuotaAndRegionScannerHolder(pair.getFirst(), pair.getSecond(), quota);
}
}

0 comments on commit 9d17135

Please sign in to comment.