Skip to content

Commit

Permalink
[hotfix][network] Introduce PartititonRequestClient interface for cre…
Browse files Browse the repository at this point in the history
…ating simple client instance in tests
  • Loading branch information
zhijiangW authored and tillrohrmann committed May 23, 2019
1 parent 22f87ee commit 0189db4
Show file tree
Hide file tree
Showing 12 changed files with 113 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@

package org.apache.flink.runtime.io.network;

import org.apache.flink.runtime.io.network.netty.PartitionRequestClient;

import java.io.IOException;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@

package org.apache.flink.runtime.io.network;

import org.apache.flink.runtime.io.network.netty.PartitionRequestClient;

/**
* A connection manager implementation to bypass setup overhead for task managers running in local
* execution mode.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.io.network;

import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;

import java.io.IOException;

/**
* Client to send messages or task events via network for {@link RemoteInputChannel}.
*/
public interface PartitionRequestClient {

/**
* Requests a remote sub partition.
*
* @param partitionId The identifier of result partition to be requested.
* @param subpartitionIndex The sub partition index in the requested result partition.
* @param inputChannel The remote input channel for requesting the sub partition.
* @param delayMs The request is scheduled within a delay time.
*/
void requestSubpartition(
ResultPartitionID partitionId,
int subpartitionIndex,
RemoteInputChannel inputChannel,
int delayMs) throws IOException;

/**
* Notifies available credits from one remote input channel.
*
* @param inputChannel The remote input channel who announces the available credits.
*/
void notifyCreditAvailable(RemoteInputChannel inputChannel);

/**
* Sends a task event backwards to an intermediate result partition.
*
* @param partitionId The identifier of result partition.
* @param event The task event to be sent.
* @param inputChannel The remote input channel for sending this event.
*/
void sendTaskEvent(ResultPartitionID partitionId, TaskEvent event, RemoteInputChannel inputChannel) throws IOException;

/**
* Cancels the partition request for the given remote input channel and removes
* this client from factory if it is not referenced by any other input channels.
*
* @param inputChannel The remote input channel for canceling partition and to
* be removed from network stack.
*/
void close(RemoteInputChannel inputChannel) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.ConnectionManager;
import org.apache.flink.runtime.io.network.PartitionRequestClient;
import org.apache.flink.runtime.io.network.TaskEventPublisher;
import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@
package org.apache.flink.runtime.io.network.netty;

import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.io.network.NetworkClientHandler;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.NetworkClientHandler;
import org.apache.flink.runtime.io.network.PartitionRequestClient;
import org.apache.flink.runtime.io.network.netty.exception.LocalTransportException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
Expand All @@ -47,9 +48,9 @@
* <p>This client is shared by all remote input channels, which request a partition
* from the same {@link ConnectionID}.
*/
public class PartitionRequestClient {
public class NettyPartitionRequestClient implements PartitionRequestClient {

private static final Logger LOG = LoggerFactory.getLogger(PartitionRequestClient.class);
private static final Logger LOG = LoggerFactory.getLogger(NettyPartitionRequestClient.class);

private final Channel tcpChannel;

Expand All @@ -62,7 +63,7 @@ public class PartitionRequestClient {
/** If zero, the underlying TCP channel can be safely closed. */
private final AtomicDisposableReferenceCounter closeReferenceCounter = new AtomicDisposableReferenceCounter();

PartitionRequestClient(
NettyPartitionRequestClient(
Channel tcpChannel,
NetworkClientHandler clientHandler,
ConnectionID connectionId,
Expand Down Expand Up @@ -94,7 +95,8 @@ boolean incrementReferenceCounter() {
* <p>The request goes to the remote producer, for which this partition
* request client instance has been created.
*/
public ChannelFuture requestSubpartition(
@Override
public void requestSubpartition(
final ResultPartitionID partitionId,
final int subpartitionIndex,
final RemoteInputChannel inputChannel,
Expand Down Expand Up @@ -128,7 +130,6 @@ public void operationComplete(ChannelFuture future) throws Exception {
if (delayMs == 0) {
ChannelFuture f = tcpChannel.writeAndFlush(request);
f.addListener(listener);
return f;
} else {
final ChannelFuture[] f = new ChannelFuture[1];
tcpChannel.eventLoop().schedule(new Runnable() {
Expand All @@ -138,19 +139,18 @@ public void run() {
f[0].addListener(listener);
}
}, delayMs, TimeUnit.MILLISECONDS);

return f[0];
}
}

/**
* Sends a task event backwards to an intermediate result partition producer.
* <p>
* Backwards task events flow between readers and writers and therefore
*
* <p>Backwards task events flow between readers and writers and therefore
* will only work when both are running at the same time, which is only
* guaranteed to be the case when both the respective producer and
* consumer task run pipelined.
*/
@Override
public void sendTaskEvent(ResultPartitionID partitionId, TaskEvent event, final RemoteInputChannel inputChannel) throws IOException {
checkNotClosed();

Expand All @@ -170,10 +170,12 @@ public void operationComplete(ChannelFuture future) throws Exception {
});
}

@Override
public void notifyCreditAvailable(RemoteInputChannel inputChannel) {
clientHandler.notifyCreditAvailable(inputChannel);
}

@Override
public void close(RemoteInputChannel inputChannel) throws IOException {

clientHandler.removeInputChannel(inputChannel);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.NetworkClientHandler;
import org.apache.flink.runtime.io.network.PartitionRequestClient;
import org.apache.flink.runtime.io.network.netty.exception.LocalTransportException;
import org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
Expand All @@ -33,7 +34,7 @@
import java.util.concurrent.ConcurrentMap;

/**
* Factory for {@link PartitionRequestClient} instances.
* Factory for {@link NettyPartitionRequestClient} instances.
*
* <p>Instances of partition requests clients are shared among several {@link RemoteInputChannel}
* instances.
Expand All @@ -50,19 +51,19 @@ class PartitionRequestClientFactory {

/**
* Atomically establishes a TCP connection to the given remote address and
* creates a {@link PartitionRequestClient} instance for this connection.
* creates a {@link NettyPartitionRequestClient} instance for this connection.
*/
PartitionRequestClient createPartitionRequestClient(ConnectionID connectionId) throws IOException, InterruptedException {
NettyPartitionRequestClient createPartitionRequestClient(ConnectionID connectionId) throws IOException, InterruptedException {
Object entry;
PartitionRequestClient client = null;
NettyPartitionRequestClient client = null;

while (client == null) {
entry = clients.get(connectionId);

if (entry != null) {
// Existing channel or connecting channel
if (entry instanceof PartitionRequestClient) {
client = (PartitionRequestClient) entry;
if (entry instanceof NettyPartitionRequestClient) {
client = (NettyPartitionRequestClient) entry;
}
else {
ConnectingChannel future = (ConnectingChannel) entry;
Expand Down Expand Up @@ -92,7 +93,7 @@ else if (old instanceof ConnectingChannel) {
clients.replace(connectionId, old, client);
}
else {
client = (PartitionRequestClient) old;
client = (NettyPartitionRequestClient) old;
}
}

Expand Down Expand Up @@ -166,7 +167,7 @@ private void handInChannel(Channel channel) {
synchronized (connectLock) {
try {
NetworkClientHandler clientHandler = channel.pipeline().get(NetworkClientHandler.class);
partitionRequestClient = new PartitionRequestClient(
partitionRequestClient = new NettyPartitionRequestClient(
channel, clientHandler, connectionId, clientFactory);

if (disposeRequestClient) {
Expand All @@ -181,11 +182,11 @@ private void handInChannel(Channel channel) {
}
}

private volatile PartitionRequestClient partitionRequestClient;
private volatile NettyPartitionRequestClient partitionRequestClient;

private volatile Throwable error;

private PartitionRequestClient waitForChannel() throws IOException, InterruptedException {
private NettyPartitionRequestClient waitForChannel() throws IOException, InterruptedException {
synchronized (connectLock) {
while (error == null && partitionRequestClient == null) {
connectLock.wait(2000);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.ConnectionManager;
import org.apache.flink.runtime.io.network.PartitionRequestClient;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferListener;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.metrics.InputChannelMetrics;
import org.apache.flink.runtime.io.network.netty.PartitionRequestClient;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.util.ExceptionUtils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.NetworkClientHandler;
import org.apache.flink.runtime.io.network.PartitionRequestClient;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.netty.NettyTestUtil.NettyServerAndClient;
import org.apache.flink.runtime.io.network.netty.exception.LocalTransportException;
Expand All @@ -32,7 +33,6 @@

import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
Expand All @@ -56,7 +56,6 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.isA;
Expand Down Expand Up @@ -113,7 +112,7 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
}
});

PartitionRequestClient requestClient = new PartitionRequestClient(
PartitionRequestClient requestClient = new NettyPartitionRequestClient(
ch, handler, mock(ConnectionID.class), mock(PartitionRequestClientFactory.class));

// Create input channels
Expand All @@ -134,22 +133,20 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
}).when(rich[1]).onError(isA(LocalTransportException.class));

// First request is successful
ChannelFuture f = requestClient.requestSubpartition(new ResultPartitionID(), 0, rich[0], 0);
assertTrue(f.await().isSuccess());
requestClient.requestSubpartition(new ResultPartitionID(), 0, rich[0], 0);

// Second request is *not* successful
f = requestClient.requestSubpartition(new ResultPartitionID(), 0, rich[1], 0);
assertFalse(f.await().isSuccess());
requestClient.requestSubpartition(new ResultPartitionID(), 0, rich[1], 0);

// Only the second channel should be notified about the error
verify(rich[0], times(0)).onError(any(LocalTransportException.class));

// Wait for the notification
// Wait for the notification and it could confirm all the request operations are done
if (!sync.await(TestingUtils.TESTING_DURATION().toMillis(), TimeUnit.MILLISECONDS)) {
fail("Timed out after waiting for " + TestingUtils.TESTING_DURATION().toMillis() +
" ms to be notified about the channel error.");
}

// Only the second channel should be notified about the error
verify(rich[0], times(0)).onError(any(LocalTransportException.class));

shutdown(serverAndClient);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.runtime.io.network.netty;

import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.PartitionRequestClient;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferListener;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
Expand Down Expand Up @@ -244,7 +245,7 @@ public void testCancelBeforeActive() throws Exception {
public void testNotifyCreditAvailable() throws Exception {
final CreditBasedPartitionRequestClientHandler handler = new CreditBasedPartitionRequestClientHandler();
final EmbeddedChannel channel = new EmbeddedChannel(handler);
final PartitionRequestClient client = new PartitionRequestClient(
final PartitionRequestClient client = new NettyPartitionRequestClient(
channel, handler, mock(ConnectionID.class), mock(PartitionRequestClientFactory.class));

final NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32, 2);
Expand Down Expand Up @@ -344,7 +345,7 @@ public void testNotifyCreditAvailable() throws Exception {
public void testNotifyCreditAvailableAfterReleased() throws Exception {
final CreditBasedPartitionRequestClientHandler handler = new CreditBasedPartitionRequestClientHandler();
final EmbeddedChannel channel = new EmbeddedChannel(handler);
final PartitionRequestClient client = new PartitionRequestClient(
final PartitionRequestClient client = new NettyPartitionRequestClient(
channel, handler, mock(ConnectionID.class), mock(PartitionRequestClientFactory.class));

final NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32, 2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.runtime.io.network.netty;

import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.PartitionRequestClient;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.netty.NettyMessage.PartitionRequest;
Expand All @@ -41,17 +42,17 @@
import static org.mockito.Mockito.mock;

/**
* Tests for {@link PartitionRequestClient}.
* Tests for {@link NettyPartitionRequestClient}.
*/
public class PartitionRequestClientTest {
public class NettyPartitionRequestClientTest {

@Test
public void testRetriggerPartitionRequest() throws Exception {
final long deadline = System.currentTimeMillis() + 30_000L; // 30 secs

final PartitionRequestClientHandler handler = new PartitionRequestClientHandler();
final EmbeddedChannel channel = new EmbeddedChannel(handler);
final PartitionRequestClient client = new PartitionRequestClient(
final PartitionRequestClient client = new NettyPartitionRequestClient(
channel, handler, mock(ConnectionID.class), mock(PartitionRequestClientFactory.class));

final int numExclusiveBuffers = 2;
Expand Down Expand Up @@ -110,7 +111,7 @@ public void testRetriggerPartitionRequest() throws Exception {
public void testDoublePartitionRequest() throws Exception {
final PartitionRequestClientHandler handler = new PartitionRequestClientHandler();
final EmbeddedChannel channel = new EmbeddedChannel(handler);
final PartitionRequestClient client = new PartitionRequestClient(
final PartitionRequestClient client = new NettyPartitionRequestClient(
channel, handler, mock(ConnectionID.class), mock(PartitionRequestClientFactory.class));

final int numExclusiveBuffers = 2;
Expand Down
Loading

0 comments on commit 0189db4

Please sign in to comment.