Skip to content

Commit

Permalink
[FLINK-8360][checkpointing] Implement state storage for local recover…
Browse files Browse the repository at this point in the history
…y and integrate with task lifecycle
  • Loading branch information
StefanRRichter committed Feb 25, 2018
1 parent ea0d16d commit df3e6bb
Show file tree
Hide file tree
Showing 159 changed files with 8,768 additions and 1,834 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.StateObjectCollection;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
Expand Down Expand Up @@ -389,34 +390,34 @@ public void testScaleUpAfterScalingDown() throws Exception {
final int parallelism3 = 3;
final int maxParallelism = Math.max(parallelism1, Math.max(parallelism2, parallelism3));

List<OperatorStateHandle> operatorStateHandles = repartitionAndExecute(
List<OperatorStateHandle> operatorSubtaskState = repartitionAndExecute(
topic,
Collections.emptyList(),
parallelism1,
maxParallelism,
IntStream.range(0, parallelism1).boxed().iterator());

operatorStateHandles = repartitionAndExecute(
operatorSubtaskState = repartitionAndExecute(
topic,
operatorStateHandles,
operatorSubtaskState,
parallelism2,
maxParallelism,
IntStream.range(parallelism1, parallelism1 + parallelism2).boxed().iterator());

operatorStateHandles = repartitionAndExecute(
operatorSubtaskState = repartitionAndExecute(
topic,
operatorStateHandles,
operatorSubtaskState,
parallelism3,
maxParallelism,
IntStream.range(parallelism1 + parallelism2, parallelism1 + parallelism2 + parallelism3).boxed().iterator());

// After each previous repartitionAndExecute call, we are left with some lingering transactions, that would
// not allow us to read all committed messages from the topic. Thus we initialize operators from
// operatorStateHandles once more, but without any new data. This should terminate all ongoing transactions.
// OperatorSubtaskState once more, but without any new data. This should terminate all ongoing transactions.

operatorStateHandles = repartitionAndExecute(
operatorSubtaskState = repartitionAndExecute(
topic,
operatorStateHandles,
operatorSubtaskState,
1,
maxParallelism,
Collections.emptyIterator());
Expand Down Expand Up @@ -448,10 +449,10 @@ private List<OperatorStateHandle> repartitionAndExecute(
testHarness.setup();

testHarness.initializeState(new OperatorSubtaskState(
inputStates,
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList()));
new StateObjectCollection<>(inputStates),
StateObjectCollection.empty(),
StateObjectCollection.empty(),
StateObjectCollection.empty()));
testHarness.open();

if (inputData.hasNext()) {
Expand All @@ -460,9 +461,9 @@ private List<OperatorStateHandle> repartitionAndExecute(
OperatorSubtaskState snapshot = testHarness.snapshot(0, 0);

outputStates.addAll(snapshot.getManagedOperatorState());
checkState(snapshot.getRawOperatorState() == null, "Unexpected raw operator state");
checkState(snapshot.getManagedKeyedState() == null, "Unexpected managed keyed state");
checkState(snapshot.getRawKeyedState() == null, "Unexpected raw keyed state");
checkState(snapshot.getRawOperatorState().isEmpty(), "Unexpected raw operator state");
checkState(snapshot.getManagedKeyedState().isEmpty(), "Unexpected managed keyed state");
checkState(snapshot.getRawKeyedState().isEmpty(), "Unexpected raw keyed state");

for (int i = 1; i < FlinkKafkaProducer011.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE - 1; i++) {
testHarness.processElement(-nextValue, 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,20 @@ public class CheckpointingOptions {
" complete checkpoint state. Some state backends may not support incremental checkpoints and ignore" +
" this option.");

/**
* This option configures local recovery for this state backend.
*/
public static final ConfigOption<String> LOCAL_RECOVERY = ConfigOptions
.key("state.backend.local-recovery")
.defaultValue("DISABLED");

/**
* The config parameter defining the root directories for storing file-based state for local recovery.
*/
public static final ConfigOption<String> LOCAL_RECOVERY_TASK_MANAGER_STATE_ROOT_DIRS = ConfigOptions
.key("taskmanager.state.local.root-dirs")
.noDefaultValue();

// ------------------------------------------------------------------------
// Options specific to the file-system-based state backends
// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,48 @@

package org.apache.flink.configuration;

import javax.annotation.Nonnull;

import java.io.File;

/**
* Utility class for {@link Configuration} related helper functions.
*/
public class ConfigurationUtils {

private static final String[] EMPTY = new String[0];

/**
* Extracts the task manager directories for temporary files as defined by
* {@link org.apache.flink.configuration.CoreOptions#TMP_DIRS}.
*
* @param configuration configuration object
* @return array of configured directories (in order)
*/
@Nonnull
public static String[] parseTempDirectories(Configuration configuration) {
return configuration.getString(CoreOptions.TMP_DIRS).split(",|" + File.pathSeparator);
return splitPaths(configuration.getString(CoreOptions.TMP_DIRS));
}

/**
* Extracts the local state directories as defined by
* {@link CheckpointingOptions#LOCAL_RECOVERY_TASK_MANAGER_STATE_ROOT_DIRS}.
*
* @param configuration configuration object
* @return array of configured directories (in order)
*/
@Nonnull
public static String[] parseLocalStateDirectories(Configuration configuration) {
String configValue = configuration.getString(CheckpointingOptions.LOCAL_RECOVERY_TASK_MANAGER_STATE_ROOT_DIRS, "");
return splitPaths(configValue);
}

@Nonnull
private static String[] splitPaths(@Nonnull String separatedPaths) {
return separatedPaths.length() > 0 ? separatedPaths.split(",|" + File.pathSeparator) : EMPTY;
}

// Make sure that we cannot instantiate this class
private ConfigurationUtils() {}
private ConfigurationUtils() {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -211,10 +211,12 @@ private boolean delete(final File f) throws IOException {

if (f.isDirectory()) {
final File[] files = f.listFiles();
for (File file : files) {
final boolean del = delete(file);
if (!del) {
return false;
if (files != null) {
for (File file : files) {
final boolean del = delete(file);
if (!del) {
return false;
}
}
}
} else {
Expand Down
36 changes: 36 additions & 0 deletions flink-core/src/main/java/org/apache/flink/util/Disposable.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.util;

/**
* Interface for classes that can be disposed, i.e. that have a dedicated lifecycle step to "destroy" the object. On
* reason for this is for example to release native resources. From this point, the interface fulfills a similar purpose
* as the {@link java.io.Closeable} interface, but sometimes both should be represented as isolated, independent
* lifecycle steps.
*/
public interface Disposable {

/**
* Disposes the object and releases all resources. After calling this method, calling any methods on the
* object may result in undefined behavior.
*
* @throws Exception if something goes wrong during disposal.
*/
void dispose() throws Exception;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.util;

import org.mockito.Mockito;
import org.mockito.internal.util.MockUtil;

import java.lang.reflect.Array;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Supplier;

import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;

/**
* Helper class with a method that attempts to automatically test method forwarding between a delegate and a wrapper.
*/
public class MethodForwardingTestUtil {

/**
* This is a best effort automatic test for method forwarding between a delegate and its wrapper, where the wrapper
* class is a subtype of the delegate. This ignores methods that are inherited from Object.
*
* @param delegateClass the class for the delegate.
* @param wrapperFactory factory that produces a wrapper from a delegate.
* @param <D> type of the delegate
* @param <W> type of the wrapper
*/
public static <D, W> void testMethodForwarding(
Class<D> delegateClass,
Function<D, W> wrapperFactory)
throws NoSuchMethodException, IllegalAccessException, InvocationTargetException {
testMethodForwarding(delegateClass, wrapperFactory, () -> spy(delegateClass), Collections.emptySet());
}

/**
* This is a best effort automatic test for method forwarding between a delegate and its wrapper, where the wrapper
* class is a subtype of the delegate. This ignores methods that are inherited from Object.
*
* @param delegateClass the class for the delegate.
* @param wrapperFactory factory that produces a wrapper from a delegate.
* @param delegateObjectSupplier supplier for the delegate object passed to the wrapper factory.
* @param <D> type of the delegate
* @param <W> type of the wrapper
* @param <I> type of the object created as delegate, is a subtype of D.
*/
public static <D, W, I extends D> void testMethodForwarding(
Class<D> delegateClass,
Function<I, W> wrapperFactory,
Supplier<I> delegateObjectSupplier)
throws NoSuchMethodException, IllegalAccessException, InvocationTargetException {
testMethodForwarding(delegateClass, wrapperFactory, delegateObjectSupplier, Collections.emptySet());
}

/**
* This is a best effort automatic test for method forwarding between a delegate and its wrapper, where the wrapper
* class is a subtype of the delegate. Methods can be remapped in case that the implementation does not call the
* original method. Remapping to null skips the method. This ignores methods that are inherited from Object.
*
* @param delegateClass the class for the delegate.
* @param wrapperFactory factory that produces a wrapper from a delegate.
* @param delegateObjectSupplier supplier for the delegate object passed to the wrapper factory.
* @param skipMethodSet set of methods to ignore.
* @param <D> type of the delegate
* @param <W> type of the wrapper
* @param <I> type of the object created as delegate, is a subtype of D.
*/
public static <D, W, I extends D> void testMethodForwarding(
Class<D> delegateClass,
Function<I, W> wrapperFactory,
Supplier<I> delegateObjectSupplier,
Set<Method> skipMethodSet) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {

Preconditions.checkNotNull(delegateClass);
Preconditions.checkNotNull(wrapperFactory);
Preconditions.checkNotNull(skipMethodSet);

I delegate = delegateObjectSupplier.get();

//check if we need to wrap the delegate object as a spy, or if it is already testable with Mockito.
MockUtil mockUtil = new MockUtil();
if (!mockUtil.isSpy(delegate) || !mockUtil.isMock(delegate)) {
delegate = spy(delegate);
}

W wrapper = wrapperFactory.apply(delegate);

// ensure that wrapper is a subtype of delegate
Preconditions.checkArgument(delegateClass.isAssignableFrom(wrapper.getClass()));

for (Method delegateMethod : delegateClass.getMethods()) {

if (checkSkipMethodForwardCheck(delegateMethod, skipMethodSet)) {
continue;
}

// find the correct method to substitute the bridge for erased generic types.
// if this doesn't work, the user need to exclude the method and write an additional test.
Method wrapperMethod = wrapper.getClass().getMethod(
delegateMethod.getName(),
delegateMethod.getParameterTypes());

// things get a bit fuzzy here, best effort to find a match but this might end up with a wrong method.
if (wrapperMethod.isBridge()) {
for (Method method : wrapper.getClass().getMethods()) {
if (!method.isBridge()
&& method.getName().equals(wrapperMethod.getName())
&& method.getParameterCount() == wrapperMethod.getParameterCount()) {
wrapperMethod = method;
break;
}
}
}

Class<?>[] parameterTypes = wrapperMethod.getParameterTypes();
Object[] arguments = new Object[parameterTypes.length];
for (int j = 0; j < arguments.length; j++) {
Class<?> parameterType = parameterTypes[j];
if (parameterType.isArray()) {
arguments[j] = Array.newInstance(parameterType.getComponentType(), 0);
} else if (parameterType.isPrimitive()) {
if (boolean.class.equals(parameterType)) {
arguments[j] = false;
} else if (char.class.equals(parameterType)) {
arguments[j] = 'a';
} else {
arguments[j] = (byte) 0;
}
} else {
arguments[j] = Mockito.mock(parameterType);
}
}

wrapperMethod.invoke(wrapper, arguments);
delegateMethod.invoke(Mockito.verify(delegate, Mockito.times(1)), arguments);
reset(delegate);
}
}

/**
* Test if this method should be skipped in our check for proper forwarding, e.g. because it is just a bridge.
*/
private static boolean checkSkipMethodForwardCheck(Method delegateMethod, Set<Method> skipMethods) {

if (delegateMethod.isBridge()
|| delegateMethod.isDefault()
|| skipMethods.contains(delegateMethod)) {
return true;
}

// skip methods declared in Object (Mockito doesn't like them)
try {
Object.class.getMethod(delegateMethod.getName(), delegateMethod.getParameterTypes());
return true;
} catch (Exception ignore) {
}
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,7 @@ private static void createHtmlFile(DocumentingRestEndpoint restEndpoint, Path ou
List<MessageHeaders> specs = restEndpoint.getSpecs();
specs.forEach(spec -> html.append(createHtmlEntry(spec)));

if (Files.exists(outputFile)) {
Files.delete(outputFile);
}
Files.deleteIfExists(outputFile);
Files.write(outputFile, html.toString().getBytes(StandardCharsets.UTF_8));
}

Expand Down
Loading

0 comments on commit df3e6bb

Please sign in to comment.