Skip to content

Commit

Permalink
[FLINK-19491] Add v2 snapshot test to AvroSerializerSnapshotTest
Browse files Browse the repository at this point in the history
We do this before updating the version to 3 and changing the
serialization format of the snapshot to ensure that the new code can
read the older snapshot.

This also adds the general infrastructure for testing restore from older
versions to the test.

The generated serializer snapshot was created with snapshot v2.
  • Loading branch information
aljoscha committed Nov 9, 2020
1 parent 8bc6a7b commit cf9e837
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,25 @@

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.formats.avro.generated.Address;
import org.apache.flink.formats.avro.generated.User;
import org.apache.flink.formats.avro.utils.TestDataGenerator;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericRecord;
import org.junit.Ignore;
import org.junit.Test;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Random;

import static org.apache.flink.api.common.typeutils.TypeSerializerMatchers.isCompatibleAfterMigration;
Expand All @@ -46,6 +52,8 @@
*/
public class AvroSerializerSnapshotTest {

private static final int[] PAST_VERSIONS = new int[] {2};

private static final Schema FIRST_NAME = SchemaBuilder.record("name")
.namespace("org.apache.flink")
.fields()
Expand Down Expand Up @@ -183,6 +191,48 @@ public void changingFromGenericToSpecificWithCompatibleSchemaShouldResultInCompa
assertThat(genericSnapshot.resolveSchemaCompatibility(specificSerializer), isCompatibleAsIs());
}

@Test
public void restorePastSnapshots() throws IOException {
for (int pastVersion : PAST_VERSIONS) {
AvroSerializer<GenericRecord> currentSerializer =
new AvroSerializer<>(GenericRecord.class, Address.getClassSchema());

DataInputView in = new DataInputDeserializer(Files.readAllBytes(
getSerializerSnapshotFilePath(pastVersion)));

TypeSerializerSnapshot<GenericRecord> restored = TypeSerializerSnapshotSerializationUtil
.readSerializerSnapshot(
in, AvroSerializer.class.getClassLoader(), null);

assertThat(restored.resolveSchemaCompatibility(currentSerializer), isCompatibleAsIs());
}
}

/**
* Creates a new serializer snapshot for the current version. Use this before bumping the
* snapshot version and also add the version (before bumping) to {@link #PAST_VERSIONS}.
*/
@Ignore
@Test
public void writeCurrentVersionSnapshot() throws IOException {
AvroSerializer<GenericRecord> serializer =
new AvroSerializer<>(GenericRecord.class, Address.getClassSchema());

DataOutputSerializer out = new DataOutputSerializer(1024);

TypeSerializerSnapshotSerializationUtil.writeSerializerSnapshot(
out, serializer.snapshotConfiguration(), serializer);

Path snapshotPath =
getSerializerSnapshotFilePath(new AvroSerializerSnapshot<>().getCurrentVersion());

Files.write(snapshotPath, out.getCopyOfBuffer());
}

private Path getSerializerSnapshotFilePath(int version) {
return Paths.get(System.getProperty("user.dir") + "/src/test/resources/serializer-snapshot-v" + version);
}

// ---------------------------------------------------------------------------------------------------------------
// Utils
// ---------------------------------------------------------------------------------------------------------------
Expand Down
Binary file not shown.
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1457,6 +1457,7 @@ under the License.
<exclude>flink-connectors/flink-connector-kinesis/src/test/resources/profile</exclude>

<!-- snapshots -->
<exclude>**/src/test/resources/serializer-snapshot-*</exclude>
<exclude>**/src/test/resources/**/serializer-snapshot</exclude>
<exclude>**/src/test/resources/**/test-data</exclude>
<exclude>**/src/test/resources/*-snapshot</exclude>
Expand Down

0 comments on commit cf9e837

Please sign in to comment.