Skip to content

Commit

Permalink
[FLINK-19586] Add committer operators for STREAMING mode for new Sink…
Browse files Browse the repository at this point in the history
… API
  • Loading branch information
guoweiM authored and kl0u committed Oct 24, 2020
1 parent 07c8918 commit e90e8cc
Show file tree
Hide file tree
Showing 10 changed files with 898 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.runtime.operators.sink;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.api.connector.sink.Committer;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/**
* Abstract base class for operators that work with a {@link Committer} or a
* {@link org.apache.flink.api.connector.sink.GlobalCommitter} in the streaming execution mode.
*
* <p>Sub-classes are responsible for implementing {@link #recoveredCommittables(List)},
* {@link #prepareCommit(List)} and {@link #commit(List)}.
*
* @param <InputT> The input type of the {@link Committer}.
* @param <CommT> The committable type of the {@link Committer}.
*/
abstract class AbstractStreamingCommitterOperator<InputT, CommT> extends AbstractStreamOperator<CommT>
implements OneInputStreamOperator<InputT, CommT> {

private static final long serialVersionUID = 1L;

/** The operator's state descriptor. */
private static final ListStateDescriptor<byte[]> STREAMING_COMMITTER_RAW_STATES_DESC =
new ListStateDescriptor<>(
"streaming_committer_raw_states",
BytePrimitiveArraySerializer.INSTANCE);

/** Group the committable by the checkpoint id. */
private final NavigableMap<Long, List<CommT>> committablesPerCheckpoint;

/** The committable's serializer. */
private final StreamingCommitterStateSerializer<CommT> streamingCommitterStateSerializer;

/** The operator's state. */
private ListState<StreamingCommitterState<CommT>> streamingCommitterState;

/** Inputs collected between every pre-commit. */
private List<InputT> currentInputs;

/**
* Notifies a list of committables that might need to be committed again after recovering from a failover.
*
* @param committables A list of committables
*/
abstract void recoveredCommittables(List<CommT> committables);

/**
* Prepares a commit.
*
* @param input A list of input elements received since last pre-commit
*
* @return A list of committables that could be committed in the following checkpoint complete.
*/
abstract List<CommT> prepareCommit(List<InputT> input);

/**
* Commits a list of committables.
*
* @param committables A list of committables that is ready for committing.
*
* @return A list of committables needed to re-commit.
*/
abstract List<CommT> commit(List<CommT> committables) throws Exception;

AbstractStreamingCommitterOperator(SimpleVersionedSerializer<CommT> committableSerializer) {
this.streamingCommitterStateSerializer = new StreamingCommitterStateSerializer<>(
committableSerializer);
this.committablesPerCheckpoint = new TreeMap<>();
this.currentInputs = new ArrayList<>();
}

@Override
public void initializeState(StateInitializationContext context) throws Exception {
super.initializeState(context);
streamingCommitterState = new SimpleVersionedListState<>(
context.getOperatorStateStore().getListState(STREAMING_COMMITTER_RAW_STATES_DESC),
streamingCommitterStateSerializer);
final List<CommT> restored = new ArrayList<>();
streamingCommitterState.get().forEach(s -> restored.addAll(s.getCommittables()));
recoveredCommittables(restored);
}

@Override
public void processElement(StreamRecord<InputT> element) throws Exception {
currentInputs.add(element.getValue());
}

@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
super.snapshotState(context);
committablesPerCheckpoint.put(context.getCheckpointId(), prepareCommit(currentInputs));
currentInputs = new ArrayList<>();

streamingCommitterState.update(
Collections.singletonList(new StreamingCommitterState<>(committablesPerCheckpoint)));
}

@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
super.notifyCheckpointComplete(checkpointId);
commitUpTo(checkpointId);
}

