Skip to content

Commit

Permalink
Merge pull request apache#10804: [BEAM-2535] Fix timer map
Browse files Browse the repository at this point in the history
  • Loading branch information
reuvenlax committed Feb 9, 2020
1 parent e833580 commit 7719708
Show file tree
Hide file tree
Showing 11 changed files with 133 additions and 99 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public void testOnTimerExceptionsWrappedAsUserCodeException() {

runner.onTimer(
ThrowingDoFn.TIMER_ID,
ThrowingDoFn.TIMER_ID,
"",
GlobalWindow.INSTANCE,
new Instant(0),
new Instant(0),
Expand Down
1 change: 0 additions & 1 deletion runners/google-cloud-dataflow-java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,6 @@ def commonExcludeCategories = [
'org.apache.beam.sdk.testing.UsesGaugeMetrics',
'org.apache.beam.sdk.testing.UsesSetState',
'org.apache.beam.sdk.testing.UsesMapState',
'org.apache.beam.sdk.testing.UsesTimerMap',
'org.apache.beam.sdk.testing.UsesSplittableParDoWithWindowedSideInputs',
'org.apache.beam.sdk.testing.UsesUnboundedPCollections',
'org.apache.beam.sdk.testing.UsesTestStream',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,8 @@ public void processTimers() throws Exception {
}

private void processUserTimer(TimerData timer) throws Exception {
if (fnSignature.timerDeclarations().containsKey(timer.getTimerId())) {
if (fnSignature.timerDeclarations().containsKey(timer.getTimerId())
|| fnSignature.timerFamilyDeclarations().containsKey(timer.getTimerFamilyId())) {
BoundedWindow window = ((WindowNamespace) timer.getNamespace()).getWindow();
fnRunner.onTimer(
timer.getTimerId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,9 @@ public void translateTransform(
// TODO: add support of states and timers
DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
boolean stateful =
signature.stateDeclarations().size() > 0 || signature.timerDeclarations().size() > 0;
signature.stateDeclarations().size() > 0
|| signature.timerDeclarations().size() > 0
|| signature.timerFamilyDeclarations().size() > 0;
checkState(!stateful, "States and timers are not supported for the moment.");

DoFnSchemaInformation doFnSchemaInformation =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,8 @@ public static void rejectStateAndTimers(DoFn<?, ?> doFn) {
SparkRunner.class.getSimpleName()));
}

if (signature.timerDeclarations().size() > 0) {
if (signature.timerDeclarations().size() > 0
|| signature.timerFamilyDeclarations().size() > 0) {
throw new UnsupportedOperationException(
String.format(
"Found %s annotations on %s, but %s cannot yet be used with timers in the %s.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -583,7 +583,8 @@ private static <InputT, OutputT> void validate(DoFn<InputT, OutputT> fn) {
}

// Timers are semantically incompatible with splitting
if (!signature.timerDeclarations().isEmpty() && signature.processElement().isSplittable()) {
if ((!signature.timerDeclarations().isEmpty() || !signature.timerFamilyDeclarations().isEmpty())
&& signature.processElement().isSplittable()) {
throw new UnsupportedOperationException(
String.format(
"%s is splittable and uses timers, but these are not compatible",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -96,6 +95,7 @@
import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.jar.asm.Opcodes;
import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.jar.asm.Type;
import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.matcher.ElementMatchers;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Primitives;

/** Dynamically generates a {@link DoFnInvoker} instances for invoking a {@link DoFn}. */
Expand Down Expand Up @@ -172,7 +172,8 @@ public abstract static class DoFnInvokerBase<InputT, OutputT, DoFnT extends DoFn
implements DoFnInvoker<InputT, OutputT> {
protected DoFnT delegate;

private Map<String, OnTimerInvoker> onTimerInvokers = new HashMap<>();
private Map<String, OnTimerInvoker> onTimerInvokers = Maps.newHashMap();
private Map<String, OnTimerInvoker> onTimerFamilyInvokers = Maps.newHashMap();

public DoFnInvokerBase(DoFnT delegate) {
this.delegate = delegate;
Expand All @@ -191,75 +192,36 @@ void addOnTimerInvoker(String timerId, OnTimerInvoker onTimerInvoker) {
this.onTimerInvokers.put(timerId, onTimerInvoker);
}

@Override
public void invokeOnTimer(
String timerId,
String timerFamilyId,
DoFnInvoker.ArgumentProvider<InputT, OutputT> arguments) {
@Nullable OnTimerInvoker onTimerInvoker = onTimerInvokers.get(timerId);

if (onTimerInvoker != null) {
onTimerInvoker.invokeOnTimer(arguments);
} else {
throw new IllegalArgumentException(
String.format(
"Attempted to invoke timer %s on %s, but that timer is not registered."
+ " This is the responsibility of the runner, which must only deliver"
+ " registered timers.",
timerId, delegate.getClass().getName()));
}
}

@Override
public DoFn<InputT, OutputT> getFn() {
return delegate;
}
}

/**
* Internal base class for generated {@link DoFnInvoker} instances.
*
* <p>This class should <i>not</i> be extended directly, or by Beam users. It must be public for
* generated instances to have adequate access, as they are generated "inside" the invoked {@link
* DoFn} class.
*/
public abstract static class DoFnInvokerTimerFamily<
InputT, OutputT, DoFnT extends DoFn<InputT, OutputT>>
implements DoFnInvoker<InputT, OutputT> {
protected DoFnT delegate;

private Map<String, OnTimerInvoker> onTimerInvokers = new HashMap<>();

public DoFnInvokerTimerFamily(DoFnT delegate) {
this.delegate = delegate;
}

/**
* Associates the given timerFamily ID with the given {@link OnTimerInvoker}.
*
* <p>ByteBuddy does not like to generate conditional code, so we use a map + lookup of the
* timer ID rather than a generated conditional branch to choose which OnTimerInvoker to invoke.
*/
void addOnTimerFamilyInvoker(String timerFamilyId, OnTimerInvoker onTimerInvoker) {
this.onTimerInvokers.put(timerFamilyId, onTimerInvoker);
this.onTimerFamilyInvokers.put(timerFamilyId, onTimerInvoker);
}

@Override
public void invokeOnTimer(
String timerId,
String timerFamilyId,
DoFnInvoker.ArgumentProvider<InputT, OutputT> arguments) {
@Nullable OnTimerInvoker onTimerInvoker = onTimerInvokers.get(timerFamilyId);
@Nullable
OnTimerInvoker onTimerInvoker =
(timerFamilyId.isEmpty())
? onTimerInvokers.get(timerId)
: onTimerFamilyInvokers.get(timerFamilyId);

if (onTimerInvoker != null) {
onTimerInvoker.invokeOnTimer(arguments);
} else {
throw new IllegalArgumentException(
String.format(
"Attempted to invoke timerFamily %s on %s, but that timerFamily is not registered."
"Attempted to invoke timer %s on %s, but that timer is not registered."
+ " This is the responsibility of the runner, which must only deliver"
+ " registered timers.",
timerFamilyId, delegate.getClass().getName()));
timerId, delegate.getClass().getName()));
}
}

Expand All @@ -279,33 +241,21 @@ public <InputT, OutputT> DoFnInvoker<InputT, OutputT> newByteBuddyInvoker(
fn.getClass());

try {
if (signature.timerFamilyDeclarations().size() > 0) {
@SuppressWarnings("unchecked")
DoFnInvokerTimerFamily<InputT, OutputT, DoFn<InputT, OutputT>> invoker =
(DoFnInvokerTimerFamily<InputT, OutputT, DoFn<InputT, OutputT>>)
getByteBuddyInvokerConstructor(signature).newInstance(fn);

for (DoFnSignature.OnTimerFamilyMethod onTimerFamilyMethod :
signature.onTimerFamilyMethods().values()) {
invoker.addOnTimerFamilyInvoker(
onTimerFamilyMethod.id(),
OnTimerInvokers.forTimerFamily(fn, onTimerFamilyMethod.id()));
}
return invoker;
} else {

@SuppressWarnings("unchecked")
DoFnInvokerBase<InputT, OutputT, DoFn<InputT, OutputT>> invoker =
(DoFnInvokerBase<InputT, OutputT, DoFn<InputT, OutputT>>)
getByteBuddyInvokerConstructor(signature).newInstance(fn);

for (OnTimerMethod onTimerMethod : signature.onTimerMethods().values()) {
invoker.addOnTimerInvoker(
onTimerMethod.id(), OnTimerInvokers.forTimer(fn, onTimerMethod.id()));
}
return invoker;
@SuppressWarnings("unchecked")
DoFnInvokerBase<InputT, OutputT, DoFn<InputT, OutputT>> invoker =
(DoFnInvokerBase<InputT, OutputT, DoFn<InputT, OutputT>>)
getByteBuddyInvokerConstructor(signature).newInstance(fn);

for (OnTimerMethod onTimerMethod : signature.onTimerMethods().values()) {
invoker.addOnTimerInvoker(
onTimerMethod.id(), OnTimerInvokers.forTimer(fn, onTimerMethod.id()));
}

for (DoFnSignature.OnTimerFamilyMethod onTimerFamilyMethod :
signature.onTimerFamilyMethods().values()) {
invoker.addOnTimerFamilyInvoker(
onTimerFamilyMethod.id(), OnTimerInvokers.forTimerFamily(fn, onTimerFamilyMethod.id()));
}
return invoker;
} catch (InstantiationException
| IllegalAccessException
| IllegalArgumentException
Expand All @@ -325,12 +275,7 @@ private synchronized Constructor<?> getByteBuddyInvokerConstructor(DoFnSignature
Class<? extends DoFn<?, ?>> fnClass = signature.fnClass();
Constructor<?> constructor = byteBuddyInvokerConstructorCache.get(fnClass);
if (constructor == null) {
Class<? extends DoFnInvoker<?, ?>> invokerClass =
generateInvokerClass(
signature,
signature.timerFamilyDeclarations().size() > 0
? DoFnInvokerTimerFamily.class
: DoFnInvokerBase.class);
Class<? extends DoFnInvoker<?, ?>> invokerClass = generateInvokerClass(signature);
try {
constructor = invokerClass.getConstructor(fnClass);
} catch (IllegalArgumentException | NoSuchMethodException | SecurityException e) {
Expand Down Expand Up @@ -391,8 +336,7 @@ public static <InputT, OutputT> double invokeGetSize(
}

/** Generates a {@link DoFnInvoker} class for the given {@link DoFnSignature}. */
private static Class<? extends DoFnInvoker<?, ?>> generateInvokerClass(
DoFnSignature signature, Class<? extends DoFnInvoker> clazz) {
private static Class<? extends DoFnInvoker<?, ?>> generateInvokerClass(DoFnSignature signature) {
Class<? extends DoFn<?, ?>> fnClass = signature.fnClass();

final TypeDescription clazzDescription = new TypeDescription.ForLoadedType(fnClass);
Expand All @@ -406,12 +350,12 @@ public static <InputT, OutputT> double invokeGetSize(
.withSuffix(DoFnInvoker.class.getSimpleName()))

// class <invoker class> extends DoFnInvokerBase {
.subclass(clazz, ConstructorStrategy.Default.NO_CONSTRUCTORS)
.subclass(DoFnInvokerBase.class, ConstructorStrategy.Default.NO_CONSTRUCTORS)

// public <invoker class>(<fn class> delegate) { this.delegate = delegate; }
.defineConstructor(Visibility.PUBLIC)
.withParameter(fnClass)
.intercept(new InvokerConstructor(clazz))
.intercept(new InvokerConstructor(DoFnInvokerBase.class))

// public invokeProcessElement(ProcessContext, ExtraContextFactory) {
// delegate.<@ProcessElement>(... pass just the right args ...);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1559,6 +1559,9 @@ private static void validateTimerFamilyField(
Map<String, TimerFamilyDeclaration> declarations,
String id,
Field field) {
if (id.isEmpty()) {
errors.throwIllegalArgument("TimerFamily id must not be empty");
}

if (declarations.containsKey(id)) {
errors.throwIllegalArgument(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollection.IsBounded;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PDone;
Expand Down Expand Up @@ -4482,7 +4483,17 @@ public static class TimerFamilyTests extends SharedTestBase implements Serializa

@Test
@Category({ValidatesRunner.class, UsesTimersInParDo.class, UsesTimerMap.class})
public void testTimerFamilyEventTime() throws Exception {
public void testTimerFamilyEventTimeBounded() throws Exception {
runTestTimerFamilyEventTime(false);
}

@Test
@Category({ValidatesRunner.class, UsesTimersInParDo.class, UsesTimerMap.class})
public void testTimerFamilyEventTimeUnbounded() throws Exception {
runTestTimerFamilyEventTime(true);
}

public void runTestTimerFamilyEventTime(boolean useStreaming) {
final String timerFamilyId = "foo";

DoFn<KV<String, Integer>, String> fn =
Expand Down Expand Up @@ -4512,14 +4523,27 @@ public void onTimer(
};

PCollection<String> output =
pipeline.apply(Create.of(KV.of("hello", 37))).apply(ParDo.of(fn));
pipeline
.apply(Create.of(KV.of("hello", 37)))
.setIsBoundedInternal(useStreaming ? IsBounded.UNBOUNDED : IsBounded.BOUNDED)
.apply(ParDo.of(fn));
PAssert.that(output).containsInAnyOrder("process", "timer1", "timer2");
pipeline.run();
}

@Test
@Category({ValidatesRunner.class, UsesTimersInParDo.class, UsesTimerMap.class})
public void testTimerWithMultipleTimerFamily() throws Exception {
public void testTimerWithMultipleTimerFamilyBounded() throws Exception {
runTestTimerWithMultipleTimerFamily(false);
}

@Test
@Category({ValidatesRunner.class, UsesTimersInParDo.class, UsesTimerMap.class})
public void testTimerWithMultipleTimerFamilyUnbounded() throws Exception {
runTestTimerWithMultipleTimerFamily(true);
}

public void runTestTimerWithMultipleTimerFamily(boolean useStreaming) throws Exception {
final String timerFamilyId1 = "foo";
final String timerFamilyId2 = "bar";

Expand Down Expand Up @@ -4556,11 +4580,70 @@ public void onTimer2(
};

PCollection<String> output =
pipeline.apply(Create.of(KV.of("hello", 37))).apply(ParDo.of(fn));
pipeline
.apply(Create.of(KV.of("hello", 37)))
.setIsBoundedInternal(useStreaming ? IsBounded.UNBOUNDED : IsBounded.BOUNDED)
.apply(ParDo.of(fn));
PAssert.that(output).containsInAnyOrder("process", "timer", "timer");
pipeline.run();
}

@Test
@Category({ValidatesRunner.class, UsesTimersInParDo.class, UsesTimerMap.class})
public void testTimerFamilyAndTimerBounded() throws Exception {
runTestTimerFamilyAndTimer(false);
}

@Test
@Category({ValidatesRunner.class, UsesTimersInParDo.class, UsesTimerMap.class})
public void testTimerFamilyAndTimerUnbounded() throws Exception {
runTestTimerFamilyAndTimer(true);
}

public void runTestTimerFamilyAndTimer(boolean useStreaming) throws Exception {
final String timerFamilyId = "foo";
final String timerId = "timer";

DoFn<KV<String, Integer>, String> fn =
new DoFn<KV<String, Integer>, String>() {

@TimerFamily(timerFamilyId)
private final TimerSpec spec1 = TimerSpecs.timerMap(TimeDomain.EVENT_TIME);

@TimerId(timerId)
private final TimerSpec spec2 = TimerSpecs.timer(TimeDomain.EVENT_TIME);

@ProcessElement
public void processElement(
@TimerFamily(timerFamilyId) TimerMap timerMap,
@TimerId(timerId) Timer timer,
OutputReceiver<String> r) {
timerMap.set("timer", new Instant(1));
timer.set(new Instant(2));
r.output("process");
}

@OnTimerFamily(timerFamilyId)
public void onTimer1(
@TimerId String timerId, @Timestamp Instant ts, OutputReceiver<String> r) {
r.output("family:" + timerFamilyId + ":" + timerId);
}

@OnTimer(timerId)
public void onTimer2(@Timestamp Instant ts, OutputReceiver<String> r) {
r.output(timerId);
}
};

PCollection<String> output =
pipeline
.apply(Create.of(KV.of("hello", 37)))
.setIsBoundedInternal(useStreaming ? IsBounded.UNBOUNDED : IsBounded.BOUNDED)
.apply(ParDo.of(fn));
PAssert.that(output).containsInAnyOrder("process", "family:foo:timer", "timer");
pipeline.run();
}

@Test
@Category({
ValidatesRunner.class,
Expand Down
Loading

0 comments on commit 7719708

Please sign in to comment.