
Out-of-core state backend for JDBC databases #1305

Merged
merged 7 commits
Nov 24, 2015
[FLINK-2916] [streaming] Expose operator and task information to StateBackend
gyfora committed Nov 24, 2015
commit ad6f826584be7527c58e2126e2828f82afc97875
@@ -18,96 +18,98 @@

package org.apache.flink.runtime.state;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.execution.Environment;

import java.io.IOException;
import java.io.OutputStream;
import java.io.Serializable;

/**
* A state backend defines how state is stored and snapshotted during checkpoints.
*
* @param <Backend> The type of backend itself. This generic parameter is used to refer to the
* type of backend when creating state backed by this backend.
*/
public abstract class StateBackend<Backend extends StateBackend<Backend>> implements java.io.Serializable {

private static final long serialVersionUID = 4620413814639220247L;

// ------------------------------------------------------------------------
// initialization and cleanup
// ------------------------------------------------------------------------

/**
* This method is called by the task upon deployment to initialize the state backend
* for a specific job.
*
* @param job The ID of the job for which the state backend instance checkpoints data.
*
* @param env The {@link Environment} of the task that instantiated the state backend
* @throws Exception Overridden versions of this method may throw exceptions, in which
* case the job that uses the state backend is considered failed during
* deployment.
*/
public abstract void initializeForJob(JobID job) throws Exception;
public abstract void initializeForJob(Environment env) throws Exception;

/**
* Disposes all state associated with the current job.
*
* @throws Exception Exceptions may occur during disposal of the state and should be forwarded.
*/
public abstract void disposeAllStateForCurrentJob() throws Exception;

/**
* Closes the state backend, releasing all internal resources, but does not delete any persistent
* checkpoint data.
*
* @throws Exception Exceptions can be forwarded and will be logged by the system
*/
public abstract void close() throws Exception;

// ------------------------------------------------------------------------
// key/value state
// ------------------------------------------------------------------------

/**
* Creates a key/value state backed by this state backend.
*
* @param operatorId Unique id for the operator creating the state
* @param stateName Name of the created state
* @param keySerializer The serializer for the key.
* @param valueSerializer The serializer for the value.
* @param defaultValue The value that is returned when no other value has been associated with a key, yet.
* @param <K> The type of the key.
* @param <V> The type of the value.
*
* @return A new key/value state backed by this backend.
*
* @throws Exception Exceptions may occur during initialization of the state and should be forwarded.
*/
public abstract <K, V> KvState<K, V, Backend> createKvState(
public abstract <K, V> KvState<K, V, Backend> createKvState(int operatorId, String stateName,
Contributor:

I would like to get rid of this change and simply let the state backend create a UID for the state name.

This method is called once per proper creation of a state (so it should not need deterministic state naming). Recovery happens from the state handle, which can store all required info.

Contributor Author:

I am not completely sure what you mean here.

Multiple different states can have the same name in different tasks. As far as I know, we don't assume unique state names. This gets worse when chained tasks have states with the same name, because they then actually go to the same backend as well.

I don't see how to get around this without an operator id. Could you please clarify your idea?

Contributor:

I suggest not letting the operator supply an ID and name, but simply leaving the naming of the state to the state backend. The SqlStateBackend could just use UUID.randomUUID().toString() instead of operatorId+stateName.
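A minimal sketch of that suggestion, assuming a hypothetical helper class (`UuidStateNaming` and the `kvstate_` prefix are illustrative, not the actual SqlStateBackend code):

```java
import java.util.UUID;

// Sketch: the backend generates its own random state identifier instead of
// receiving operatorId + stateName from the caller.
public class UuidStateNaming {
    static String newStateTableName(String jobId) {
        // One fresh, globally unique name per created state
        return "kvstate_" + UUID.randomUUID().toString().replace("-", "") + "_" + jobId;
    }
}
```

Each call yields a new name, which is exactly the property debated below: the name is unique, but not deterministic across parallel instances.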

Contributor Author:

The point is that all parallel instances write to the same set of tables. This way sharding is transparently handled, and the job parallelism can actually change without affecting the state. (No need to repartition it.)

Contributor Author:

Otherwise you will have to create p*numShards tables, and you won't even know what state is in them from looking at the table names.

Contributor:

What you mention depends on the parallel subtask ID (which is already given in the initialize() method). The operatorId and name are the same for all parallel instances anyway.

Contributor:

The "name" (as a string) of the state is a very API specific thing that no other part of the runtime is concerned with. The operator ID is something specific to the StreamGraphBuilder and not to the streaming tasks at all. I think we are tying things together here that should not be tied together.

I still do not understand how this affects sharding. Does the shard assignment depend on the state name (rather than the parallel subtask / JobVertexId) ?

I only see that the table names will have the task name instead of the name of the state.

Contributor Author:

Let me first describe how sharding works, then I will give a concrete example.
Key-value pairs are sharded by key, not by subtask. This means that each parallel subtask maintains a connection to all the shards and partitions the states before writing them to the appropriate shards according to the user-defined partitioner (in the backend config). This is much better than sharding by subtask, because we can later change the parallelism of the job without affecting the state, and it also lets us define a more elaborate sharding strategy through the partitioner.

This means that when a kv state is created, we create a table for that kv state in each shard. If we did it according to your suggestion, we would need to create numShards tables for each parallel instance (a total of p*numShards) for each kv state. Furthermore, this makes the fancy sharding useless because we cannot change the job parallelism. So we need to make sure that parallel subtasks of a given operator write to the same state tables (so we only have numShards tables regardless of the parallelism).

In order to do this we need something that uniquely identifies a given state in the streaming program (and parallel instances should have the same id).

The information required to create such a unique state id is an identifier for the operator that has the state, plus the name of the state. (The information obtained from the environment is not enough, because chained operators have the same environment; so if they have conflicting state names, the id is not unique.) The only thing that identifies an operator in the logical streaming program is the operator id assigned by the job graph builder (that's the whole point of having it).
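The key-based routing described above can be sketched as follows (`KeyShardRouter` and the hash-based partitioning are illustrative stand-ins for the user-defined partitioner in the backend config):

```java
// Sketch of key-based shard routing: every parallel subtask holds a
// connection to all shards and routes each key/value pair by key, so the
// job parallelism can change without repartitioning the stored state.
public class KeyShardRouter {
    private final int numShards;

    public KeyShardRouter(int numShards) {
        this.numShards = numShards;
    }

    public int shardFor(Object key) {
        // floorMod keeps the result non-negative even for negative hash codes
        return Math.floorMod(key.hashCode(), numShards);
    }
}
```

Because the routing depends only on the key and the fixed number of shards, two subtasks of the same operator always send a given key to the same shard.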

An example job with p = 2 and numShards = 3:

A chained map -> filter, where both the mapper and the filter have a state named "count"; let's assume the mapper has opid 1 and the filter 2.

In this case the mapper would create 3 db tables (1 on each shard) with the same name, kvstate_count_1_jobId. The filter would also create 3 tables, named kvstate_count_2_jobId.

All mapper instances would write to all three database shards, and the same goes for all the filters.

I hope you get what I am trying to say.
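The table naming from that example can be sketched with a hypothetical helper (the `kvstate_<name>_<opId>_<jobId>` pattern follows the names given above; `StateTableNames` is illustrative, not actual backend code):

```java
// Sketch: one table name per (state name, operator id) pair, used on every
// shard and shared by all parallel subtasks of that operator.
public class StateTableNames {
    static String tableName(String stateName, int operatorId, String jobId) {
        return "kvstate_" + stateName + "_" + operatorId + "_" + jobId;
    }

    public static void main(String[] args) {
        System.out.println(tableName("count", 1, "jobId")); // mapper: kvstate_count_1_jobId
        System.out.println(tableName("count", 2, "jobId")); // filter: kvstate_count_2_jobId
    }
}
```

With numShards = 3, each of the two states gets the same name on all three shards, so the total table count stays at numShards per state regardless of p.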

Contributor Author:

You might be right that the state name and operator id are too API specific, but we will need ways to globally identify states, which I think is currently impossible without them.

Contributor:

Thanks for the description of the sharding. The issue is that you need a deterministic table name that each KeyValueState can create independently.

TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer,
V defaultValue) throws Exception;


// ------------------------------------------------------------------------
// storing state for a checkpoint
// ------------------------------------------------------------------------

/**
* Creates an output stream that writes into the state of the given checkpoint. When the stream
* is closed, it returns a state handle that can retrieve the state back.
*
* @param checkpointID The ID of the checkpoint.
* @param timestamp The timestamp of the checkpoint.
* @return An output stream that writes state for the given checkpoint.
*
* @throws Exception Exceptions may occur while creating the stream and should be forwarded.
*/
public abstract CheckpointStateOutputStream createCheckpointStateOutputStream(
long checkpointID, long timestamp) throws Exception;

/**
* Creates a {@link DataOutputView} stream that writes into the state of the given checkpoint.
* When the stream is closed, it returns a state handle that can retrieve the state back.
@@ -125,20 +127,20 @@ public CheckpointStateOutputView createCheckpointStateOutputView(

/**
* Writes the given state into the checkpoint, and returns a handle that can retrieve the state back.
*
* @param state The state to be checkpointed.
* @param checkpointID The ID of the checkpoint.
* @param timestamp The timestamp of the checkpoint.
* @param <S> The type of the state.
*
* @return A state handle that can retrieve the checkpointed state.
*
* @throws Exception Exceptions may occur during serialization / storing the state and should be forwarded.
*/
public abstract <S extends Serializable> StateHandle<S> checkpointStateSerializable(
S state, long checkpointID, long timestamp) throws Exception;


// ------------------------------------------------------------------------
// Checkpoint state output stream
// ------------------------------------------------------------------------
@@ -151,7 +153,7 @@ public static abstract class CheckpointStateOutputStream extends OutputStream {
/**
* Closes the stream and gets a state handle that can create an input stream
* producing the data written to this stream.
*
* @return A state handle that can create an input stream producing the data written to this stream.
* @throws IOException Thrown, if the stream cannot be closed.
*/
@@ -162,9 +164,9 @@ public static abstract class CheckpointStateOutputStream extends OutputStream {
* A dedicated DataOutputView stream that produces a {@code StateHandle<DataInputView>} when closed.
*/
public static final class CheckpointStateOutputView extends DataOutputViewStreamWrapper {

private final CheckpointStateOutputStream out;

public CheckpointStateOutputView(CheckpointStateOutputStream out) {
super(out);
this.out = out;
@@ -193,7 +195,7 @@ public void close() throws IOException {
private static final class DataInputViewHandle implements StateHandle<DataInputView> {

private static final long serialVersionUID = 2891559813513532079L;

private final StreamStateHandle stream;

private DataInputViewHandle(StreamStateHandle stream) {
@@ -202,7 +204,7 @@ private DataInputViewHandle(StreamStateHandle stream) {

@Override
public DataInputView getState(ClassLoader userCodeClassLoader) throws Exception {
return new DataInputViewStreamWrapper(stream.getState(userCodeClassLoader));
}

@Override