Skip to content

Commit

Permalink
[FLINK-3798] [streaming] Clean up RocksDB backend field/method access
Browse files Browse the repository at this point in the history
  • Loading branch information
gyfora committed Apr 21, 2016
1 parent a43bade commit d636bf7
Show file tree
Hide file tree
Showing 6 changed files with 14 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,14 @@ abstract class AbstractRocksDBState<K, N, S extends State, SD extends StateDescr
protected RocksDBStateBackend backend;

/** The column family of this particular instance of state */
ColumnFamilyHandle columnFamily;
protected ColumnFamilyHandle columnFamily;

/**
* Creates a new RocksDB backed state.
*
* @param namespaceSerializer The serializer for the namespace.
*/
AbstractRocksDBState(ColumnFamilyHandle columnFamily,
protected AbstractRocksDBState(ColumnFamilyHandle columnFamily,
TypeSerializer<N> namespaceSerializer,
RocksDBStateBackend backend) {

Expand All @@ -80,7 +80,7 @@ abstract class AbstractRocksDBState<K, N, S extends State, SD extends StateDescr
// ------------------------------------------------------------------------

@Override
final public void clear() {
public void clear() {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos);
try {
Expand All @@ -92,19 +92,19 @@ final public void clear() {
}
}

void writeKeyAndNamespace(DataOutputView out) throws IOException {
protected void writeKeyAndNamespace(DataOutputView out) throws IOException {
backend.keySerializer().serialize(backend.currentKey(), out);
out.writeByte(42);
namespaceSerializer.serialize(currentNamespace, out);
}

@Override
final public void setCurrentNamespace(N namespace) {
public void setCurrentNamespace(N namespace) {
this.currentNamespace = namespace;
}

@Override
final public void dispose() {
public void dispose() {
// ignore because we don't hold any state ourselves
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class RocksDBFoldingState<K, N, T, ACC>
* @param stateDesc The state identifier for the state. This contains name
* and can create a default state value.
*/
RocksDBFoldingState(ColumnFamilyHandle columnFamily,
public RocksDBFoldingState(ColumnFamilyHandle columnFamily,
TypeSerializer<N> namespaceSerializer,
FoldingStateDescriptor<T, ACC> stateDesc,
RocksDBStateBackend backend) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ class RocksDBListState<K, N, V>
* @param stateDesc The state identifier for the state. This contains name
* and can create a default state value.
*/
RocksDBListState(ColumnFamilyHandle columnFamily,
public RocksDBListState(ColumnFamilyHandle columnFamily,
TypeSerializer<N> namespaceSerializer,
ListStateDescriptor<V> stateDesc,
RocksDBStateBackend backend) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class RocksDBReducingState<K, N, V>
* @param stateDesc The state identifier for the state. This contains name
* and can create a default state value.
*/
RocksDBReducingState(ColumnFamilyHandle columnFamily,
public RocksDBReducingState(ColumnFamilyHandle columnFamily,
TypeSerializer<N> namespaceSerializer,
ReducingStateDescriptor<V> stateDesc,
RocksDBStateBackend backend) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ public class RocksDBStateBackend extends AbstractStateBackend {
* to store state. The different k/v states that we have don't each have their own RocksDB
* instance. They all write to this instance but to their own column family.
*/
transient RocksDB db;
protected transient RocksDB db;

/**
* Information about the k/v states as we create them. This is used to retrieve the
Expand Down Expand Up @@ -807,7 +807,7 @@ public final long getStateSize() throws Exception {
* <p>This also checks whether the {@link StateDescriptor} for a state matches the one
* that we checkpointed, i.e. is already in the map of column families.
*/
private ColumnFamilyHandle getColumnFamily(StateDescriptor descriptor) {
protected ColumnFamilyHandle getColumnFamily(StateDescriptor descriptor) {

Tuple2<ColumnFamilyHandle, StateDescriptor> stateInfo = kvStateInformation.get(descriptor.getName());

Expand All @@ -832,14 +832,14 @@ private ColumnFamilyHandle getColumnFamily(StateDescriptor descriptor) {
/**
* Used by k/v states to access the current key.
*/
Object currentKey() {
public Object currentKey() {
return currentKey;
}

/**
* Used by k/v states to access the key serializer.
*/
TypeSerializer keySerializer() {
public TypeSerializer keySerializer() {
return keySerializer;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class RocksDBValueState<K, N, V>
* @param stateDesc The state identifier for the state. This contains name
* and can create a default state value.
*/
RocksDBValueState(ColumnFamilyHandle columnFamily,
public RocksDBValueState(ColumnFamilyHandle columnFamily,
TypeSerializer<N> namespaceSerializer,
ValueStateDescriptor<V> stateDesc,
RocksDBStateBackend backend) {
Expand Down

0 comments on commit d636bf7

Please sign in to comment.