Skip to content

Commit

Permalink
[FLINK-11833] [State Backends] Cleanup unnecessary createKeyedStateBa…
Browse files Browse the repository at this point in the history
…ckend methods in StateBackend

This closes apache#7909.
  • Loading branch information
carp84 authored and StefanRRichter committed Mar 6, 2019
1 parent 6f84008 commit f10a7d8
Show file tree
Hide file tree
Showing 12 changed files with 185 additions and 179 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.queryablestate.KvStateID;
import org.apache.flink.queryablestate.client.VoidNamespace;
import org.apache.flink.queryablestate.client.VoidNamespaceSerializer;
Expand All @@ -40,6 +42,7 @@
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.NetUtils;

Expand Down Expand Up @@ -630,13 +633,17 @@ public void testClientServerIntegration() throws Throwable {
dummyEnv.setKvStateRegistry(dummyRegistry);

AbstractKeyedStateBackend<Integer> backend = abstractBackend.createKeyedStateBackend(
dummyEnv,
new JobID(),
"test_op",
IntSerializer.INSTANCE,
numKeyGroups,
new KeyGroupRange(0, 0),
dummyRegistry.createTaskRegistry(new JobID(), new JobVertexID()));
dummyEnv,
new JobID(),
"test_op",
IntSerializer.INSTANCE,
numKeyGroups,
new KeyGroupRange(0, 0),
dummyRegistry.createTaskRegistry(new JobID(), new JobVertexID()),
TtlTimeProvider.DEFAULT,
new UnregisteredMetricsGroup(),
Collections.emptyList(),
new CloseableRegistry());

final FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.queryablestate.client.VoidNamespace;
import org.apache.flink.queryablestate.client.VoidNamespaceSerializer;
import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
Expand All @@ -39,6 +41,7 @@
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;

import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
Expand Down Expand Up @@ -117,7 +120,11 @@ public void testSimpleRequest() throws Throwable {
IntSerializer.INSTANCE,
numKeyGroups,
new KeyGroupRange(0, 0),
registry.createTaskRegistry(jobId, new JobVertexID()));
registry.createTaskRegistry(jobId, new JobVertexID()),
TtlTimeProvider.DEFAULT,
new UnregisteredMetricsGroup(),
Collections.emptyList(),
new CloseableRegistry());

final KvStateServerHandlerTest.TestRegistryListener registryListener =
new KvStateServerHandlerTest.TestRegistryListener();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
Expand All @@ -32,7 +31,6 @@

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;

/**
* A <b>State Backend</b> defines how the state of a streaming application is stored and
Expand Down Expand Up @@ -122,117 +120,24 @@ public interface StateBackend extends java.io.Serializable {
// ------------------------------------------------------------------------
// Structure Backends
// ------------------------------------------------------------------------

/**
* Creates a new {@link AbstractKeyedStateBackend} that is responsible for holding <b>keyed state</b>
* and checkpointing it. Uses default TTL time provider.
*
* <p><i>Keyed State</i> is state where each value is bound to a key.
*
* @param <K> The type of the keys by which the state is organized.
*
* @return The Keyed State Backend for the given job, operator, and key group range.
*
* @throws Exception This method may forward all exceptions that occur while instantiating the backend.
*/
default <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
Environment env,
JobID jobID,
String operatorIdentifier,
TypeSerializer<K> keySerializer,
int numberOfKeyGroups,
KeyGroupRange keyGroupRange,
TaskKvStateRegistry kvStateRegistry) throws Exception {
return createKeyedStateBackend(
env,
jobID,
operatorIdentifier,
keySerializer,
numberOfKeyGroups,
keyGroupRange,
kvStateRegistry,
TtlTimeProvider.DEFAULT
);
}

/**
* Creates a new {@link AbstractKeyedStateBackend} that is responsible for holding <b>keyed state</b>
* and checkpointing it.
*
* <p><i>Keyed State</i> is state where each value is bound to a key.
*
* @param <K> The type of the keys by which the state is organized.
*
* @return The Keyed State Backend for the given job, operator, and key group range.
*
* @throws Exception This method may forward all exceptions that occur while instantiating the backend.
*/
default <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
Environment env,
JobID jobID,
String operatorIdentifier,
TypeSerializer<K> keySerializer,
int numberOfKeyGroups,
KeyGroupRange keyGroupRange,
TaskKvStateRegistry kvStateRegistry,
TtlTimeProvider ttlTimeProvider
) throws Exception {
return createKeyedStateBackend(
env,
jobID,
operatorIdentifier,
keySerializer,
numberOfKeyGroups,
keyGroupRange,
kvStateRegistry,
ttlTimeProvider,
new UnregisteredMetricsGroup(),
Collections.emptyList(),
new CloseableRegistry());
}

/**
* Creates a new {@link AbstractKeyedStateBackend} that is responsible for holding <b>keyed state</b>
* and checkpointing it.
*
* <p><i>Keyed State</i> is state where each value is bound to a key.
*
* @param <K> The type of the keys by which the state is organized.
* @return The Keyed State Backend for the given job, operator, and key group range.
* @throws Exception This method may forward all exceptions that occur while instantiating the backend.
*/
default <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
Environment env,
JobID jobID,
String operatorIdentifier,
TypeSerializer<K> keySerializer,
int numberOfKeyGroups,
KeyGroupRange keyGroupRange,
TaskKvStateRegistry kvStateRegistry,
TtlTimeProvider ttlTimeProvider,
Collection<KeyedStateHandle> stateHandles
) throws Exception {
return createKeyedStateBackend(
env,
jobID,
operatorIdentifier,
keySerializer,
numberOfKeyGroups,
keyGroupRange,
kvStateRegistry,
ttlTimeProvider,
new UnregisteredMetricsGroup(),
stateHandles,
new CloseableRegistry());
}

