Skip to content

Commit

Permalink
[FLINK-12481][runtime] Use StreamTask mailbox for processing time tri…
Browse files Browse the repository at this point in the history
…ggered actions
  • Loading branch information
1u0 authored and pnowojski committed Sep 17, 2019
1 parent 03aadcc commit c7df381
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 89 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@

import org.apache.flink.runtime.util.FatalExitExceptionHandler;

import javax.annotation.Nullable;

import java.util.concurrent.ThreadFactory;

/**
Expand All @@ -34,41 +32,20 @@ public class DispatcherThreadFactory implements ThreadFactory {

private final String threadName;

@Nullable
private final ClassLoader classLoader;

/**
* Creates a new thread factory.
*
* @param group The group that the threads will be associated with.
* @param threadName The name for the threads.
*/
public DispatcherThreadFactory(ThreadGroup group, String threadName) {
this(group, threadName, null);
}

/**
* Creates a new thread factory.
*
* @param group The group that the threads will be associated with.
* @param threadName The name for the threads.
* @param classLoader The {@link ClassLoader} to be set as context class loader.
*/
public DispatcherThreadFactory(
ThreadGroup group,
String threadName,
@Nullable ClassLoader classLoader) {
this.group = group;
this.threadName = threadName;
this.classLoader = classLoader;
}

