Skip to content

Commit

Permalink
[FLINK-11326] Allow negative offsets in window assigners
Browse files Browse the repository at this point in the history
  • Loading branch information
kezhuw authored and aljoscha committed Jan 30, 2019
1 parent e154375 commit 53044a0
Show file tree
Hide file tree
Showing 8 changed files with 148 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,9 @@ public class SlidingEventTimeWindows extends WindowAssigner<Object, TimeWindow>
private final long offset;

protected SlidingEventTimeWindows(long size, long slide, long offset) {
if (offset < 0 || offset >= slide || size <= 0) {
throw new IllegalArgumentException("SlidingEventTimeWindows parameters must satisfy 0 <= offset < slide and size > 0");
if (Math.abs(offset) >= slide || size <= 0) {
throw new IllegalArgumentException("SlidingEventTimeWindows parameters must satisfy " +
"abs(offset) < slide and size > 0");
}

this.size = size;
Expand Down Expand Up @@ -130,8 +131,7 @@ public static SlidingEventTimeWindows of(Time size, Time slide) {
* @return The time policy.
*/
public static SlidingEventTimeWindows of(Time size, Time slide, Time offset) {
return new SlidingEventTimeWindows(size.toMilliseconds(), slide.toMilliseconds(),
offset.toMilliseconds() % slide.toMilliseconds());
return new SlidingEventTimeWindows(size.toMilliseconds(), slide.toMilliseconds(), offset.toMilliseconds());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,9 @@ public class SlidingProcessingTimeWindows extends WindowAssigner<Object, TimeWin
private final long slide;

private SlidingProcessingTimeWindows(long size, long slide, long offset) {
if (offset < 0 || offset >= slide || size <= 0) {
throw new IllegalArgumentException("SlidingProcessingTimeWindows parameters must satisfy 0 <= offset < slide and size > 0");
if (Math.abs(offset) >= slide || size <= 0) {
throw new IllegalArgumentException("SlidingProcessingTimeWindows parameters must satisfy " +
"abs(offset) < slide and size > 0");
}

this.size = size;
Expand Down Expand Up @@ -123,8 +124,7 @@ public static SlidingProcessingTimeWindows of(Time size, Time slide) {
* @return The time policy.
*/
public static SlidingProcessingTimeWindows of(Time size, Time slide, Time offset) {
return new SlidingProcessingTimeWindows(size.toMilliseconds(), slide.toMilliseconds(),
offset.toMilliseconds() % slide.toMilliseconds());
return new SlidingProcessingTimeWindows(size.toMilliseconds(), slide.toMilliseconds(), offset.toMilliseconds());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ public class TumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow>
private final long offset;

protected TumblingEventTimeWindows(long size, long offset) {
if (offset < 0 || offset >= size) {
throw new IllegalArgumentException("TumblingEventTimeWindows parameters must satisfy 0 <= offset < size");
if (Math.abs(offset) >= size) {
throw new IllegalArgumentException("TumblingEventTimeWindows parameters must satisfy abs(offset) < size");
}

this.size = size;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ public class TumblingProcessingTimeWindows extends WindowAssigner<Object, TimeWi
private final long offset;

private TumblingProcessingTimeWindows(long size, long offset) {
if (offset < 0 || offset >= size) {
throw new IllegalArgumentException("TumblingProcessingTimeWindows parameters must satisfy 0 <= offset < size");
if (Math.abs(offset) >= size) {
throw new IllegalArgumentException("TumblingProcessingTimeWindows parameters must satisfy abs(offset) < size");
}

this.size = size;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,36 @@ public void testWindowAssignmentWithOffset() {
timeWindow(5100, 10100)));
}

@Test
public void testWindowAssignmentWithNegativeOffset() {
WindowAssigner.WindowAssignerContext mockContext =
mock(WindowAssigner.WindowAssignerContext.class);

SlidingEventTimeWindows assigner =
SlidingEventTimeWindows.of(Time.milliseconds(5000), Time.milliseconds(1000), Time.milliseconds(-100));

assertThat(assigner.assignWindows("String", 0L, mockContext), containsInAnyOrder(
timeWindow(-4100, 900),
timeWindow(-3100, 1900),
timeWindow(-2100, 2900),
timeWindow(-1100, 3900),
timeWindow(-100, 4900)));

assertThat(assigner.assignWindows("String", 4899L, mockContext), containsInAnyOrder(
timeWindow(-100, 4900),
timeWindow(900, 5900),
timeWindow(1900, 6900),
timeWindow(2900, 7900),
timeWindow(3900, 8900)));

assertThat(assigner.assignWindows("String", 4900L, mockContext), containsInAnyOrder(
timeWindow(900, 5900),
timeWindow(1900, 6900),
timeWindow(2900, 7900),
timeWindow(3900, 8900),
timeWindow(4900, 9900)));
}

@Test
public void testTimeUnits() {
// sanity check with one other time unit
Expand Down Expand Up @@ -141,21 +171,35 @@ public void testInvalidParameters() {
SlidingEventTimeWindows.of(Time.seconds(-2), Time.seconds(1));
fail("should fail");
} catch (IllegalArgumentException e) {
assertThat(e.toString(), containsString("0 <= offset < slide and size > 0"));
assertThat(e.toString(), containsString("abs(offset) < slide and size > 0"));
}

try {
SlidingEventTimeWindows.of(Time.seconds(2), Time.seconds(-1));
fail("should fail");
} catch (IllegalArgumentException e) {
assertThat(e.toString(), containsString("0 <= offset < slide and size > 0"));
assertThat(e.toString(), containsString("abs(offset) < slide and size > 0"));
}

try {
SlidingEventTimeWindows.of(Time.seconds(-20), Time.seconds(10), Time.seconds(-1));
fail("should fail");
} catch (IllegalArgumentException e) {
assertThat(e.toString(), containsString("abs(offset) < slide and size > 0"));
}

try {
SlidingEventTimeWindows.of(Time.seconds(20), Time.seconds(10), Time.seconds(-11));
fail("should fail");
} catch (IllegalArgumentException e) {
assertThat(e.toString(), containsString("abs(offset) < slide and size > 0"));
}

try {
SlidingEventTimeWindows.of(Time.seconds(20), Time.seconds(10), Time.seconds(-1));
SlidingEventTimeWindows.of(Time.seconds(20), Time.seconds(10), Time.seconds(11));
fail("should fail");
} catch (IllegalArgumentException e) {
assertThat(e.toString(), containsString("0 <= offset < slide and size > 0"));
assertThat(e.toString(), containsString("abs(offset) < slide and size > 0"));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,39 @@ public void testWindowAssignmentWithOffset() {
timeWindow(5100, 10100)));
}

@Test
public void testWindowAssignmentWithNegativeOffset() {
WindowAssigner.WindowAssignerContext mockContext =
mock(WindowAssigner.WindowAssignerContext.class);

SlidingProcessingTimeWindows assigner =
SlidingProcessingTimeWindows.of(Time.milliseconds(5000), Time.milliseconds(1000), Time.milliseconds(-100));

when(mockContext.getCurrentProcessingTime()).thenReturn(0L);
assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), containsInAnyOrder(
timeWindow(-4100, 900),
timeWindow(-3100, 1900),
timeWindow(-2100, 2900),
timeWindow(-1100, 3900),
timeWindow(-100, 4900)));

when(mockContext.getCurrentProcessingTime()).thenReturn(4899L);
assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), containsInAnyOrder(
timeWindow(-100, 4900),
timeWindow(900, 5900),
timeWindow(1900, 6900),
timeWindow(2900, 7900),
timeWindow(3900, 8900)));

when(mockContext.getCurrentProcessingTime()).thenReturn(4900L);
assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), containsInAnyOrder(
timeWindow(900, 5900),
timeWindow(1900, 6900),
timeWindow(2900, 7900),
timeWindow(3900, 8900),
timeWindow(4900, 9900)));
}

@Test
public void testTimeUnits() {
// sanity check with one other time unit
Expand Down Expand Up @@ -151,21 +184,35 @@ public void testInvalidParameters() {
SlidingProcessingTimeWindows.of(Time.seconds(-2), Time.seconds(1));
fail("should fail");
} catch (IllegalArgumentException e) {
assertThat(e.toString(), containsString("0 <= offset < slide and size > 0"));
assertThat(e.toString(), containsString("abs(offset) < slide and size > 0"));
}

try {
SlidingProcessingTimeWindows.of(Time.seconds(2), Time.seconds(-1));
fail("should fail");
} catch (IllegalArgumentException e) {
assertThat(e.toString(), containsString("0 <= offset < slide and size > 0"));
assertThat(e.toString(), containsString("abs(offset) < slide and size > 0"));
}

try {
SlidingProcessingTimeWindows.of(Time.seconds(-20), Time.seconds(10), Time.seconds(-1));
fail("should fail");
} catch (IllegalArgumentException e) {
assertThat(e.toString(), containsString("abs(offset) < slide and size > 0"));
}

try {
SlidingProcessingTimeWindows.of(Time.seconds(20), Time.seconds(10), Time.seconds(-11));
fail("should fail");
} catch (IllegalArgumentException e) {
assertThat(e.toString(), containsString("abs(offset) < slide and size > 0"));
}

try {
SlidingProcessingTimeWindows.of(Time.seconds(20), Time.seconds(10), Time.seconds(-1));
SlidingProcessingTimeWindows.of(Time.seconds(20), Time.seconds(10), Time.seconds(11));
fail("should fail");
} catch (IllegalArgumentException e) {
assertThat(e.toString(), containsString("0 <= offset < slide and size > 0"));
assertThat(e.toString(), containsString("abs(offset) < slide and size > 0"));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,18 @@ public void testWindowAssignmentWithOffset() {
assertThat(assigner.assignWindows("String", 5100L, mockContext), contains(timeWindow(5100, 10100)));
}

@Test
public void testWindowAssignmentWithNegativeOffset() {
WindowAssigner.WindowAssignerContext mockContext =
mock(WindowAssigner.WindowAssignerContext.class);

TumblingEventTimeWindows assigner = TumblingEventTimeWindows.of(Time.milliseconds(5000), Time.milliseconds(-100));

assertThat(assigner.assignWindows("String", 0L, mockContext), contains(timeWindow(-100, 4900)));
assertThat(assigner.assignWindows("String", 4899L, mockContext), contains(timeWindow(-100, 4900)));
assertThat(assigner.assignWindows("String", 4900L, mockContext), contains(timeWindow(4900, 9900)));
}

@Test
public void testTimeUnits() {
// sanity check with one other time unit
Expand All @@ -88,21 +100,21 @@ public void testInvalidParameters() {
TumblingEventTimeWindows.of(Time.seconds(-1));
fail("should fail");
} catch (IllegalArgumentException e) {
assertThat(e.toString(), containsString("0 <= offset < size"));
assertThat(e.toString(), containsString("abs(offset) < size"));
}

try {
TumblingEventTimeWindows.of(Time.seconds(10), Time.seconds(20));
fail("should fail");
} catch (IllegalArgumentException e) {
assertThat(e.toString(), containsString("0 <= offset < size"));
assertThat(e.toString(), containsString("abs(offset) < size"));
}

try {
TumblingEventTimeWindows.of(Time.seconds(10), Time.seconds(-1));
TumblingEventTimeWindows.of(Time.seconds(10), Time.seconds(-11));
fail("should fail");
} catch (IllegalArgumentException e) {
assertThat(e.toString(), containsString("0 <= offset < size"));
assertThat(e.toString(), containsString("abs(offset) < size"));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,23 @@ public void testWindowAssignmentWithOffset() {
assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(5100, 10100)));
}

@Test
public void testWindowAssignmentWithNegativeOffset() {
WindowAssigner.WindowAssignerContext mockContext =
mock(WindowAssigner.WindowAssignerContext.class);

TumblingProcessingTimeWindows assigner = TumblingProcessingTimeWindows.of(Time.milliseconds(5000), Time.milliseconds(-100));

when(mockContext.getCurrentProcessingTime()).thenReturn(100L);
assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(-100, 4900)));

when(mockContext.getCurrentProcessingTime()).thenReturn(4899L);
assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(-100, 4900)));

when(mockContext.getCurrentProcessingTime()).thenReturn(4900L);
assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(4900, 9900)));
}

@Test
public void testTimeUnits() {
// sanity check with one other time unit
Expand All @@ -104,21 +121,21 @@ public void testInvalidParameters() {
TumblingProcessingTimeWindows.of(Time.seconds(-1));
fail("should fail");
} catch (IllegalArgumentException e) {
assertThat(e.toString(), containsString("0 <= offset < size"));
assertThat(e.toString(), containsString("abs(offset) < size"));
}

try {
TumblingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(20));
fail("should fail");
} catch (IllegalArgumentException e) {
assertThat(e.toString(), containsString("0 <= offset < size"));
assertThat(e.toString(), containsString("abs(offset) < size"));
}

try {
TumblingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(-1));
TumblingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(-11));
fail("should fail");
} catch (IllegalArgumentException e) {
assertThat(e.toString(), containsString("0 <= offset < size"));
assertThat(e.toString(), containsString("abs(offset) < size"));
}
}

Expand Down

0 comments on commit 53044a0

Please sign in to comment.