/**
* Creates a new {@link AbstractKeyedStateBackend} that is responsible for holding <b>keyed state</b>
* and checkpointing it.
*
* <p><i>Keyed State</i> is state where each value is bound to a key.
*
* @param <K> The type of the keys by which the state is organized.
* @param env The environment of the task.
* @param jobID The ID of the job that the task belongs to.
* @param operatorIdentifier The identifier text of the operator.
* @param keySerializer The key-serializer for the operator.
* @param numberOfKeyGroups The number of key-groups aka max parallelism.
* @param keyGroupRange Range of key-groups for which the to-be-created backend is responsible.
* @param kvStateRegistry KvStateRegistry helper for this task.
* @param ttlTimeProvider Provider for TTL logic to judge about state expiration.
* @param metricGroup The parent metric group for all state backend metrics.
* @param stateHandles The state handles for restore.
* @param cancelStreamRegistry The registry to which created closeable objects will be registered during restore.
* @param <K> The type of the keys by which the state is organized.
*
* @return The Keyed State Backend for the given job, operator, and key group range.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
Expand All @@ -31,6 +33,8 @@
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import java.util.Collections;

import static org.junit.Assert.assertTrue;

/**
Expand Down Expand Up @@ -72,7 +76,10 @@ private void validateSupportForAsyncSnapshots(StateBackend backend) throws Excep
1,
new KeyGroupRange(0, 0),
null,
TtlTimeProvider.DEFAULT
TtlTimeProvider.DEFAULT,
new UnregisteredMetricsGroup(),
Collections.emptyList(),
new CloseableRegistry()
);

assertTrue(keyedStateBackend.supportsAsynchronousSnapshots());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.StateObjectCollection;
import org.apache.flink.runtime.execution.Environment;
Expand Down Expand Up @@ -1001,7 +1002,11 @@ private <K> AbstractKeyedStateBackend<K> createKeyedBackend(
keySerializer,
numberOfKeyGroups,
keyGroupRange,
env.getTaskKvStateRegistry());
env.getTaskKvStateRegistry(),
TtlTimeProvider.DEFAULT,
new UnregisteredMetricsGroup(),
Collections.emptyList(),
new CloseableRegistry());
return backend;
}

Expand Down Expand Up @@ -1034,9 +1039,11 @@ private <K> AbstractKeyedStateBackend<K> restoreKeyedBackend(
keySerializer,
numberOfKeyGroups,
keyGroupRange,
env.getTaskKvStateRegistry()
, TtlTimeProvider.DEFAULT,
state);
env.getTaskKvStateRegistry(),
TtlTimeProvider.DEFAULT,
new UnregisteredMetricsGroup(),
state,
new CloseableRegistry());
return backend;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,11 @@
import org.apache.flink.api.java.typeutils.runtime.PojoSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.queryablestate.KvStateID;
import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
Expand Down Expand Up @@ -172,14 +174,17 @@ protected <K> AbstractKeyedStateBackend<K> createKeyedBackend(
Environment env) throws Exception {

AbstractKeyedStateBackend<K> backend = getStateBackend().createKeyedStateBackend(
env,
new JobID(),
"test_op",
keySerializer,
numberOfKeyGroups,
keyGroupRange,
env.getTaskKvStateRegistry(),
TtlTimeProvider.DEFAULT);
env,
new JobID(),
"test_op",
keySerializer,
numberOfKeyGroups,
keyGroupRange,
env.getTaskKvStateRegistry(),
TtlTimeProvider.DEFAULT,
new UnregisteredMetricsGroup(),
Collections.emptyList(),
new CloseableRegistry());

return backend;
}
Expand Down Expand Up @@ -216,7 +221,9 @@ protected <K> AbstractKeyedStateBackend<K> restoreKeyedBackend(
keyGroupRange,
env.getTaskKvStateRegistry(),
TtlTimeProvider.DEFAULT,
state);
new UnregisteredMetricsGroup(),
state,
new CloseableRegistry());

return backend;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
Expand Down Expand Up @@ -92,8 +94,17 @@ void createAndRestoreKeyedStateBackend(int numberOfKeyGroups, KeyedStateHandle s
try {
disposeKeyedStateBackend();
keyedStateBackend = stateBackend.createKeyedStateBackend(
env, new JobID(), "test", StringSerializer.INSTANCE, numberOfKeyGroups,
new KeyGroupRange(0, numberOfKeyGroups - 1), env.getTaskKvStateRegistry(), timeProvider, stateHandles);
env,
new JobID(),
"test",
StringSerializer.INSTANCE,
numberOfKeyGroups,
new KeyGroupRange(0, numberOfKeyGroups - 1),
env.getTaskKvStateRegistry(),
timeProvider,
new UnregisteredMetricsGroup(),
stateHandles,
new CloseableRegistry());
} catch (Exception e) {
throw new RuntimeException("unexpected", e);
}
Expand Down
Loading

0 comments on commit f10a7d8

Please sign in to comment.