Skip to content

Commit

Permalink
[FLINK-15146][core][ttl] Fix check that incremental cleanup size must…
Browse files Browse the repository at this point in the history
… be greater than zero.

At the moment, we allow zero value for the TTL incremental cleanup size.
Although technically zero value will not break the cleanup but practically there will be no cleanup.
We change cleanupSize to be strictly greater than zero to avoid confusion.
  • Loading branch information
hehuiyuan authored and Myasuka committed Apr 10, 2021
1 parent 023568b commit 4dd8c53
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -413,8 +413,8 @@ public static class IncrementalCleanupStrategy implements CleanupStrategies.Clea

private IncrementalCleanupStrategy(int cleanupSize, boolean runCleanupForEveryRecord) {
Preconditions.checkArgument(
cleanupSize >= 0,
"Number of incrementally cleaned up state entries cannot be negative.");
cleanupSize > 0,
"Number of incrementally cleaned up state entries should be positive.");
this.cleanupSize = cleanupSize;
this.runCleanupForEveryRecord = runCleanupForEveryRecord;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,12 @@
import org.apache.flink.api.common.state.StateTtlConfig.RocksdbCompactFilterCleanupStrategy;
import org.apache.flink.api.common.time.Time;

import org.junit.Assert;
import org.junit.Test;

import java.util.Arrays;
import java.util.List;

import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
Expand Down Expand Up @@ -71,4 +75,20 @@ public void testStateTtlConfigBuildWithCleanupInBackground() {
assertThat(incrementalCleanupStrategy.runCleanupForEveryRecord(), is(false));
assertThat(rocksdbCleanupStrategy.getQueryTimeAfterNumEntries(), is(1000L));
}

@Test
public void testStateTtlConfigBuildWithNonPositiveCleanupIncrementalSize() {
List<Integer> illegalCleanUpSizes = Arrays.asList(0, -2);

for (Integer illegalCleanUpSize : illegalCleanUpSizes) {
try {
StateTtlConfig ttlConfig =
StateTtlConfig.newBuilder(Time.seconds(1))
.cleanupIncrementally(illegalCleanUpSize, false)
.build();
Assert.fail();
} catch (IllegalArgumentException e) {
}
}
}
}

0 comments on commit 4dd8c53

Please sign in to comment.