Skip to content

Commit

Permalink
[FLINK-24468][runtime] Moved the methods for sending messages from Ne…
Browse files Browse the repository at this point in the history
…tworkClientHandler to NettyPartitioonRequestClient since it is not responsibility of handler to send them
  • Loading branch information
akalash authored and dawidwys committed Oct 21, 2021
1 parent 34de7d1 commit 4648e8a
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 147 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,35 +38,4 @@ public interface NetworkClientHandler extends ChannelHandler {
RemoteInputChannel getInputChannel(InputChannelID inputChannelId);

void cancelRequestFor(InputChannelID inputChannelId);

/**
* The credit begins to announce after receiving the sender's backlog from buffer response. Than
* means it should only happen after some interactions with the channel to make sure the context
* will not be null.
*
* @param inputChannel The input channel with unannounced credits.
*/
void notifyCreditAvailable(final RemoteInputChannel inputChannel);

/**
* The new size should be announced after it was calculated on the receiver side.
*
* @param inputChannel The input channel with new buffer size.
* @param bufferSize The new buffer size.
*/
void notifyNewBufferSize(final RemoteInputChannel inputChannel, int bufferSize);

/**
* Resumes data consumption from the producer after an exactly once checkpoint.
*
* @param inputChannel The input channel to resume data consumption.
*/
void resumeConsumption(RemoteInputChannel inputChannel);

/**
* Acknowledges all user records are processed for this channel.
*
* @param inputChannel The input channel to resume data consumption.
*/
void acknowledgeAllRecordsProcessed(RemoteInputChannel inputChannel);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.io.network.netty;

import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;

import javax.annotation.Nullable;

/** Abstract class for representing the output message. */
abstract class ClientOutboundMessage {
protected final RemoteInputChannel inputChannel;

ClientOutboundMessage(RemoteInputChannel inputChannel) {
this.inputChannel = inputChannel;
}

@Nullable
abstract Object buildMessage();
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,6 @@

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.io.network.NetworkClientHandler;
import org.apache.flink.runtime.io.network.netty.NettyMessage.AckAllUserRecordsProcessed;
import org.apache.flink.runtime.io.network.netty.NettyMessage.AddCredit;
import org.apache.flink.runtime.io.network.netty.NettyMessage.ResumeConsumption;
import org.apache.flink.runtime.io.network.netty.exception.LocalTransportException;
import org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException;
import org.apache.flink.runtime.io.network.netty.exception.TransportException;
Expand All @@ -39,17 +36,13 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.io.IOException;
import java.net.SocketAddress;
import java.util.ArrayDeque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* Channel handler to read the messages of buffer response or error response from the producer, to
* write and flush the unannounced credits for the producer.
Expand Down Expand Up @@ -120,49 +113,6 @@ public void cancelRequestFor(InputChannelID inputChannelId) {
}
}

@Override
public void notifyCreditAvailable(final RemoteInputChannel inputChannel) {
ctx.executor()
.execute(
() ->
ctx.pipeline()
.fireUserEventTriggered(
new AddCreditMessage(inputChannel)));
}

@Override
public void notifyNewBufferSize(final RemoteInputChannel inputChannel, int bufferSize) {
ctx.executor()
.execute(
() ->
ctx.pipeline()
.fireUserEventTriggered(
new NewBufferSizeMessage(
inputChannel, bufferSize)));
}

@Override
public void resumeConsumption(RemoteInputChannel inputChannel) {
ctx.executor()
.execute(
() ->
ctx.pipeline()
.fireUserEventTriggered(
new ResumeConsumptionMessage(inputChannel)));
}

@Override
public void acknowledgeAllRecordsProcessed(RemoteInputChannel inputChannel) {
ctx.executor()
.execute(
() -> {
ctx.pipeline()
.fireUserEventTriggered(
new AcknowledgeAllRecordsProcessedMessage(
inputChannel));
});
}

// ------------------------------------------------------------------------
// Network events
// ------------------------------------------------------------------------
Expand Down Expand Up @@ -444,66 +394,4 @@ public void operationComplete(ChannelFuture future) throws Exception {
}
}
}

private abstract static class ClientOutboundMessage {
protected final RemoteInputChannel inputChannel;

ClientOutboundMessage(RemoteInputChannel inputChannel) {
this.inputChannel = inputChannel;
}

@Nullable
abstract Object buildMessage();
}

