Skip to content

Commit

Permalink
[FLINK-3952][runtine] Upgrade to Netty 4.1
Browse files Browse the repository at this point in the history
This commit includes possible bug fix to file uploading cleanup in FileUploadHandler and
HttpRequestHandler. For mor information look here:

netty/netty#7611

This closes apache#6071.
  • Loading branch information
pnowojski authored and tillrohrmann committed Jun 13, 2018
1 parent cc41285 commit 8169cf4
Show file tree
Hide file tree
Showing 17 changed files with 2,165 additions and 245 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.util.Preconditions;

import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedInput;
import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler;
Expand Down Expand Up @@ -73,6 +74,15 @@ public void close() throws Exception {

@Override
public ByteBuf readChunk(ChannelHandlerContext ctx) throws Exception {
return readChunk();
}

@Override
public ByteBuf readChunk(ByteBufAllocator byteBufAllocator) throws Exception {
return readChunk();
}

private ByteBuf readChunk() {
if (isClosed) {
return null;
} else if (buf.readableBytes() <= chunkSize) {
Expand All @@ -88,6 +98,16 @@ public ByteBuf readChunk(ChannelHandlerContext ctx) throws Exception {
}
}

@Override
public long length() {
return -1;
}

@Override
public long progress() {
return buf.readerIndex();
}

@Override
public String toString() {
return "ChunkedByteBuf{" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,4 +168,9 @@ public CompositeByteBuf compositeDirectBuffer(int maxNumComponents) {
public boolean isDirectBufferPooled() {
return alloc.isDirectBufferPooled();
}

@Override
public int calculateNewCapacity(int minNewCapacity, int maxCapacity) {
return alloc.calculateNewCapacity(minNewCapacity, maxCapacity);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,6 @@ else if (currentDecoder != null && msg instanceof HttpContent) {
currentRequest.setUri(encoder.toString());
}
}

data.release();
}
}
catch (EndOfDataDecoderException ignored) {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.ScatteringByteChannel;

Expand Down Expand Up @@ -181,22 +182,43 @@ protected short _getShort(int index) {
return memorySegment.getShortBigEndian(index);
}

@Override
protected short _getShortLE(int index) {
return memorySegment.getShortLittleEndian(index);
}

@Override
protected int _getUnsignedMedium(int index) {
// from UnpooledDirectByteBuf:
return (getByte(index) & 0xff) << 16 | (getByte(index + 1) & 0xff) << 8 | getByte(index + 2) & 0xff;
}

@Override
protected int _getUnsignedMediumLE(int index) {
// from UnpooledDirectByteBuf:
return getByte(index) & 255 | (getByte(index + 1) & 255) << 8 | (getByte(index + 2) & 255) << 16;
}

@Override
protected int _getInt(int index) {
return memorySegment.getIntBigEndian(index);
}

@Override
protected int _getIntLE(int index) {
return memorySegment.getIntLittleEndian(index);
}

@Override
protected long _getLong(int index) {
return memorySegment.getLongBigEndian(index);
}

@Override
protected long _getLongLE(int index) {
return memorySegment.getLongLittleEndian(index);
}

@Override
protected void _setByte(int index, int value) {
memorySegment.put(index, (byte) value);
Expand All @@ -207,6 +229,11 @@ protected void _setShort(int index, int value) {
memorySegment.putShortBigEndian(index, (short) value);
}

@Override
protected void _setShortLE(int index, int value) {
memorySegment.putShortLittleEndian(index, (short) value);
}

@Override
protected void _setMedium(int index, int value) {
// from UnpooledDirectByteBuf:
Expand All @@ -215,16 +242,34 @@ protected void _setMedium(int index, int value) {
setByte(index + 2, (byte) value);
}

@Override
protected void _setMediumLE(int index, int value){
// from UnpooledDirectByteBuf:
setByte(index, (byte) value);
setByte(index + 1, (byte) (value >>> 8));
setByte(index + 2, (byte) (value >>> 16));
}

@Override
protected void _setInt(int index, int value) {
memorySegment.putIntBigEndian(index, value);
}

@Override
protected void _setIntLE(int index, int value) {
memorySegment.putIntLittleEndian(index, value);
}

@Override
protected void _setLong(int index, long value) {
memorySegment.putLongBigEndian(index, value);
}

@Override
protected void _setLongLE(int index, long value) {
memorySegment.putLongLittleEndian(index, value);
}

@Override
public int capacity() {
return currentSize;
Expand Down Expand Up @@ -356,6 +401,18 @@ public int getBytes(int index, GatheringByteChannel out, int length) throws IOEx
return out.write(tmpBuf);
}

@Override
public int getBytes(int index, FileChannel out, long position, int length) throws IOException {
// adapted from UnpooledDirectByteBuf:
checkIndex(index, length);
if (length == 0) {
return 0;
}

ByteBuffer tmpBuf = memorySegment.wrap(index, length);
return out.write(tmpBuf, position);
}

@Override
public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
// from UnpooledDirectByteBuf:
Expand Down Expand Up @@ -424,6 +481,19 @@ public int setBytes(int index, ScatteringByteChannel in, int length) throws IOEx
}
}

@Override
public int setBytes(int index, FileChannel in, long position, int length) throws IOException {
// adapted from UnpooledDirectByteBuf:
checkIndex(index, length);

ByteBuffer tmpBuf = memorySegment.wrap(index, length);
try {
return in.read(tmpBuf, position);
} catch (ClosedChannelException ignored) {
return -1;
}
}

@Override
public ByteBufAllocator alloc() {
return checkNotNull(allocator);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,12 @@ private ReadOnlySlicedNetworkBuffer(ByteBuf buffer, int index, int length, int m

@Override
public ByteBuf unwrap() {
return super.unwrap().unwrap();
return super.unwrap();
}

@Override
public boolean isBuffer() {
return ((Buffer) unwrap()).isBuffer();
return getBuffer().isBuffer();
}

@Override
Expand All @@ -98,7 +98,7 @@ public void tagAsEvent() {
*/
@Override
public MemorySegment getMemorySegment() {
return ((Buffer) unwrap()).getMemorySegment();
return getBuffer().getMemorySegment();
}

@Override
Expand All @@ -108,22 +108,22 @@ public int getMemorySegmentOffset() {

@Override
public BufferRecycler getRecycler() {
return ((Buffer) unwrap()).getRecycler();
return getBuffer().getRecycler();
}

@Override
public void recycleBuffer() {
((Buffer) unwrap()).recycleBuffer();
getBuffer().recycleBuffer();
}

@Override
public boolean isRecycled() {
return ((Buffer) unwrap()).isRecycled();
return getBuffer().isRecycled();
}

@Override
public ReadOnlySlicedNetworkBuffer retainBuffer() {
((Buffer) unwrap()).retainBuffer();
getBuffer().retainBuffer();
return this;
}

Expand Down Expand Up @@ -203,11 +203,15 @@ public ByteBuf ensureWritable(int minWritableBytes) {

@Override
public void setAllocator(ByteBufAllocator allocator) {
((Buffer) unwrap()).setAllocator(allocator);
getBuffer().setAllocator(allocator);
}

@Override
public ByteBuf asByteBuf() {
return this;
}

private Buffer getBuffer() {
return ((Buffer) unwrap().unwrap());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ protected AbstractHandler(
protected void respondAsLeader(ChannelHandlerContext ctx, RoutedRequest routedRequest, T gateway) throws Exception {
HttpRequest httpRequest = routedRequest.getRequest();
if (log.isTraceEnabled()) {
log.trace("Received request " + httpRequest.getUri() + '.');
log.trace("Received request " + httpRequest.uri() + '.');
}

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject ms
fileUpload.renameTo(dest.toFile());
ctx.channel().attr(UPLOADED_FILE).set(dest);
}
data.release();
}

if (httpContent instanceof LastHttpContent) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE;

/**
* This client is the counter-part to the {@link RestServerEndpoint}.
*/
Expand Down Expand Up @@ -247,7 +249,14 @@ CompletableFuture<JsonResponse> getJsonFuture() {

@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof FullHttpResponse) {
if (msg instanceof HttpResponse && ((HttpResponse) msg).status().equals(REQUEST_ENTITY_TOO_LARGE)) {
jsonFuture.completeExceptionally(
new RestClientException(
String.format(
REQUEST_ENTITY_TOO_LARGE + ". Try to raise [%s]",
RestOptions.CLIENT_MAX_CONTENT_LENGTH.key()),
((HttpResponse) msg).status()));
} else if (msg instanceof FullHttpResponse) {
readRawResponse((FullHttpResponse) msg);
} else {
LOG.error("Implementation error: Received a response that wasn't a FullHttpResponse.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.QueryStringDecoder;
import org.apache.flink.shaded.netty4.io.netty.util.ReferenceCountUtil;
import org.apache.flink.shaded.netty4.io.netty.util.ReferenceCounted;

import java.util.Optional;
Expand All @@ -40,7 +41,7 @@ public RoutedRequest(RouteResult<T> result, HttpRequest request) {
this.result = checkNotNull(result);
this.request = checkNotNull(request);
this.requestAsReferenceCounted = Optional.ofNullable((request instanceof ReferenceCounted) ? (ReferenceCounted) request : null);
this.queryStringDecoder = new QueryStringDecoder(request.getUri());
this.queryStringDecoder = new QueryStringDecoder(request.uri());
}

public RouteResult<T> getRouteResult() {
Expand Down Expand Up @@ -94,4 +95,20 @@ public ReferenceCounted retain(int arg0) {
}
return this;
}

@Override
public ReferenceCounted touch() {
if (requestAsReferenceCounted.isPresent()) {
ReferenceCountUtil.touch(requestAsReferenceCounted.get());
}
return this;
}

@Override
public ReferenceCounted touch(Object hint) {
if (requestAsReferenceCounted.isPresent()) {
ReferenceCountUtil.touch(requestAsReferenceCounted.get(), hint);
}
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ protected void channelRead0(ChannelHandlerContext channelHandlerContext, HttpReq

// Route
HttpMethod method = httpRequest.getMethod();
QueryStringDecoder qsd = new QueryStringDecoder(httpRequest.getUri());
QueryStringDecoder qsd = new QueryStringDecoder(httpRequest.uri());
RouteResult<?> routeResult = router.route(method, qsd.path(), qsd.parameters());

if (routeResult == null) {
Expand Down
Loading

0 comments on commit 8169cf4

Please sign in to comment.