diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateReader.java new file mode 100644 index 0000000000000..49321cc1d429e --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateReader.java @@ -0,0 +1,66 @@ +package org.apache.flink.runtime.checkpoint.channel; +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.io.network.buffer.Buffer; + +import java.io.IOException; + +/** + * Reads channel state saved during checkpoint/savepoint. + */ +@Internal +public interface ChannelStateReader extends AutoCloseable { + + /** + * Status of reading result. + */ + enum ReadResult { HAS_MORE_DATA, NO_MORE_DATA } + + /** + * Put data into the supplied buffer to be injected into + * {@link org.apache.flink.runtime.io.network.partition.consumer.InputChannel InputChannel}. + */ + ReadResult readInputData(InputChannelInfo info, Buffer buffer) throws IOException; + + /** + * Put data into the supplied buffer to be injected into + * {@link org.apache.flink.runtime.io.network.partition.ResultSubpartition ResultSubpartition}. + */ + ReadResult readOutputData(ResultSubpartitionInfo info, Buffer buffer) throws IOException; + + @Override + void close() throws Exception; + + ChannelStateReader NO_OP = new ChannelStateReader() { + + @Override + public ReadResult readInputData(InputChannelInfo info, Buffer buffer) { + return ReadResult.NO_MORE_DATA; + } + + @Override + public ReadResult readOutputData(ResultSubpartitionInfo info, Buffer buffer) { + return ReadResult.NO_MORE_DATA; + } + + @Override + public void close() { + } + }; +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java new file mode 100644 index 0000000000000..24a3a4edc6268 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint.channel; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.state.StateObject; + +import java.util.Collection; +import java.util.Collections; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; + +/** + * Writes channel state during checkpoint/savepoint. + */ +@Internal +public interface ChannelStateWriter extends AutoCloseable { + + /** + * Sequence number for the buffers that were saved during the previous execution attempt; then restored; and now are + * to be saved again (as opposed to the buffers received from the upstream or from the operator). + */ + int SEQUENCE_NUMBER_RESTORED = -1; + + /** + * Signifies that buffer sequence number is unknown (e.g. if passing sequence numbers is not implemented). + */ + int SEQUENCE_NUMBER_UNKNOWN = -2; + + /** + * Initiate write of channel state for the given checkpoint id. + */ + void start(long checkpointId); + + /** + * Add in-flight buffers from the {@link org.apache.flink.runtime.io.network.partition.consumer.InputChannel InputChannel}. + * Must be called after {@link #start(long)} and before {@link #finish(long)}. + * @param startSeqNum sequence number of the 1st passed buffer. + * It is intended to use for incremental snapshots. + * If no data is passed it is ignored. + * @param data zero or more buffers ordered by their sequence numbers + * @see org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter#SEQUENCE_NUMBER_RESTORED + * @see org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter#SEQUENCE_NUMBER_UNKNOWN + */ + void addInputData(long checkpointId, InputChannelInfo info, int startSeqNum, Buffer... data); + + /** + * Add in-flight buffers from the {@link org.apache.flink.runtime.io.network.partition.ResultSubpartition ResultSubpartition}. + * Must be called after {@link #start(long)} and before {@link #finish(long)}. + * @param startSeqNum sequence number of the 1st passed buffer. + * It is intended to use for incremental snapshots. + * If no data is passed it is ignored. + * @param data zero or more buffers ordered by their sequence numbers + * @see org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter#SEQUENCE_NUMBER_RESTORED + * @see org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter#SEQUENCE_NUMBER_UNKNOWN + */ + void addOutputData(long checkpointId, ResultSubpartitionInfo info, int startSeqNum, Buffer... data); + + /** + * Finalize write of channel state for the given checkpoint id. + * Must be called after {@link #start(long)} and all of the data of the given checkpoint added. + */ + void finish(long checkpointId); + + /** + * Must be called after {@link #start(long)}. + */ + Future> getWriteCompletionFuture(long checkpointId); + + @Override + void close() throws Exception; + + ChannelStateWriter NO_OP = new ChannelStateWriter() { + + @Override + public void start(long checkpointId) { + } + + @Override + public void addInputData(long checkpointId, InputChannelInfo info, int startSeqNum, Buffer... data) { + } + + @Override + public void addOutputData(long checkpointId, ResultSubpartitionInfo info, int startSeqNum, Buffer... data) { + } + + @Override + public void finish(long checkpointId) { + } + + @Override + public Future> getWriteCompletionFuture(long checkpointId) { + return CompletableFuture.completedFuture(Collections.emptyList()); + } + + @Override + public void close() { + } + }; + +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/InputChannelInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/InputChannelInfo.java new file mode 100644 index 0000000000000..e8339b8356734 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/InputChannelInfo.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint.channel; + +import org.apache.flink.annotation.Internal; + +import java.io.Serializable; + +/** + * Identifies {@link org.apache.flink.runtime.io.network.partition.consumer.InputChannel} in a given subtask. + * Note that {@link org.apache.flink.runtime.io.network.partition.consumer.InputChannelID InputChannelID} + * can not be used because it is generated randomly. + */ +@Internal +public class InputChannelInfo implements Serializable { + private static final long serialVersionUID = 1L; + + private final int gateIdx; + private final int inputChannelIdx; + + public InputChannelInfo(int gateIdx, int inputChannelIdx) { + this.gateIdx = gateIdx; + this.inputChannelIdx = inputChannelIdx; + } + + public int getGateIdx() { + return gateIdx; + } + + public int getInputChannelIdx() { + return inputChannelIdx; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ResultSubpartitionInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ResultSubpartitionInfo.java new file mode 100644 index 0000000000000..513f1560c83f7 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ResultSubpartitionInfo.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint.channel; + +import org.apache.flink.annotation.Internal; + +import java.io.Serializable; + +/** + * Identifies {@link org.apache.flink.runtime.io.network.partition.ResultSubpartition ResultSubpartition} in a given subtask. + * Note that {@link org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID IntermediateResultPartitionID} + * can not be used because it: a) identifies the whole + * {@link org.apache.flink.runtime.io.network.partition.ResultPartition ResultPartition} b) is generated randomly. + */ +@Internal +public class ResultSubpartitionInfo implements Serializable { + private static final long serialVersionUID = 1L; + + private final int partitionIdx; + private final int subPartitionIdx; + + public ResultSubpartitionInfo(int partitionIdx, int subPartitionIdx) { + this.partitionIdx = partitionIdx; + this.subPartitionIdx = subPartitionIdx; + } + + public int getPartitionIdx() { + return partitionIdx; + } + + public int getSubPartitionIdx() { + return subPartitionIdx; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/InputChannelStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/InputChannelStateHandle.java new file mode 100644 index 0000000000000..61b902f2f92f0 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/InputChannelStateHandle.java @@ -0,0 +1,26 @@ +package org.apache.flink.runtime.state; +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo; + +/** + * {@link StateObject Handle} to an {@link org.apache.flink.runtime.io.network.partition.consumer.InputChannel InputChannel} state. + */ +public interface InputChannelStateHandle extends StateObject { + InputChannelInfo getInputChannelInfo(); +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ResultSubpartitionStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ResultSubpartitionStateHandle.java new file mode 100644 index 0000000000000..7f6c90e37ec0e --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ResultSubpartitionStateHandle.java @@ -0,0 +1,26 @@ +package org.apache.flink.runtime.state; +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.flink.runtime.checkpoint.channel.ResultSubpartitionInfo; + +/** + * {@link StateObject Handle} to a {@link org.apache.flink.runtime.io.network.partition.ResultSubpartition ResultSubpartition} state. + */ +public interface ResultSubpartitionStateHandle extends StateObject { + ResultSubpartitionInfo getResultSubpartitionInfo(); +}