Skip to content

Commit

Permalink
[FLINK-9599][rest] Implement generic mechanism to access uploaded files
Browse files Browse the repository at this point in the history
This closes apache#6178.
  • Loading branch information
zentol committed Jun 21, 2018
1 parent e6efa17 commit ae8cef3
Show file tree
Hide file tree
Showing 15 changed files with 1,200 additions and 187 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.FileUpload;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
Expand All @@ -35,6 +35,7 @@
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
Expand All @@ -46,7 +47,7 @@
* Handles .jar file uploads.
*/
public class JarUploadHandler extends
AbstractRestHandler<RestfulGateway, FileUpload, JarUploadResponseBody, EmptyMessageParameters> {
AbstractRestHandler<RestfulGateway, EmptyRequestBody, JarUploadResponseBody, EmptyMessageParameters> {

private final Path jarDir;

Expand All @@ -57,7 +58,7 @@ public JarUploadHandler(
final GatewayRetriever<? extends RestfulGateway> leaderRetriever,
final Time timeout,
final Map<String, String> responseHeaders,
final MessageHeaders<FileUpload, JarUploadResponseBody, EmptyMessageParameters> messageHeaders,
final MessageHeaders<EmptyRequestBody, JarUploadResponseBody, EmptyMessageParameters> messageHeaders,
final Path jarDir,
final Executor executor) {
super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders);
Expand All @@ -67,41 +68,34 @@ public JarUploadHandler(

@Override
protected CompletableFuture<JarUploadResponseBody> handleRequest(
@Nonnull final HandlerRequest<FileUpload, EmptyMessageParameters> request,
@Nonnull final HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request,
@Nonnull final RestfulGateway gateway) throws RestHandlerException {

final FileUpload fileUpload = request.getRequestBody();
Collection<Path> uploadedFiles = request.getUploadedFiles();
if (uploadedFiles.size() != 1) {
throw new RestHandlerException("Exactly 1 file must be sent, received " + uploadedFiles.size() + '.', HttpResponseStatus.BAD_REQUEST);
}
final Path fileUpload = uploadedFiles.iterator().next();
return CompletableFuture.supplyAsync(() -> {
if (!fileUpload.getPath().getFileName().toString().endsWith(".jar")) {
deleteUploadedFile(fileUpload);
if (!fileUpload.getFileName().toString().endsWith(".jar")) {
throw new CompletionException(new RestHandlerException(
"Only Jar files are allowed.",
HttpResponseStatus.BAD_REQUEST));
} else {
final Path destination = jarDir.resolve(fileUpload.getPath().getFileName());
final Path destination = jarDir.resolve(fileUpload.getFileName());
try {
Files.move(fileUpload.getPath(), destination);
Files.move(fileUpload, destination);
} catch (IOException e) {
deleteUploadedFile(fileUpload);
throw new CompletionException(new RestHandlerException(
String.format("Could not move uploaded jar file [%s] to [%s].",
fileUpload.getPath(),
fileUpload,
destination),
HttpResponseStatus.INTERNAL_SERVER_ERROR,
e));
}
return new JarUploadResponseBody(fileUpload.getPath()
return new JarUploadResponseBody(fileUpload
.normalize()
.toString());
}
}, executor);
}

private void deleteUploadedFile(final FileUpload fileUpload) {
try {
Files.delete(fileUpload.getPath());
} catch (IOException e) {
log.error("Failed to delete file {}.", fileUpload.getPath(), e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,15 @@

import org.apache.flink.runtime.rest.HttpMethodWrapper;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.FileUpload;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.MessageHeaders;

import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;

/**
* {@link MessageHeaders} for uploading jars.
*/
public final class JarUploadHeaders implements MessageHeaders<FileUpload, JarUploadResponseBody, EmptyMessageParameters> {
public final class JarUploadHeaders implements MessageHeaders<EmptyRequestBody, JarUploadResponseBody, EmptyMessageParameters> {

public static final String URL = "/jars/upload";
private static final JarUploadHeaders INSTANCE = new JarUploadHeaders();
Expand All @@ -46,8 +46,8 @@ public HttpResponseStatus getResponseStatusCode() {
}

@Override
public Class<FileUpload> getRequestClass() {
return FileUpload.class;
public Class<EmptyRequestBody> getRequestClass() {
return EmptyRequestBody.class;
}

@Override
Expand Down Expand Up @@ -75,4 +75,9 @@ public String getDescription() {
" header is set to \"application/x-java-archive\", as some http libraries do not add the header by default.\n" +
"Using 'curl' you can upload a jar via 'curl -X POST -H \"Expect:\" -F \"jarfile=#path/to/flink-job.jar\" https://hostname:port" + URL + "'.";
}

@Override
public boolean acceptsFileUploads() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.apache.flink.runtime.rest.handler.HandlerRequestException;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.FileUpload;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;

Expand All @@ -38,6 +38,7 @@
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
Expand Down Expand Up @@ -83,7 +84,7 @@ public void setUp() throws Exception {
@Test
public void testRejectNonJarFiles() throws Exception {
final Path uploadedFile = Files.createFile(jarDir.resolve("katrin.png"));
final HandlerRequest<FileUpload, EmptyMessageParameters> request = createRequest(uploadedFile);
final HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request = createRequest(uploadedFile);

try {
jarUploadHandler.handleRequest(request, mockDispatcherGateway).get();
Expand All @@ -99,7 +100,7 @@ public void testRejectNonJarFiles() throws Exception {
@Test
public void testUploadJar() throws Exception {
final Path uploadedFile = Files.createFile(jarDir.resolve("Kafka010Example.jar"));
final HandlerRequest<FileUpload, EmptyMessageParameters> request = createRequest(uploadedFile);
final HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request = createRequest(uploadedFile);

final JarUploadResponseBody jarUploadResponseBody = jarUploadHandler.handleRequest(request, mockDispatcherGateway).get();
assertThat(jarUploadResponseBody.getStatus(), equalTo(JarUploadResponseBody.UploadStatus.success));
Expand All @@ -109,7 +110,7 @@ public void testUploadJar() throws Exception {
@Test
public void testFailedUpload() throws Exception {
final Path uploadedFile = jarDir.resolve("Kafka010Example.jar");
final HandlerRequest<FileUpload, EmptyMessageParameters> request = createRequest(uploadedFile);
final HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request = createRequest(uploadedFile);

try {
jarUploadHandler.handleRequest(request, mockDispatcherGateway).get();
Expand All @@ -123,12 +124,13 @@ public void testFailedUpload() throws Exception {
}
}

private static HandlerRequest<FileUpload, EmptyMessageParameters> createRequest(
final Path uploadedFile) throws HandlerRequestException {
private static HandlerRequest<EmptyRequestBody, EmptyMessageParameters> createRequest(
final Path uploadedFile) throws HandlerRequestException, IOException {
return new HandlerRequest<>(
new FileUpload(uploadedFile),
EmptyRequestBody.getInstance(),
EmptyMessageParameters.getInstance(),
Collections.emptyMap(),
Collections.emptyMap());
Collections.emptyMap(),
Collections.singleton(uploadedFile));
}
}
7 changes: 7 additions & 0 deletions flink-runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,13 @@ under the License.
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<version>3.7.0</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-testkit_${scala.binary.version}</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@
package org.apache.flink.runtime.rest;

import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.rest.handler.FileUploads;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.HandlerRequestException;
import org.apache.flink.runtime.rest.handler.RedirectHandler;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.handler.router.RoutedRequest;
import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
import org.apache.flink.runtime.rest.messages.FileUpload;
import org.apache.flink.runtime.rest.messages.MessageParameters;
import org.apache.flink.runtime.rest.messages.RequestBody;
import org.apache.flink.runtime.rest.messages.UntypedResponseMessageHeaders;
Expand All @@ -50,7 +50,6 @@

import javax.annotation.Nonnull;

import java.nio.file.Path;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

Expand Down Expand Up @@ -103,77 +102,78 @@ protected void respondAsLeader(ChannelHandlerContext ctx, RoutedRequest routedRe
return;
}

ByteBuf msgContent = ((FullHttpRequest) httpRequest).content();
final ByteBuf msgContent = ((FullHttpRequest) httpRequest).content();

R request;
if (isFileUpload()) {
final Path path = ctx.channel().attr(FileUploadHandler.UPLOADED_FILE).get();
if (path == null) {
try (FileUploads uploadedFiles = FileUploadHandler.getMultipartFileUploads(ctx)) {

if (!untypedResponseMessageHeaders.acceptsFileUploads() && !uploadedFiles.getUploadedFiles().isEmpty()) {
HandlerUtils.sendErrorResponse(
ctx,
httpRequest,
new ErrorResponseBody("Client did not upload a file."),
new ErrorResponseBody("File uploads not allowed."),
HttpResponseStatus.BAD_REQUEST,
responseHeaders);
return;
}
//noinspection unchecked
request = (R) new FileUpload(path);
} else if (msgContent.capacity() == 0) {
try {
request = MAPPER.readValue("{}", untypedResponseMessageHeaders.getRequestClass());
} catch (JsonParseException | JsonMappingException je) {
log.error("Request did not conform to expected format.", je);
HandlerUtils.sendErrorResponse(
ctx,
httpRequest,
new ErrorResponseBody("Bad request received."),
HttpResponseStatus.BAD_REQUEST,
responseHeaders);
return;

R request;
if (msgContent.capacity() == 0) {
try {
request = MAPPER.readValue("{}", untypedResponseMessageHeaders.getRequestClass());
} catch (JsonParseException | JsonMappingException je) {
log.error("Request did not conform to expected format.", je);
HandlerUtils.sendErrorResponse(
ctx,
httpRequest,
new ErrorResponseBody("Bad request received."),
HttpResponseStatus.BAD_REQUEST,
responseHeaders);
return;
}
} else {
try {
ByteBufInputStream in = new ByteBufInputStream(msgContent);
request = MAPPER.readValue(in, untypedResponseMessageHeaders.getRequestClass());
} catch (JsonParseException | JsonMappingException je) {
log.error("Failed to read request.", je);
HandlerUtils.sendErrorResponse(
ctx,
httpRequest,
new ErrorResponseBody(String.format("Request did not match expected format %s.", untypedResponseMessageHeaders.getRequestClass().getSimpleName())),
HttpResponseStatus.BAD_REQUEST,
responseHeaders);
return;
}
}
} else {

final HandlerRequest<R, M> handlerRequest;

try {
ByteBufInputStream in = new ByteBufInputStream(msgContent);
request = MAPPER.readValue(in, untypedResponseMessageHeaders.getRequestClass());
} catch (JsonParseException | JsonMappingException je) {
log.error("Failed to read request.", je);
handlerRequest = new HandlerRequest<R, M>(
request,
untypedResponseMessageHeaders.getUnresolvedMessageParameters(),
routedRequest.getRouteResult().pathParams(),
routedRequest.getRouteResult().queryParams(),
uploadedFiles.getUploadedFiles());
} catch (HandlerRequestException hre) {
log.error("Could not create the handler request.", hre);

HandlerUtils.sendErrorResponse(
ctx,
httpRequest,
new ErrorResponseBody(String.format("Request did not match expected format %s.", untypedResponseMessageHeaders.getRequestClass().getSimpleName())),
new ErrorResponseBody(String.format("Bad request, could not parse parameters: %s", hre.getMessage())),
HttpResponseStatus.BAD_REQUEST,
responseHeaders);
return;
}
}

final HandlerRequest<R, M> handlerRequest;

try {
handlerRequest = new HandlerRequest<>(
request,
untypedResponseMessageHeaders.getUnresolvedMessageParameters(),
routedRequest.getRouteResult().pathParams(),
routedRequest.getRouteResult().queryParams());
} catch (HandlerRequestException hre) {
log.error("Could not create the handler request.", hre);

HandlerUtils.sendErrorResponse(
respondToRequest(
ctx,
httpRequest,
new ErrorResponseBody(String.format("Bad request, could not parse parameters: %s", hre.getMessage())),
HttpResponseStatus.BAD_REQUEST,
responseHeaders);
return;
handlerRequest,
gateway);
}

respondToRequest(
ctx,
httpRequest,
handlerRequest,
gateway);

} catch (Throwable e) {
log.error("Request processing failed.", e);
HandlerUtils.sendErrorResponse(
Expand All @@ -185,10 +185,6 @@ protected void respondAsLeader(ChannelHandlerContext ctx, RoutedRequest routedRe
}
}

private boolean isFileUpload() {
return untypedResponseMessageHeaders.getRequestClass() == FileUpload.class;
}

/**
* Respond to the given {@link HandlerRequest}.
*
Expand Down
Loading

0 comments on commit ae8cef3

Please sign in to comment.