Skip to content

Commit

Permalink
[FLINK-2924] [streaming] Out-of-core state backend for JDBC databases
Browse files Browse the repository at this point in the history
  • Loading branch information
gyfora committed Nov 24, 2015
1 parent ad6f826 commit 75a7c4b
Show file tree
Hide file tree
Showing 38 changed files with 2,910 additions and 73 deletions.
24 changes: 24 additions & 0 deletions flink-contrib/flink-streaming-contrib/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,30 @@ under the License.
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-tests</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derbyclient</artifactId>
<version>10.12.1.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derbynet</artifactId>
<version>10.12.1.1</version>
<scope>test</scope>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.contrib.streaming.state;

import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;

import org.apache.flink.api.java.tuple.Tuple2;

public interface DbAdapter {

/**
* Initialize tables for storing non-partitioned checkpoints for the given
* job id and database connection.
*
*/
void createCheckpointsTable(String jobId, Connection con) throws SQLException;

/**
* Checkpoints will be inserted in the database using prepared statements.
* This methods should prepare and return the statement that will be used
* later to insert using the given connection.
*
*/
PreparedStatement prepareCheckpointInsert(String jobId, Connection con) throws SQLException;

/**
* Set the {@link PreparedStatement} parameters for the statement returned
* by {@link #prepareCheckpointInsert(String, Connection)}.
*
* @param jobId
* Id of the current job.
* @param insertStatement
* Statement returned by
* {@link #prepareCheckpointInsert(String, Connection)}.
* @param checkpointId
* Global checkpoint id.
* @param timestamp
* Global checkpoint timestamp.
* @param handleId
* Unique id assigned to this state checkpoint (should be primary
* key).
* @param checkpoint
* The serialized checkpoint.
* @throws SQLException
*/
void setCheckpointInsertParams(String jobId, PreparedStatement insertStatement, long checkpointId,
long timestamp, long handleId, byte[] checkpoint) throws SQLException;

/**
* Retrieve the serialized checkpoint data from the database.
*
* @param jobId
* Id of the current job.
* @param con
* Database connection
* @param checkpointId
* Global checkpoint id.
* @param checkpointTs
* Global checkpoint timestamp.
* @param handleId
* Unique id assigned to this state checkpoint (should be primary
* key).
* @return The byte[] corresponding to the checkpoint or null if missing.
* @throws SQLException
*/
byte[] getCheckpoint(String jobId, Connection con, long checkpointId, long checkpointTs, long handleId)
throws SQLException;

/**
* Remove the given checkpoint from the database.
*
* @param jobId
* Id of the current job.
* @param con
* Database connection
* @param checkpointId
* Global checkpoint id.
* @param checkpointTs
* Global checkpoint timestamp.
* @param handleId
* Unique id assigned to this state checkpoint (should be primary
* key).
* @return The byte[] corresponding to the checkpoint or null if missing.
* @throws SQLException
*/
void deleteCheckpoint(String jobId, Connection con, long checkpointId, long checkpointTs, long handleId)
throws SQLException;

/**
* Remove all states for the given JobId, by for instance dropping the
* entire table.
*
* @throws SQLException
*/
void disposeAllStateForJob(String jobId, Connection con) throws SQLException;

/**
* Initialize the necessary tables for the given stateId. The state id
* consist of the JobId+OperatorId+StateName.
*
*/
void createKVStateTable(String stateId, Connection con) throws SQLException;

/**
* Prepare the the statement that will be used to insert key-value pairs in
* the database.
*
*/
PreparedStatement prepareKVCheckpointInsert(String stateId, Connection con) throws SQLException;

/**
* Prepare the statement that will be used to lookup keys from the database.
* Keys and values are assumed to be byte arrays.
*
*/
PreparedStatement prepareKeyLookup(String stateId, Connection con) throws SQLException;

/**
* Retrieve the latest value from the database for a given key and
* checkpointId.
*
* @param stateId
* Unique identifier of the kvstate (usually the table name).
* @param lookupStatement
* The statement returned by
* {@link #prepareKeyLookup(String, Connection)}.
* @param key
* The key to lookup.
* @return The latest valid value for the key.
* @throws SQLException
*/
byte[] lookupKey(String stateId, PreparedStatement lookupStatement, byte[] key, long lookupId)
throws SQLException;

/**
* Clean up states between the current and next checkpoint id. Everything
* with larger than current and smaller than next should be removed.
*
*/
void cleanupFailedCheckpoints(String stateId, Connection con, long checkpointId,
long nextId) throws SQLException;

/**
* Insert a list of Key-Value pairs into the database. The suggested
* approach is to use idempotent inserts(updates) as 1 batch operation.
*
*/
void insertBatch(String stateId, DbBackendConfig conf, Connection con, PreparedStatement insertStatement,
long checkpointId, List<Tuple2<byte[], byte[]>> toInsert) throws IOException;

}
Loading

0 comments on commit 75a7c4b

Please sign in to comment.