Skip to content

Commit

Permalink
[FLINK-16512][task] Unaligned checkpoints: API for persistence
Browse files Browse the repository at this point in the history
  • Loading branch information
rkhachatryan authored and zhijiangW committed Mar 11, 2020
1 parent 87ebe55 commit 1ad361b
Show file tree
Hide file tree
Showing 6 changed files with 332 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package org.apache.flink.runtime.checkpoint.channel;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.io.network.buffer.Buffer;

import java.io.IOException;

/**
 * Reader for the channel state that was persisted as part of a checkpoint/savepoint.
 *
 * <p>The caller supplies the buffer to fill; each read reports its outcome as a {@link ReadResult}.
 */
@Internal
public interface ChannelStateReader extends AutoCloseable {

    /**
     * Outcome reported by the read methods of this interface.
     */
    enum ReadResult {
        HAS_MORE_DATA,
        NO_MORE_DATA
    }

    /**
     * Reader without any state: every read answers {@link ReadResult#NO_MORE_DATA} without touching
     * the supplied buffer, and closing it does nothing.
     */
    ChannelStateReader NO_OP = new ChannelStateReader() {

        @Override
        public ReadResult readInputData(InputChannelInfo channel, Buffer target) {
            return ReadResult.NO_MORE_DATA;
        }

        @Override
        public ReadResult readOutputData(ResultSubpartitionInfo subpartition, Buffer target) {
            return ReadResult.NO_MORE_DATA;
        }

        @Override
        public void close() {
            // nothing to release
        }
    };

    /**
     * Fills the supplied buffer with data that is to be injected into the
     * {@link org.apache.flink.runtime.io.network.partition.consumer.InputChannel InputChannel}.
     *
     * @param info identifies the input channel whose state is read
     * @param buffer destination of the data
     * @return the status of this read
     * @throws IOException declared for implementations that fail while reading
     */
    ReadResult readInputData(InputChannelInfo info, Buffer buffer) throws IOException;

    /**
     * Fills the supplied buffer with data that is to be injected into the
     * {@link org.apache.flink.runtime.io.network.partition.ResultSubpartition ResultSubpartition}.
     *
     * @param info identifies the result subpartition whose state is read
     * @param buffer destination of the data
     * @return the status of this read
     * @throws IOException declared for implementations that fail while reading
     */
    ReadResult readOutputData(ResultSubpartitionInfo info, Buffer buffer) throws IOException;

    @Override
    void close() throws Exception;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.checkpoint.channel;

import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.state.StateObject;

import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

/**
 * Writer of channel state (in-flight buffers) during a checkpoint/savepoint.
 *
 * <p>Per checkpoint id the expected call order is {@link #start(long)}, then any number of
 * {@link #addInputData} / {@link #addOutputData} calls, then {@link #finish(long)}.
 */
@Internal
public interface ChannelStateWriter extends AutoCloseable {

    /**
     * Sequence number for the buffers that were saved during the previous execution attempt, then
     * restored, and are now to be saved again (as opposed to the buffers received from the upstream
     * or from the operator).
     */
    int SEQUENCE_NUMBER_RESTORED = -1;

    /**
     * Marks the buffer sequence number as unknown (e.g. if passing sequence numbers is not
     * implemented).
     */
    int SEQUENCE_NUMBER_UNKNOWN = -2;

    /**
     * Writer that discards everything: all write calls are empty, and the completion future is
     * already completed with an empty collection.
     */
    ChannelStateWriter NO_OP = new ChannelStateWriter() {

        @Override
        public void start(long checkpointId) {
            // nothing to initiate
        }

        @Override
        public void addInputData(long checkpointId, InputChannelInfo info, int startSeqNum, Buffer... data) {
            // buffers are ignored
        }

        @Override
        public void addOutputData(long checkpointId, ResultSubpartitionInfo info, int startSeqNum, Buffer... data) {
            // buffers are ignored
        }

        @Override
        public void finish(long checkpointId) {
            // nothing to finalize
        }

        @Override
        public Future<Collection<StateObject>> getWriteCompletionFuture(long checkpointId) {
            final Collection<StateObject> nothingWritten = Collections.emptyList();
            return CompletableFuture.completedFuture(nothingWritten);
        }

        @Override
        public void close() {
            // nothing to release
        }
    };

    /**
     * Initiates the write of channel state for the given checkpoint id.
     *
     * @param checkpointId id of the checkpoint to write state for
     */
    void start(long checkpointId);

    /**
     * Adds in-flight buffers from the
     * {@link org.apache.flink.runtime.io.network.partition.consumer.InputChannel InputChannel}.
     * Must be called after {@link #start(long)} and before {@link #finish(long)}.
     *
     * @param checkpointId id of the checkpoint the buffers belong to
     * @param info identifies the input channel the buffers come from
     * @param startSeqNum sequence number of the 1st passed buffer.
     *                    It is intended to be used for incremental snapshots.
     *                    If no data is passed it is ignored.
     * @param data zero or more buffers ordered by their sequence numbers
     * @see org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter#SEQUENCE_NUMBER_RESTORED
     * @see org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter#SEQUENCE_NUMBER_UNKNOWN
     */
    void addInputData(long checkpointId, InputChannelInfo info, int startSeqNum, Buffer... data);

