Skip to content

Commit

Permalink
[FLINK-11328][core] Migrate NFAStateSerializer to new TypeSerializer …
Browse files Browse the repository at this point in the history
…interface

This closes apache#7566.
  • Loading branch information
dawidwys authored and tzulitai committed Jan 30, 2019
1 parent 8a8b8d8 commit 4e607e9
Show file tree
Hide file tree
Showing 9 changed files with 152 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.flink.cep.nfa;

import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
Expand All @@ -33,32 +32,48 @@
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.PriorityQueue;
import java.util.Queue;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* A {@link TypeSerializer} for {@link NFAState} that uses Java Serialization.
* A {@link TypeSerializer} for {@link NFAState}.
*/
public class NFAStateSerializer extends TypeSerializerSingleton<NFAState> {

private static final long serialVersionUID = 2098282423980597010L;

public static final NFAStateSerializer INSTANCE = new NFAStateSerializer();
/**
* NOTE: the nested serializer fields below should actually be final.
* The reason that they aren't final is due to backward compatible deserialization
* paths. See {@link #readObject(ObjectInputStream)}.
*/
private TypeSerializer<DeweyNumber> versionSerializer;
private TypeSerializer<NodeId> nodeIdSerializer;
private TypeSerializer<EventId> eventIdSerializer;

/**
* Creates a serializer that uses the default nested serializers: the shared
* {@code DeweyNumberSerializer} and {@code EventIdSerializer} instances and a
* newly created {@code NodeIdSerializer}.
*/
public NFAStateSerializer() {
this.versionSerializer = DeweyNumber.DeweyNumberSerializer.INSTANCE;
this.eventIdSerializer = EventId.EventIdSerializer.INSTANCE;
this.nodeIdSerializer = new NodeId.NodeIdSerializer();
}

private static final DeweyNumber.DeweyNumberSerializer VERSION_SERIALIZER = DeweyNumber.DeweyNumberSerializer.INSTANCE;
private static final NodeId.NodeIdSerializer NODE_ID_SERIALIZER = new NodeId.NodeIdSerializer();
private static final EventId.EventIdSerializer EVENT_ID_SERIALIZER = EventId.EventIdSerializer.INSTANCE;
/**
 * Creates a serializer from explicitly provided nested serializers.
 *
 * @param versionSerializer serializer for the {@link DeweyNumber} version of a computation state
 * @param nodeIdSerializer serializer for the {@link NodeId} of the previous buffer entry
 * @param eventIdSerializer serializer for the {@link EventId} of the start event
 * @throws NullPointerException if any of the given serializers is {@code null}
 */
NFAStateSerializer(
		TypeSerializer<DeweyNumber> versionSerializer,
		TypeSerializer<NodeId> nodeIdSerializer,
		TypeSerializer<EventId> eventIdSerializer) {
	// Validate all arguments up front, in declaration order, before touching any field.
	checkNotNull(versionSerializer);
	checkNotNull(nodeIdSerializer);
	checkNotNull(eventIdSerializer);

	this.versionSerializer = versionSerializer;
	this.nodeIdSerializer = nodeIdSerializer;
	this.eventIdSerializer = eventIdSerializer;
}

/**
* Reports the serialized type as not immutable.
*
* @return always {@code false}
*/
@Override
public boolean isImmutableType() {
return false;
}

/**
 * Creates a copy of this serializer that is backed by duplicates of this instance's
 * nested serializers.
 *
 * <p>The nested serializers are taken from this instance rather than re-created from the
 * defaults, so a serializer that was built with explicitly provided nested serializers
 * (for example when restored from a snapshot) keeps its configuration when duplicated.
 *
 * @return a new serializer with duplicated nested serializers
 */
@Override
public NFAStateSerializer duplicate() {
	return new NFAStateSerializer(
		versionSerializer.duplicate(),
		nodeIdSerializer.duplicate(),
		eventIdSerializer.duplicate());
}

@Override
public NFAState createInstance() {
return null;
Expand Down Expand Up @@ -130,24 +145,25 @@ public boolean canEqual(Object obj) {
return true;
}

// -----------------------------------------------------------------------------------

@Override
public TypeSerializerSnapshot<NFAState> snapshotConfiguration() {
return new NFAStateSerializerSnapshot();
return new NFAStateSerializerSnapshot(this);
}

private NFAStateSerializer() {
/*
Getters for internal serializers to use in NFAStateSerializerSnapshot.
*/

/**
* Returns the nested serializer used for the {@link DeweyNumber} version of a computation state.
*
* @return the version serializer held by this instance
*/
TypeSerializer<DeweyNumber> getVersionSerializer() {
return versionSerializer;
}

/**
* Serializer configuration snapshot for compatibility and format evolution.
*/
@SuppressWarnings("WeakerAccess")
public static final class NFAStateSerializerSnapshot extends SimpleTypeSerializerSnapshot<NFAState> {
public NFAStateSerializerSnapshot() {
super(() -> INSTANCE);
}
/**
* Returns the nested serializer used for the {@link NodeId} of a computation state's
* previous buffer entry.
*
* @return the node id serializer held by this instance
*/
TypeSerializer<NodeId> getNodeIdSerializer() {
return nodeIdSerializer;
}

/**
* Returns the nested serializer used for the {@link EventId} of a computation state's
* start event.
*
* @return the event id serializer held by this instance
*/
TypeSerializer<EventId> getEventIdSerializer() {
return eventIdSerializer;
}

/*
Expand Down Expand Up @@ -177,16 +193,16 @@ private void serializeSingleComputationState(
DataOutputView target) throws IOException {

StringValue.writeString(computationState.getCurrentStateName(), target);
NODE_ID_SERIALIZER.serialize(computationState.getPreviousBufferEntry(), target);
VERSION_SERIALIZER.serialize(computationState.getVersion(), target);
nodeIdSerializer.serialize(computationState.getPreviousBufferEntry(), target);
versionSerializer.serialize(computationState.getVersion(), target);
target.writeLong(computationState.getStartTimestamp());
serializeStartEvent(computationState.getStartEventID(), target);
}

private ComputationState deserializeSingleComputationState(DataInputView source) throws IOException {
String stateName = StringValue.readString(source);
NodeId prevState = NODE_ID_SERIALIZER.deserialize(source);
DeweyNumber version = VERSION_SERIALIZER.deserialize(source);
NodeId prevState = nodeIdSerializer.deserialize(source);
DeweyNumber version = versionSerializer.deserialize(source);
long startTimestamp = source.readLong();

EventId startEventId = deserializeStartEvent(source);
Expand All @@ -200,10 +216,10 @@ private ComputationState deserializeSingleComputationState(DataInputView source)

private void copySingleComputationState(DataInputView source, DataOutputView target) throws IOException {
StringValue.copyString(source, target);
NodeId prevState = NODE_ID_SERIALIZER.deserialize(source);
NODE_ID_SERIALIZER.serialize(prevState, target);
DeweyNumber version = VERSION_SERIALIZER.deserialize(source);
VERSION_SERIALIZER.serialize(version, target);
NodeId prevState = nodeIdSerializer.deserialize(source);
nodeIdSerializer.serialize(prevState, target);
DeweyNumber version = versionSerializer.deserialize(source);
versionSerializer.serialize(version, target);
long startTimestamp = source.readLong();
target.writeLong(startTimestamp);

Expand All @@ -213,7 +229,7 @@ private void copySingleComputationState(DataInputView source, DataOutputView tar
private void serializeStartEvent(EventId startEventID, DataOutputView target) throws IOException {
if (startEventID != null) {
target.writeByte(1);
EVENT_ID_SERIALIZER.serialize(startEventID, target);
eventIdSerializer.serialize(startEventID, target);
} else {
target.writeByte(0);
}
Expand All @@ -223,7 +239,7 @@ private EventId deserializeStartEvent(DataInputView source) throws IOException {
byte isNull = source.readByte();
EventId startEventId = null;
if (isNull == 1) {
startEventId = EVENT_ID_SERIALIZER.deserialize(source);
startEventId = eventIdSerializer.deserialize(source);
}
return startEventId;
}
Expand All @@ -233,8 +249,23 @@ private void copyStartEvent(DataInputView source, DataOutputView target) throws
target.writeByte(isNull);

if (isNull == 1) {
EventId startEventId = EVENT_ID_SERIALIZER.deserialize(source);
EVENT_ID_SERIALIZER.serialize(startEventId, target);
EventId startEventId = eventIdSerializer.deserialize(source);
eventIdSerializer.serialize(startEventId, target);
}
}

/**
 * Backwards compatible Java deserialization of {@code NFAStateSerializer}.
 *
 * <p>After the default field restore, any missing nested serializer causes all three nested
 * serializers to be reset to their defaults.
 *
 * @param in stream the serializer is read from
 * @throws IOException if reading from the stream fails
 * @throws ClassNotFoundException if a class of a serialized object cannot be found
 */
private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
	in.defaultReadObject();

	final boolean allNestedSerializersRestored =
		versionSerializer != null && nodeIdSerializer != null && eventIdSerializer != null;
	if (allNestedSerializersRestored) {
		return;
	}

	// The nested serializers will be null if this was read from a savepoint taken with versions
	// lower than Flink 1.7; in this case, we explicitly create instances for the nested serializers.
	this.versionSerializer = DeweyNumber.DeweyNumberSerializer.INSTANCE;
	this.eventIdSerializer = EventId.EventIdSerializer.INSTANCE;
	this.nodeIdSerializer = new NodeId.NodeIdSerializer();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cep.nfa;

import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.cep.nfa.sharedbuffer.EventId;
import org.apache.flink.cep.nfa.sharedbuffer.NodeId;

/**
 * Composite snapshot of a {@link NFAStateSerializer}.
 *
 * <p>The nested serializers are always handled in the fixed order: version serializer,
 * node id serializer, event id serializer.
 */
public class NFAStateSerializerSnapshot extends CompositeTypeSerializerSnapshot<NFAState, NFAStateSerializer> {

	/** Version of the outer snapshot written by this class. */
	private static final int CURRENT_VERSION = 1;

	/** Positions of the nested serializers inside the nested serializer array. */
	private static final int VERSION_SERIALIZER_INDEX = 0;
	private static final int NODE_ID_SERIALIZER_INDEX = 1;
	private static final int EVENT_ID_SERIALIZER_INDEX = 2;
	private static final int NESTED_SERIALIZER_COUNT = 3;

	/**
	 * Constructor for read instantiation.
	 */
	public NFAStateSerializerSnapshot() {
		super(NFAStateSerializer.class);
	}

	/**
	 * Constructor to create the snapshot for writing.
	 *
	 * @param serializerInstance the serializer this snapshot is created for
	 */
	public NFAStateSerializerSnapshot(NFAStateSerializer serializerInstance) {
		super(serializerInstance);
	}

	/**
	 * Returns the version of the outer snapshot.
	 *
	 * @return {@link #CURRENT_VERSION}
	 */
	@Override
	protected int getCurrentOuterSnapshotVersion() {
		return CURRENT_VERSION;
	}

	/**
	 * Collects the nested serializers of the given outer serializer.
	 *
	 * @param outerSerializer the serializer whose nested serializers are extracted
	 * @return the version, node id and event id serializers, in that order
	 */
	@Override
	protected TypeSerializer<?>[] getNestedSerializers(NFAStateSerializer outerSerializer) {
		final TypeSerializer<?>[] nested = new TypeSerializer<?>[NESTED_SERIALIZER_COUNT];
		nested[VERSION_SERIALIZER_INDEX] = outerSerializer.getVersionSerializer();
		nested[NODE_ID_SERIALIZER_INDEX] = outerSerializer.getNodeIdSerializer();
		nested[EVENT_ID_SERIALIZER_INDEX] = outerSerializer.getEventIdSerializer();
		return nested;
	}

	/**
	 * Builds a {@link NFAStateSerializer} from the given nested serializers.
	 *
	 * @param nestedSerializers the version, node id and event id serializers, in that order
	 * @return a serializer wired with the given nested serializers
	 */
	@Override
	@SuppressWarnings("unchecked")
	protected NFAStateSerializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
		// The casts rely on the array following the order produced by getNestedSerializers.
		return new NFAStateSerializer(
			(TypeSerializer<DeweyNumber>) nestedSerializers[VERSION_SERIALIZER_INDEX],
			(TypeSerializer<NodeId>) nestedSerializers[NODE_ID_SERIALIZER_INDEX],
			(TypeSerializer<EventId>) nestedSerializers[EVENT_ID_SERIALIZER_INDEX]);
	}
}
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,9 @@ public void initializeState(StateInitializationContext context) throws Exception

// initializeState through the provided context
computationStates = context.getKeyedStateStore().getState(
new ValueStateDescriptor<>(
NFA_STATE_NAME,
NFAStateSerializer.INSTANCE));
new ValueStateDescriptor<>(
NFA_STATE_NAME,
new NFAStateSerializer()));

partialMatches = new SharedBuffer<>(context.getKeyedStateStore(), inputSerializer);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase;
import org.apache.flink.cep.nfa.DeweyNumber;
import org.apache.flink.cep.nfa.NFAStateSerializer;
import org.apache.flink.cep.nfa.NFAStateSerializerSnapshot;
import org.apache.flink.cep.nfa.sharedbuffer.EventId;
import org.apache.flink.cep.nfa.sharedbuffer.NodeId;
import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferEdge;
Expand All @@ -41,7 +43,6 @@ public NFASerializerSnapshotsMigrationTest(TestSpecification<Object> testSpecifi
super(testSpecification);
}

@SuppressWarnings("unchecked")
@Parameterized.Parameters(name = "Test Specification = {0}")
public static Collection<TestSpecification<?>> testSpecifications() {

Expand Down Expand Up @@ -72,6 +73,11 @@ public static Collection<TestSpecification<?>> testSpecifications() {
SharedBufferNode.SharedBufferNodeSerializer.class,
SharedBufferNode.SharedBufferNodeSerializer.SharedBufferNodeSerializerSnapshot.class,
SharedBufferNode.SharedBufferNodeSerializer::new);
testSpecifications.add(
"nfa-state-serializer",
NFAStateSerializer.class,
NFAStateSerializerSnapshot.class,
NFAStateSerializer::new);

return testSpecifications.get();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ public boolean filter(Event value) throws Exception {
nfaTestHarness.consumeRecord(new StreamRecord<>(d, 7));
nfaTestHarness.consumeRecord(new StreamRecord<>(a, 8));

NFAStateSerializer serializer = NFAStateSerializer.INSTANCE;
NFAStateSerializer serializer = new NFAStateSerializer();

//serialize
ByteArrayOutputStream baos = new ByteArrayOutputStream();
Expand Down
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.

0 comments on commit 4e607e9

Please sign in to comment.