private static class AddCreditMessage extends ClientOutboundMessage {

AddCreditMessage(RemoteInputChannel inputChannel) {
super(checkNotNull(inputChannel));
}

@Override
public Object buildMessage() {
int credits = inputChannel.getAndResetUnannouncedCredit();
return credits > 0 ? new AddCredit(credits, inputChannel.getInputChannelId()) : null;
}
}

private static class NewBufferSizeMessage extends ClientOutboundMessage {
private final int bufferSize;

NewBufferSizeMessage(RemoteInputChannel inputChannel, int bufferSize) {
super(checkNotNull(inputChannel));
this.bufferSize = bufferSize;
}

@Override
public Object buildMessage() {
return new NettyMessage.NewBufferSize(bufferSize, inputChannel.getInputChannelId());
}
}

private static class ResumeConsumptionMessage extends ClientOutboundMessage {

ResumeConsumptionMessage(RemoteInputChannel inputChannel) {
super(checkNotNull(inputChannel));
}

@Override
Object buildMessage() {
return new ResumeConsumption(inputChannel.getInputChannelId());
}
}

private static class AcknowledgeAllRecordsProcessedMessage extends ClientOutboundMessage {

AcknowledgeAllRecordsProcessedMessage(RemoteInputChannel inputChannel) {
super(checkNotNull(inputChannel));
}

@Override
Object buildMessage() {
return new AckAllUserRecordsProcessed(inputChannel.getInputChannelId());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -195,22 +195,26 @@ public void operationComplete(ChannelFuture future) throws Exception {

@Override
public void notifyCreditAvailable(RemoteInputChannel inputChannel) {
clientHandler.notifyCreditAvailable(inputChannel);
sendToChannel(new AddCreditMessage(inputChannel));
}

@Override
public void notifyNewBufferSize(RemoteInputChannel inputChannel, int bufferSize) {
clientHandler.notifyNewBufferSize(inputChannel, bufferSize);
sendToChannel(new NewBufferSizeMessage(inputChannel, bufferSize));
}

@Override
public void resumeConsumption(RemoteInputChannel inputChannel) {
clientHandler.resumeConsumption(inputChannel);
sendToChannel(new ResumeConsumptionMessage(inputChannel));
}

@Override
public void acknowledgeAllRecordsProcessed(RemoteInputChannel inputChannel) {
clientHandler.acknowledgeAllRecordsProcessed(inputChannel);
sendToChannel(new AcknowledgeAllRecordsProcessedMessage(inputChannel));
}

private void sendToChannel(ClientOutboundMessage message) {
tcpChannel.eventLoop().execute(() -> tcpChannel.pipeline().fireUserEventTriggered(message));
}

@Override
Expand Down Expand Up @@ -240,4 +244,57 @@ private void checkNotClosed() throws IOException {
String.format("Channel to '%s' closed.", remoteAddr), localAddr);
}
}

private static class AddCreditMessage extends ClientOutboundMessage {

private AddCreditMessage(RemoteInputChannel inputChannel) {
super(checkNotNull(inputChannel));
}

@Override
Object buildMessage() {
int credits = inputChannel.getAndResetUnannouncedCredit();
return credits > 0
? new NettyMessage.AddCredit(credits, inputChannel.getInputChannelId())
: null;
}
}

private static class NewBufferSizeMessage extends ClientOutboundMessage {
private final int bufferSize;

private NewBufferSizeMessage(RemoteInputChannel inputChannel, int bufferSize) {
super(checkNotNull(inputChannel));
this.bufferSize = bufferSize;
}

@Override
Object buildMessage() {
return new NettyMessage.NewBufferSize(bufferSize, inputChannel.getInputChannelId());
}
}

private static class ResumeConsumptionMessage extends ClientOutboundMessage {

private ResumeConsumptionMessage(RemoteInputChannel inputChannel) {
super(checkNotNull(inputChannel));
}

@Override
Object buildMessage() {
return new NettyMessage.ResumeConsumption(inputChannel.getInputChannelId());
}
}

private static class AcknowledgeAllRecordsProcessedMessage extends ClientOutboundMessage {

private AcknowledgeAllRecordsProcessedMessage(RemoteInputChannel inputChannel) {
super(checkNotNull(inputChannel));
}

@Override
Object buildMessage() {
return new NettyMessage.AckAllUserRecordsProcessed(inputChannel.getInputChannelId());
}
}
}

0 comments on commit 4648e8a

Please sign in to comment.