@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r, threadName);
if (classLoader != null) {
t.setContextClassLoader(classLoader);
}
t.setDaemon(true);
t.setUncaughtExceptionHandler(FatalExitExceptionHandler.INSTANCE);
return t;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -385,8 +385,8 @@ public final void invoke() throws Exception {

// if the clock is not already set, then assign a default TimeServiceProvider
if (timerService == null) {
ThreadFactory timerThreadFactory = new DispatcherThreadFactory(TRIGGER_THREAD_GROUP,
"Time Trigger for " + getName(), getUserCodeClassLoader());
ThreadFactory timerThreadFactory =
new DispatcherThreadFactory(TRIGGER_THREAD_GROUP, "Time Trigger for " + getName());

timerService = new SystemProcessingTimeService(new TimerInvocationContext(), timerThreadFactory);
}
Expand Down Expand Up @@ -1432,12 +1432,18 @@ private static <OUT> RecordWriter<SerializationDelegate<StreamRecord<OUT>>> crea
private class TimerInvocationContext implements SystemProcessingTimeService.ScheduledCallbackExecutionContext {
@Override
public void invoke(ProcessingTimeCallback callback, long timestamp) {
synchronized (getCheckpointLock()) {
try {
callback.onProcessingTime(timestamp);
} catch (Throwable t) {
handleAsyncException("Caught exception while processing timer.", new TimerException(t));
}
try {
mailboxProcessor.getMailboxExecutor(TaskMailbox.MAX_PRIORITY).execute(() -> {
synchronized (getCheckpointLock()) {
try {
callback.onProcessingTime(timestamp);
} catch (Throwable t) {
handleAsyncException("Caught exception while processing timer.", new TimerException(t));
}
}
});
} catch (Throwable t) {
handleAsyncException("Caught exception while processing timer.", new TimerException(t));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -737,6 +737,7 @@ private void testAsyncTimeout(
* <p>Note that this test does not enforce the exact strict ordering because with the fix it is no
* longer possible. However, it provokes the described situation without the fix.
*/
@Ignore("TODO: fix me when AsyncWaitOperator integrates with mailbox")
@Test(timeout = 10000L)
public void testClosingWithBlockedEmitter() throws Exception {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,6 @@
import static org.apache.flink.streaming.util.StreamTaskUtil.waitTaskIsRunning;
import static org.apache.flink.util.Preconditions.checkState;
import static org.hamcrest.CoreMatchers.startsWith;
import static org.hamcrest.Matchers.everyItem;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.sameInstance;
Expand Down Expand Up @@ -785,25 +782,6 @@ protected void processInput(DefaultActionContext context) throws Exception {
}
}

/**
* Test set user code ClassLoader before calling ProcessingTimeCallback.
*/
@Test
public void testSetsUserCodeClassLoaderForTimerThreadFactory() throws Throwable {
syncLatch = new OneShotLatch();

try (MockEnvironment mockEnvironment =
new MockEnvironmentBuilder()
.setUserCodeClassLoader(new TestUserCodeClassLoader())
.build()) {
RunningTask<TimeServiceTask> task = runTask(() -> new TimeServiceTask(mockEnvironment));
task.waitForTaskCompletion(false);

assertThat(task.streamTask.getClassLoaders(), hasSize(greaterThanOrEqualTo(1)));
assertThat(task.streamTask.getClassLoaders(), everyItem(instanceOf(TestUserCodeClassLoader.class)));
}
}

/**
* Tests that some StreamTask methods are called only in the main task's thread.
* Currently, the main task's thread is the thread that creates the task.
Expand Down Expand Up @@ -1390,45 +1368,14 @@ protected void cancelTask() throws Exception {

}

/**
* A task that register a processing time service callback.
*/
public static class TimeServiceTask extends NoOpStreamTask<String, AbstractStreamOperator<String>> {

private final List<ClassLoader> classLoaders = Collections.synchronizedList(new ArrayList<>());

public TimeServiceTask(Environment env) {
super(env);
}

public List<ClassLoader> getClassLoaders() {
return classLoaders;
}

@Override
protected void init() throws Exception {
super.init();
getProcessingTimeService().registerTimer(0, new ProcessingTimeCallback() {
@Override
public void onProcessingTime(long timestamp) throws Exception {
classLoaders.add(Thread.currentThread().getContextClassLoader());
syncLatch.trigger();
}
});
}

@Override
protected void processInput(DefaultActionContext context) throws Exception {
syncLatch.await();
super.processInput(context);
}
}

private static class ThreadInspectingTask extends StreamTask<String, AbstractStreamOperator<String>> {

private final long taskThreadId;
private final ClassLoader taskClassLoader;

/** Flag to wait until time trigger has been called. */
private transient boolean hasTimerTriggered;

ThreadInspectingTask(Environment env) {
super(env, null);
Thread currentThread = Thread.currentThread();
Expand All @@ -1444,12 +1391,23 @@ ClassLoader getTaskClassLoader() {
@Override
protected void init() throws Exception {
checkTaskThreadInfo();

// Create a time trigger to validate that it would also be invoked in the task's thread.
getProcessingTimeService().registerTimer(0, new ProcessingTimeCallback() {
@Override
public void onProcessingTime(long timestamp) throws Exception {
hasTimerTriggered = true;
checkTaskThreadInfo();
}
});
}

@Override
protected void processInput(DefaultActionContext context) throws Exception {
checkTaskThreadInfo();
context.allActionsCompleted();
if (hasTimerTriggered) {
context.allActionsCompleted();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import org.apache.flink.streaming.api.scala.AsyncDataStreamITCase._
import org.apache.flink.streaming.api.scala.async.{ResultFuture, RichAsyncFunction}
import org.apache.flink.test.util.AbstractTestBase
import org.junit.Assert._
import org.junit.Test
import org.junit.{Ignore, Test}

import scala.collection.mutable
import scala.concurrent.{ExecutionContext, Future}
Expand All @@ -37,11 +37,13 @@ object AsyncDataStreamITCase {

class AsyncDataStreamITCase extends AbstractTestBase {

@Ignore("TODO: fix me when AsyncWaitOperator integrates with mailbox")
@Test
def testOrderedWait(): Unit = {
testAsyncWait(true)
}

@Ignore("TODO: fix me when AsyncWaitOperator integrates with mailbox")
@Test
def testUnorderedWait(): Unit = {
testAsyncWait(false)
Expand Down

0 comments on commit c7df381

Please sign in to comment.