Skip to content

Commit

Permalink
[FLINK-15051][state-backends] Fix typos in RocksDBStateBackend and Ro…
Browse files Browse the repository at this point in the history
…cksDBNativeMetricMonitor methods.

This closes apache#10420
  • Loading branch information
StephanEwen committed Dec 4, 2019
1 parent c79ca44 commit 9829b9c
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public RocksDBNativeMetricMonitor(
*/
void registerColumnFamily(String columnFamilyName, ColumnFamilyHandle handle) {

boolean columnFamilyAsVariable = options.isColumnFaminlyAsVariable();
boolean columnFamilyAsVariable = options.isColumnFamilyAsVariable();
MetricGroup group = columnFamilyAsVariable
? metricGroup.addGroup(COLUMN_FAMILY_KEY, columnFamilyName)
: metricGroup.addGroup(columnFamilyName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,7 @@ public boolean isEnabled() {
*
* @return true is column family to expose variable, false otherwise.
*/
public boolean isColumnFaminlyAsVariable() {
public boolean isColumnFamilyAsVariable() {
return this.columnFamilyAsVariable;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public enum PriorityQueueStateType {
/** Flag whether the native library has been loaded. */
private static boolean rocksDbInitialized = false;

private static final int UNDEFINED_NUMBER_OF_TRANSFERING_THREADS = -1;
private static final int UNDEFINED_NUMBER_OF_TRANSFER_THREADS = -1;

// ------------------------------------------------------------------------

Expand All @@ -136,7 +136,7 @@ public enum PriorityQueueStateType {
private final TernaryBoolean enableIncrementalCheckpointing;

/** Thread number used to transfer (download and upload) state, default value: 1. */
private int numberOfTransferingThreads;
private int numberOfTransferThreads;

/**
* This determines if compaction filter to cleanup state with TTL is enabled.
Expand Down Expand Up @@ -262,7 +262,7 @@ public RocksDBStateBackend(StateBackend checkpointStreamBackend) {
public RocksDBStateBackend(StateBackend checkpointStreamBackend, TernaryBoolean enableIncrementalCheckpointing) {
this.checkpointStreamBackend = checkNotNull(checkpointStreamBackend);
this.enableIncrementalCheckpointing = enableIncrementalCheckpointing;
this.numberOfTransferingThreads = UNDEFINED_NUMBER_OF_TRANSFERING_THREADS;
this.numberOfTransferThreads = UNDEFINED_NUMBER_OF_TRANSFER_THREADS;
// for now, we use still the heap-based implementation as default
this.priorityQueueStateType = PriorityQueueStateType.HEAP;
this.defaultMetricOptions = new RocksDBNativeMetricOptions();
Expand Down Expand Up @@ -303,10 +303,10 @@ private RocksDBStateBackend(RocksDBStateBackend original, Configuration config,
this.enableIncrementalCheckpointing = original.enableIncrementalCheckpointing.resolveUndefined(
config.getBoolean(CheckpointingOptions.INCREMENTAL_CHECKPOINTS));

if (original.numberOfTransferingThreads == UNDEFINED_NUMBER_OF_TRANSFERING_THREADS) {
this.numberOfTransferingThreads = config.getInteger(CHECKPOINT_TRANSFER_THREAD_NUM);
if (original.numberOfTransferThreads == UNDEFINED_NUMBER_OF_TRANSFER_THREADS) {
this.numberOfTransferThreads = config.getInteger(CHECKPOINT_TRANSFER_THREAD_NUM);
} else {
this.numberOfTransferingThreads = original.numberOfTransferingThreads;
this.numberOfTransferThreads = original.numberOfTransferThreads;
}

this.enableTtlCompactionFilter = original.enableTtlCompactionFilter
Expand Down Expand Up @@ -513,7 +513,7 @@ public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
cancelStreamRegistry
).setEnableIncrementalCheckpointing(isIncrementalCheckpointsEnabled())
.setEnableTtlCompactionFilter(isTtlCompactionFilterEnabled())
.setNumberOfTransferingThreads(getNumberOfTransferingThreads())
.setNumberOfTransferingThreads(getNumberOfTransferThreads())
.setNativeMetricOptions(getMemoryWatcherOptions());
return builder.build();
}
Expand Down Expand Up @@ -821,20 +821,36 @@ public RocksDBNativeMetricOptions getMemoryWatcherOptions() {
/**
* Gets the number of threads used to transfer files while snapshotting/restoring.
*/
public int getNumberOfTransferingThreads() {
return numberOfTransferingThreads == UNDEFINED_NUMBER_OF_TRANSFERING_THREADS ?
CHECKPOINT_TRANSFER_THREAD_NUM.defaultValue() : numberOfTransferingThreads;
public int getNumberOfTransferThreads() {
return numberOfTransferThreads == UNDEFINED_NUMBER_OF_TRANSFER_THREADS ?
CHECKPOINT_TRANSFER_THREAD_NUM.defaultValue() : numberOfTransferThreads;
}

/**
* Sets the number of threads used to transfer files while snapshotting/restoring.
*
* @param numberOfTransferingThreads The number of threads used to transfer files while snapshotting/restoring.
* @param numberOfTransferThreads The number of threads used to transfer files while snapshotting/restoring.
*/
public void setNumberOfTransferingThreads(int numberOfTransferingThreads) {
Preconditions.checkArgument(numberOfTransferingThreads > 0,
public void setNumberOfTransferThreads(int numberOfTransferThreads) {
Preconditions.checkArgument(numberOfTransferThreads > 0,
"The number of threads used to transfer files in RocksDBStateBackend should be greater than zero.");
this.numberOfTransferingThreads = numberOfTransferingThreads;
this.numberOfTransferThreads = numberOfTransferThreads;
}

/**
* @deprecated Typo in method name. Use {@link #getNumberOfTransferThreads} instead.
*/
@Deprecated
public int getNumberOfTransferingThreads() {
return getNumberOfTransferThreads();
}

/**
* @deprecated Typo in method name. Use {@link #setNumberOfTransferThreads(int)} instead.
*/
@Deprecated
public void setNumberOfTransferingThreads(int numberOfTransferingThreads) {
setNumberOfTransferThreads(numberOfTransferingThreads);
}

// ------------------------------------------------------------------------
Expand All @@ -847,7 +863,7 @@ public String toString() {
"checkpointStreamBackend=" + checkpointStreamBackend +
", localRocksDbDirectories=" + Arrays.toString(localRocksDbDirectories) +
", enableIncrementalCheckpointing=" + enableIncrementalCheckpointing +
", numberOfTransferingThreads=" + numberOfTransferingThreads +
", numberOfTransferThreads=" + numberOfTransferThreads +
'}';
}

Expand Down

0 comments on commit 9829b9c

Please sign in to comment.