    /**
     * Adds in-flight buffers from the
     * {@link org.apache.flink.runtime.io.network.partition.ResultSubpartition ResultSubpartition}.
     * Must be called after {@link #start(long)} and before {@link #finish(long)}.
     *
     * @param checkpointId id of the checkpoint the buffers belong to
     * @param info identifies the result subpartition the buffers come from
     * @param startSeqNum sequence number of the 1st passed buffer.
     *                    It is intended to be used for incremental snapshots.
     *                    If no data is passed it is ignored.
     * @param data zero or more buffers ordered by their sequence numbers
     * @see org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter#SEQUENCE_NUMBER_RESTORED
     * @see org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter#SEQUENCE_NUMBER_UNKNOWN
     */
    void addOutputData(long checkpointId, ResultSubpartitionInfo info, int startSeqNum, Buffer... data);

    /**
     * Finalizes the write of channel state for the given checkpoint id.
     * Must be called after {@link #start(long)} and after all of the data of the given checkpoint
     * has been added.
     *
     * @param checkpointId id of the checkpoint whose write is finalized
     */
    void finish(long checkpointId);

    /**
     * Returns the future holding the collection of {@link StateObject state objects} for the given
     * checkpoint id. Must be called after {@link #start(long)}.
     *
     * @param checkpointId id of the checkpoint to get the future for
     * @return future of the state objects; judging by the method name it completes once the write
     *         is done (the exact completion semantics are up to the implementation)
     */
    Future<Collection<StateObject>> getWriteCompletionFuture(long checkpointId);

    @Override
    void close() throws Exception;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.checkpoint.channel;

import org.apache.flink.annotation.Internal;

import java.io.Serializable;

/**
* Identifies {@link org.apache.flink.runtime.io.network.partition.consumer.InputChannel} in a given subtask.
* Note that {@link org.apache.flink.runtime.io.network.partition.consumer.InputChannelID InputChannelID}
* can not be used because it is generated randomly.
*/
@Internal
public class InputChannelInfo implements Serializable {
private static final long serialVersionUID = 1L;

private final int gateIdx;
private final int inputChannelIdx;

public InputChannelInfo(int gateIdx, int inputChannelIdx) {
this.gateIdx = gateIdx;
this.inputChannelIdx = inputChannelIdx;
}

public int getGateIdx() {
return gateIdx;
}

public int getInputChannelIdx() {
return inputChannelIdx;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.checkpoint.channel;

import org.apache.flink.annotation.Internal;

import java.io.Serializable;

/**
* Identifies {@link org.apache.flink.runtime.io.network.partition.ResultSubpartition ResultSubpartition} in a given subtask.
* Note that {@link org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID IntermediateResultPartitionID}
* can not be used because it: a) identifies the whole
* {@link org.apache.flink.runtime.io.network.partition.ResultPartition ResultPartition} b) is generated randomly.
*/
@Internal
public class ResultSubpartitionInfo implements Serializable {
private static final long serialVersionUID = 1L;

private final int partitionIdx;
private final int subPartitionIdx;

public ResultSubpartitionInfo(int partitionIdx, int subPartitionIdx) {
this.partitionIdx = partitionIdx;
this.subPartitionIdx = subPartitionIdx;
}

public int getPartitionIdx() {
return partitionIdx;
}

public int getSubPartitionIdx() {
return subPartitionIdx;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package org.apache.flink.runtime.state;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;

/**
* {@link StateObject Handle} to an {@link org.apache.flink.runtime.io.network.partition.consumer.InputChannel InputChannel} state.
*
* <p>Adds a single accessor for the {@link InputChannelInfo} on top of what {@link StateObject} declares.
*/
public interface InputChannelStateHandle extends StateObject {
/**
* Returns the {@link InputChannelInfo} associated with this handle.
* NOTE(review): presumably this identifies the input channel whose state the handle points to;
* confirm against implementations.
*/
InputChannelInfo getInputChannelInfo();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package org.apache.flink.runtime.state;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import org.apache.flink.runtime.checkpoint.channel.ResultSubpartitionInfo;

/**
* {@link StateObject Handle} to a {@link org.apache.flink.runtime.io.network.partition.ResultSubpartition ResultSubpartition} state.
*
* <p>Adds a single accessor for the {@link ResultSubpartitionInfo} on top of what {@link StateObject} declares.
*/
public interface ResultSubpartitionStateHandle extends StateObject {
/**
* Returns the {@link ResultSubpartitionInfo} associated with this handle.
* NOTE(review): presumably this identifies the result subpartition whose state the handle points to;
* confirm against implementations.
*/
ResultSubpartitionInfo getResultSubpartitionInfo();
}

0 comments on commit 1ad361b

Please sign in to comment.