private void commitUpTo(long checkpointId) throws Exception {
final Iterator<Map.Entry<Long, List<CommT>>>
it = committablesPerCheckpoint.headMap(checkpointId, true).entrySet().iterator();

final List<CommT> readyCommittables = new ArrayList<>();

while (it.hasNext()) {
final Map.Entry<Long, List<CommT>> entry = it.next();
final List<CommT> committables = entry.getValue();

readyCommittables.addAll(committables);
it.remove();
}

LOG.info("Committing the state for checkpoint {}", checkpointId);
final List<CommT> neededToRetryCommittables = commit(readyCommittables);
if (!neededToRetryCommittables.isEmpty()) {
throw new UnsupportedOperationException("Currently does not support the re-commit!");
}

// TODO fix :: send only for the committer, not for the global COMMITTER
for (CommT committable : readyCommittables) {
output.collect(new StreamRecord<>(committable));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.runtime.operators.sink;

import org.apache.flink.api.connector.sink.Committer;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;

/**
* Base {@link OneInputStreamOperatorFactory} for subclasses of {@link AbstractStreamingCommitterOperator}.
*
* @param <InputT> The input type of the {@link Committer}.
* @param <CommT> The committable type of the {@link Committer}.
*/
abstract class AbstractStreamingCommitterOperatorFactory<InputT, CommT> extends AbstractStreamOperatorFactory<CommT>
implements OneInputStreamOperatorFactory<InputT, CommT> {

@SuppressWarnings("unchecked")
@Override
public <T extends StreamOperator<CommT>> T createStreamOperator(StreamOperatorParameters<CommT> parameters) {
final AbstractStreamingCommitterOperator<InputT, CommT> streamingCommitterOperator = createStreamingCommitterOperator();
streamingCommitterOperator.setup(
parameters.getContainingTask(),
parameters.getStreamConfig(),
parameters.getOutput());
return (T) streamingCommitterOperator;
}

abstract AbstractStreamingCommitterOperator<InputT, CommT> createStreamingCommitterOperator();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.runtime.operators.sink;

import org.apache.flink.api.connector.sink.Committer;
import org.apache.flink.core.io.SimpleVersionedSerializer;

import java.util.ArrayList;
import java.util.List;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* Runtime {@link org.apache.flink.streaming.api.operators.StreamOperator} for executing
* {@link Committer} in the streaming execution mode.
*
* @param <CommT> The committable type of the {@link Committer}.
*/
final class StreamingCommitterOperator<CommT> extends AbstractStreamingCommitterOperator<CommT, CommT> {

/** The committables that might need to be committed again after recovering from a failover. */
private final List<CommT> recoveredCommittables;

/** Responsible for committing the committable to the external system. **/
private final Committer<CommT> committer;

StreamingCommitterOperator(
Committer<CommT> committer,
SimpleVersionedSerializer<CommT> committableSerializer) {
super(committableSerializer);
this.committer = checkNotNull(committer);
this.recoveredCommittables = new ArrayList<>();
}

@Override
void recoveredCommittables(List<CommT> committables) {
recoveredCommittables.addAll(checkNotNull(committables));
}

@Override
List<CommT> prepareCommit(List<CommT> input) {
checkNotNull(input);
final List<CommT> result = new ArrayList<>(recoveredCommittables);
recoveredCommittables.clear();
result.addAll(input);
return result;
}

@Override
List<CommT> commit(List<CommT> committables) throws Exception {
return committer.commit(checkNotNull(committables));
}

@Override
public void close() throws Exception {
super.close();
committer.close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.runtime.operators.sink;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.sink.Committer;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.streaming.api.operators.StreamOperator;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* A {@link org.apache.flink.streaming.api.operators.StreamOperatorFactory} for {@link StreamingCommitterOperator}.
*
* @param <CommT> The committable type of the {@link Committer}.
*/
@Internal
public class StreamingCommitterOperatorFactory<CommT> extends AbstractStreamingCommitterOperatorFactory<CommT, CommT> {

private final Sink<?, CommT, ?, ?> sink;

public StreamingCommitterOperatorFactory(Sink<?, CommT, ?, ?> sink) {
this.sink = checkNotNull(sink);
}

@Override
AbstractStreamingCommitterOperator<CommT, CommT> createStreamingCommitterOperator() {
return new StreamingCommitterOperator<>(
sink.createCommitter()
.orElseThrow(() -> new IllegalStateException(
"Could not create committer from the sink")),
sink.getCommittableSerializer()
.orElseThrow(() -> new IllegalStateException(
"Could not get committable serializer from the sink")));
}

@Override
@SuppressWarnings("rawtypes")
public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
return StreamingCommitterOperator.class;
}
}
Loading

0 comments on commit e90e8cc

Please sign in to comment.