From 64fb8029be0796b289b21a40f528ee8e8b0edc05 Mon Sep 17 00:00:00 2001 From: Eric Hwang Date: Thu, 11 Sep 2014 14:28:01 -0700 Subject: [PATCH] Create AsyncResponseHandler builder pattern - Replaces AsyncResponseUtils - Adds ability to asynchronously cancel on client disconnect --- .../presto/server/AsyncResponseHandler.java | 107 ++++++++++++++++ .../presto/server/AsyncResponseUtils.java | 121 ------------------ .../facebook/presto/server/TaskResource.java | 19 ++- 3 files changed, 116 insertions(+), 131 deletions(-) create mode 100644 presto-main/src/main/java/com/facebook/presto/server/AsyncResponseHandler.java delete mode 100644 presto-main/src/main/java/com/facebook/presto/server/AsyncResponseUtils.java diff --git a/presto-main/src/main/java/com/facebook/presto/server/AsyncResponseHandler.java b/presto-main/src/main/java/com/facebook/presto/server/AsyncResponseHandler.java new file mode 100644 index 000000000000..2283468fb504 --- /dev/null +++ b/presto-main/src/main/java/com/facebook/presto/server/AsyncResponseHandler.java @@ -0,0 +1,107 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.server; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import io.airlift.units.Duration; + +import javax.ws.rs.container.AsyncResponse; +import javax.ws.rs.container.TimeoutHandler; +import javax.ws.rs.core.Response; + +import java.lang.ref.WeakReference; +import java.util.concurrent.Executor; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static javax.ws.rs.core.Response.status; + +public class AsyncResponseHandler +{ + private final AsyncResponse asyncResponse; + private final WeakReference> futureResponseReference; + + private AsyncResponseHandler(AsyncResponse asyncResponse, ListenableFuture futureResponse) + { + this.asyncResponse = checkNotNull(asyncResponse, "asyncResponse is null"); + // the jaxrs implementation can hold on to the async timeout for a long time, and + // the future can reference large expensive objects. Since we are only interested + // in canceling this future on a timeout, only hold a weak reference to the future + this.futureResponseReference = new WeakReference>(checkNotNull(futureResponse, "futureResponse is null")); + } + + public static AsyncResponseHandler bindAsyncResponse(AsyncResponse asyncResponse, ListenableFuture futureResponse, Executor httpResponseExecutor) + { + Futures.addCallback(futureResponse, toFutureCallback(asyncResponse), httpResponseExecutor); + return new AsyncResponseHandler(asyncResponse, futureResponse); + } + + public AsyncResponseHandler withTimeout(Duration timeout) + { + return withTimeout(timeout, + status(Response.Status.SERVICE_UNAVAILABLE) + .entity("Timed out after waiting for " + timeout.convertToMostSuccinctTimeUnit()) + .build()); + } + + public AsyncResponseHandler withTimeout(Duration timeout, final Response timeoutResponse) + { + asyncResponse.setTimeoutHandler(new TimeoutHandler() + { + @Override + public void handleTimeout(AsyncResponse asyncResponse) + { + asyncResponse.resume(timeoutResponse); + cancelFuture(); + } + }); + asyncResponse.setTimeout(timeout.toMillis(), MILLISECONDS); + return this; + } + + private void cancelFuture() + { + // Cancel the original future if it still exists + ListenableFuture futureResponse = futureResponseReference.get(); + if (futureResponse != null) { + try { + futureResponse.cancel(true); + } + catch (Exception ignored) { + } + } + } + + private static FutureCallback toFutureCallback(final AsyncResponse asyncResponse) + { + return new FutureCallback() + { + @Override + public void onSuccess(T value) + { + checkArgument(!(value instanceof Response.ResponseBuilder), "Value is a ResponseBuilder. Did you forget to call build?"); + asyncResponse.resume(value); + } + + @Override + public void onFailure(Throwable t) + { + asyncResponse.resume(t); + } + }; + } +} diff --git a/presto-main/src/main/java/com/facebook/presto/server/AsyncResponseUtils.java b/presto-main/src/main/java/com/facebook/presto/server/AsyncResponseUtils.java deleted file mode 100644 index 2f623d562f8a..000000000000 --- a/presto-main/src/main/java/com/facebook/presto/server/AsyncResponseUtils.java +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.facebook.presto.server; - -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import io.airlift.units.Duration; - -import javax.ws.rs.container.AsyncResponse; -import javax.ws.rs.container.TimeoutHandler; -import javax.ws.rs.core.Response; -import javax.ws.rs.core.Response.ResponseBuilder; -import javax.ws.rs.core.Response.Status; - -import java.lang.ref.WeakReference; -import java.util.concurrent.Executor; - -import static com.google.common.base.Preconditions.checkArgument; -import static java.util.concurrent.TimeUnit.MILLISECONDS; - -public final class AsyncResponseUtils -{ - private AsyncResponseUtils() - { - } - - public static void registerAsyncResponse( - AsyncResponse asyncResponse, - ListenableFuture futureResponse, - Duration timeout, - Executor executor) - { - Response timeoutResponse = Response.status(Status.SERVICE_UNAVAILABLE) - .entity("Timed out after waiting for " + timeout.convertToMostSuccinctTimeUnit()) - .build(); - - registerAsyncResponse(asyncResponse, - futureResponse, - timeout, - executor, - timeoutResponse); - } - - public static void registerAsyncResponse( - AsyncResponse asyncResponse, - final ListenableFuture futureResponse, - Duration requestTimeout, - Executor executor, - final Response timeoutResponse) - { - // when the future completes, send the response - Futures.addCallback(futureResponse, toAsyncResponse(asyncResponse), executor); - - // if the future does not complete in the specified time, send the timeout response - asyncResponse.setTimeoutHandler(new AsyncTimeoutHandler(futureResponse, timeoutResponse)); - asyncResponse.setTimeout(requestTimeout.toMillis(), MILLISECONDS); - } - - private static FutureCallback toAsyncResponse(final AsyncResponse asyncResponse) - { - return new FutureCallback() - { - @Override - public void onSuccess(T value) - { - checkArgument(!(value instanceof ResponseBuilder), "Value is a ResponseBuilder. Did you forget to call build?"); - asyncResponse.resume(value); - } - - @Override - public void onFailure(Throwable t) - { - asyncResponse.resume(t); - } - }; - } - - private static class AsyncTimeoutHandler - implements TimeoutHandler - { - private final WeakReference> futureResponseReference; - private final Response timeoutResponse; - - public AsyncTimeoutHandler(ListenableFuture futureResponse, Response timeoutResponse) - { - // the jaxrs implementation can hold on to the async timeout for a long time, and - // the future can reference large expensive objects. Since we are only interested - // in canceling this future on a timeout, only hold a weak reference to the future - this.futureResponseReference = new WeakReference>(futureResponse); - this.timeoutResponse = timeoutResponse; - } - - @Override - public void handleTimeout(AsyncResponse asyncResponse) - { - asyncResponse.resume(timeoutResponse); - - // cancel the original future if it still exists - ListenableFuture futureResponse = futureResponseReference.get(); - if (futureResponse != null) { - try { - futureResponse.cancel(true); - } - catch (Exception ignored) { - } - } - } - } -} diff --git a/presto-main/src/main/java/com/facebook/presto/server/TaskResource.java b/presto-main/src/main/java/com/facebook/presto/server/TaskResource.java index dce3c86c5eae..aa31867229e8 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/TaskResource.java +++ b/presto-main/src/main/java/com/facebook/presto/server/TaskResource.java @@ -57,6 +57,7 @@ import static com.facebook.presto.client.PrestoHeaders.PRESTO_PAGE_NEXT_TOKEN; import static com.facebook.presto.client.PrestoHeaders.PRESTO_PAGE_TOKEN; import static com.facebook.presto.execution.TaskInfo.summarizeTaskInfo; +import static com.facebook.presto.server.AsyncResponseHandler.bindAsyncResponse; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.collect.Iterables.transform; import static java.util.concurrent.TimeUnit.MILLISECONDS; @@ -155,7 +156,8 @@ public TaskInfo apply(TaskInfo taskInfo) // For hard timeout, add an additional 5 seconds to max wait for thread scheduling contention and GC Duration timeout = new Duration(maxWait.toMillis() + 5000, MILLISECONDS); - AsyncResponseUtils.registerAsyncResponse(asyncResponse, futureTaskInfo, timeout, executor); + bindAsyncResponse(asyncResponse, futureTaskInfo, executor) + .withTimeout(timeout); } @DELETE @@ -228,15 +230,12 @@ else if (result.isBufferClosed()) { // For hard timeout, add an additional 5 seconds to max wait for thread scheduling contention and GC Duration timeout = new Duration(DEFAULT_MAX_WAIT_TIME.toMillis() + 5000, MILLISECONDS); - AsyncResponseUtils.registerAsyncResponse( - asyncResponse, - responseFuture, - timeout, - executor, - Response.status(Status.NO_CONTENT) - .header(PRESTO_PAGE_TOKEN, token) - .header(PRESTO_PAGE_NEXT_TOKEN, token) - .build()); + bindAsyncResponse(asyncResponse, responseFuture, executor) + .withTimeout(timeout, + Response.status(Status.NO_CONTENT) + .header(PRESTO_PAGE_TOKEN, token) + .header(PRESTO_PAGE_NEXT_TOKEN, token) + .build()); } @DELETE