Skip to content

Commit

Permalink
[FLINK-27046][tests] Migrate avro/json schema registry to JUnit 5
Browse files Browse the repository at this point in the history
  • Loading branch information
RyanSkraba committed Apr 5, 2022
1 parent 352ca38 commit 6692ff3
Show file tree
Hide file tree
Showing 10 changed files with 123 additions and 110 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,10 @@

package org.apache.flink.formats.avro.glue.schema.registry;

import org.apache.flink.util.TestLogger;

import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
import org.apache.avro.Schema;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import java.io.File;
import java.io.IOException;
Expand All @@ -33,13 +31,13 @@
import static org.assertj.core.api.Assertions.assertThat;

/** Tests for {@link GlueSchemaRegistryAvroDeserializationSchema}. */
public class GlueSchemaRegistryAvroDeserializationSchemaTest extends TestLogger {
class GlueSchemaRegistryAvroDeserializationSchemaTest {
private static final String AVRO_USER_SCHEMA_FILE = "src/test/java/resources/avro/user.avsc";
private static Schema userSchema;
private static Map<String, Object> configs = new HashMap<>();

@BeforeClass
public static void setup() throws IOException {
@BeforeAll
static void setup() throws IOException {
configs.put(AWSSchemaRegistryConstants.AWS_REGION, "us-west-2");
configs.put(AWSSchemaRegistryConstants.AWS_ENDPOINT, "https://test");
configs.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, true);
Expand All @@ -50,19 +48,17 @@ public static void setup() throws IOException {

/** Test whether forGeneric method works. */
@Test
public void testForGeneric_withValidParams_succeeds() {
assertThat(GlueSchemaRegistryAvroDeserializationSchema.forGeneric(userSchema, configs))
.isNotNull();
void testForGeneric_withValidParams_succeeds() {
assertThat(GlueSchemaRegistryAvroDeserializationSchema.forGeneric(userSchema, configs))
.isNotNull()
.isInstanceOf(GlueSchemaRegistryAvroDeserializationSchema.class);
}

/** Test whether forSpecific method works. */
@Test
public void testForSpecific_withValidParams_succeeds() {
assertThat(GlueSchemaRegistryAvroDeserializationSchema.forSpecific(User.class, configs))
.isNotNull();
void testForSpecific_withValidParams_succeeds() {
assertThat(GlueSchemaRegistryAvroDeserializationSchema.forSpecific(User.class, configs))
.isNotNull()
.isInstanceOf(GlueSchemaRegistryAvroDeserializationSchema.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@

package org.apache.flink.formats.avro.glue.schema.registry;

import org.apache.flink.util.TestLogger;

import com.amazonaws.services.schemaregistry.common.AWSSchemaRegistryClient;
import com.amazonaws.services.schemaregistry.common.configs.GlueSchemaRegistryConfiguration;
import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryDeserializationFacade;
Expand All @@ -28,10 +26,8 @@
import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
import lombok.NonNull;
import org.apache.avro.Schema;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.services.glue.model.EntityNotFoundException;
Expand All @@ -48,9 +44,10 @@
import java.util.UUID;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Tests for {@link GlueSchemaRegistryAvroSchemaCoder}. */
public class GlueSchemaRegistryAvroSchemaCoderTest extends TestLogger {
class GlueSchemaRegistryAvroSchemaCoderTest {
private static final String testTopic = "Test-Topic";
private static final String schemaName = "User-Topic";
private static final String AVRO_USER_SCHEMA_FILE = "src/test/java/resources/avro/user.avsc";
Expand All @@ -70,10 +67,9 @@ public class GlueSchemaRegistryAvroSchemaCoderTest extends TestLogger {
private static AWSSchemaRegistryClient mockClient;
private static GlueSchemaRegistryInputStreamDeserializer mockInputStreamDeserializer;
private static GlueSchemaRegistryOutputStreamSerializer mockOutputStreamSerializer;
@Rule public ExpectedException thrown = ExpectedException.none();

@BeforeClass
public static void setup() throws IOException {
@BeforeAll
static void setup() throws IOException {
metadata.put("test-key", "test-value");
metadata.put(AWSSchemaRegistryConstants.TRANSPORT_METADATA_KEY, testTopic);

Expand All @@ -98,13 +94,13 @@ public static void setup() throws IOException {

/** Test whether constructor works. */
@Test
public void testConstructor_withConfigs_succeeds() {
void testConstructor_withConfigs_succeeds() {
assertThat(new GlueSchemaRegistryAvroSchemaCoder(testTopic, configs)).isNotNull();
}

/** Test whether readSchema method works. */
@Test
public void testReadSchema_withValidParams_succeeds() throws IOException {
void testReadSchema_withValidParams_succeeds() throws IOException {
GlueSchemaRegistryAvroSchemaCoder glueSchemaRegistryAvroSchemaCoder =
new GlueSchemaRegistryAvroSchemaCoder(mockInputStreamDeserializer);
Schema resultSchema =
Expand All @@ -115,7 +111,7 @@ public void testReadSchema_withValidParams_succeeds() throws IOException {

/** Test whether writeSchema method works. */
@Test
public void testWriteSchema_withValidParams_succeeds() throws IOException {
void testWriteSchema_withValidParams_succeeds() throws IOException {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
outputStream.write(actualBytes);
GlueSchemaRegistryAvroSchemaCoder glueSchemaRegistryAvroSchemaCoder =
Expand All @@ -127,7 +123,7 @@ public void testWriteSchema_withValidParams_succeeds() throws IOException {

/** Test whether writeSchema method throws exception if auto registration un-enabled. */
@Test
public void testWriteSchema_withoutAutoRegistration_throwsException() throws IOException {
void testWriteSchema_withoutAutoRegistration_throwsException() {
configs.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, false);
mockClient = new MockAWSSchemaRegistryClient();

Expand All @@ -145,9 +141,12 @@ public void testWriteSchema_withoutAutoRegistration_throwsException() throws IOE
GlueSchemaRegistryAvroSchemaCoder glueSchemaRegistryAvroSchemaCoder =
new GlueSchemaRegistryAvroSchemaCoder(glueSchemaRegistryOutputStreamSerializer);

thrown.expect(AWSSchemaRegistryException.class);
thrown.expectMessage(AWSSchemaRegistryConstants.AUTO_REGISTRATION_IS_DISABLED_MSG);
glueSchemaRegistryAvroSchemaCoder.writeSchema(userSchema, new ByteArrayOutputStream());
assertThatThrownBy(
() ->
glueSchemaRegistryAvroSchemaCoder.writeSchema(
userSchema, new ByteArrayOutputStream()))
.isInstanceOf(AWSSchemaRegistryException.class)
.hasMessage(AWSSchemaRegistryConstants.AUTO_REGISTRATION_IS_DISABLED_MSG);
}

private void testForSerializedData(byte[] serializedData) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,12 @@

package org.apache.flink.formats.avro.glue.schema.registry;

import org.apache.flink.util.TestLogger;

import com.amazonaws.services.schemaregistry.common.configs.GlueSchemaRegistryConfiguration;
import com.amazonaws.services.schemaregistry.serializers.GlueSchemaRegistrySerializationFacade;
import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
import org.apache.avro.Schema;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;

Expand All @@ -37,7 +35,7 @@
import static org.assertj.core.api.Assertions.assertThat;

/** Tests for {@link GlueSchemaRegistryAvroSerializationSchema}. */
public class GlueSchemaRegistryAvroSerializationSchemaTest extends TestLogger {
class GlueSchemaRegistryAvroSerializationSchemaTest {
private static final String testTopic = "Test-Topic";
private static final String schemaName = "User-Topic";
private static final String AVRO_USER_SCHEMA_FILE = "src/test/java/resources/avro/user.avsc";
Expand All @@ -55,8 +53,8 @@ public class GlueSchemaRegistryAvroSerializationSchemaTest extends TestLogger {
DefaultCredentialsProvider.builder().build();
private static GlueSchemaRegistrySerializationFacade mockSerializationFacade;

@BeforeClass
public static void setup() throws IOException {
@BeforeAll
static void setup() throws IOException {
metadata.put("test-key", "test-value");
metadata.put(AWSSchemaRegistryConstants.TRANSPORT_METADATA_KEY, testTopic);

Expand All @@ -81,7 +79,7 @@ public static void setup() throws IOException {

/** Test whether forGeneric method works. */
@Test
public void testForGeneric_withValidParams_succeeds() {
void testForGeneric_withValidParams_succeeds() {
assertThat(
GlueSchemaRegistryAvroSerializationSchema.forGeneric(
userSchema, testTopic, configs))
Expand All @@ -94,7 +92,7 @@ public void testForGeneric_withValidParams_succeeds() {

/** Test whether forSpecific method works. */
@Test
public void testForSpecific_withValidParams_succeeds() {
void testForSpecific_withValidParams_succeeds() {
assertThat(
GlueSchemaRegistryAvroSerializationSchema.forSpecific(
User.class, testTopic, configs))
Expand All @@ -107,7 +105,7 @@ public void testForSpecific_withValidParams_succeeds() {

/** Test whether serialize method when compression is not enabled works. */
@Test
public void testSerialize_withValidParams_withoutCompression_succeeds() {
void testSerialize_withValidParams_withoutCompression_succeeds() {
AWSSchemaRegistryConstants.COMPRESSION compressionType =
AWSSchemaRegistryConstants.COMPRESSION.NONE;
configs.put(AWSSchemaRegistryConstants.COMPRESSION_TYPE, compressionType.name());
Expand All @@ -128,7 +126,7 @@ public void testSerialize_withValidParams_withoutCompression_succeeds() {

/** Test whether serialize method when compression is enabled works. */
@Test
public void testSerialize_withValidParams_withCompression_succeeds() {
void testSerialize_withValidParams_withCompression_succeeds() {
AWSSchemaRegistryConstants.COMPRESSION compressionType =
AWSSchemaRegistryConstants.COMPRESSION.ZLIB;
configs.put(AWSSchemaRegistryConstants.COMPRESSION_TYPE, compressionType.name());
Expand All @@ -149,7 +147,7 @@ public void testSerialize_withValidParams_withCompression_succeeds() {

/** Test whether serialize method returns null when input object is null. */
@Test
public void testSerialize_withNullObject_returnNull() {
void testSerialize_withNullObject_returnNull() {
GlueSchemaRegistryAvroSerializationSchema<User> glueSchemaRegistryAvroSerializationSchema =
GlueSchemaRegistryAvroSerializationSchema.forSpecific(
User.class, testTopic, configs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.flink.formats.avro.glue.schema.registry;

import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
import org.apache.flink.util.TestLogger;

import com.amazonaws.services.schemaregistry.common.GlueSchemaRegistryCompressionHandler;
import com.amazonaws.services.schemaregistry.common.GlueSchemaRegistryDefaultCompression;
Expand All @@ -33,10 +32,8 @@
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.services.glue.model.DataFormat;
Expand All @@ -55,9 +52,10 @@
import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_ACCESS_KEY_ID;
import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_SECRET_ACCESS_KEY;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Tests for {@link GlueSchemaRegistryInputStreamDeserializer}. */
public class GlueSchemaRegistryInputStreamDeserializerTest extends TestLogger {
class GlueSchemaRegistryInputStreamDeserializerTest {
private static final String testTopic = "Test-Topic";
private static final UUID USER_SCHEMA_VERSION_ID = UUID.randomUUID();
private static final String AVRO_USER_SCHEMA_FILE = "src/test/java/resources/avro/user.avsc";
Expand All @@ -70,11 +68,10 @@ public class GlueSchemaRegistryInputStreamDeserializerTest extends TestLogger {
private static GlueSchemaRegistryCompressionHandler compressionHandler;
private static final AwsCredentialsProvider credentialsProvider =
DefaultCredentialsProvider.builder().build();
@Rule public ExpectedException thrown = ExpectedException.none();
private GlueSchemaRegistryDeserializationFacade glueSchemaRegistryDeserializationFacade;

@Before
public void setup() throws IOException {
@BeforeEach
void setup() throws IOException {
metadata.put("test-key", "test-value");
metadata.put(AWSSchemaRegistryConstants.TRANSPORT_METADATA_KEY, testTopic);

Expand All @@ -97,15 +94,15 @@ public void setup() throws IOException {

/** Test whether constructor works with configuration map. */
@Test
public void testConstructor_withConfigs_succeeds() {
void testConstructor_withConfigs_succeeds() {
GlueSchemaRegistryInputStreamDeserializer glueSchemaRegistryInputStreamDeserializer =
new GlueSchemaRegistryInputStreamDeserializer(configs);
assertThat(glueSchemaRegistryInputStreamDeserializer)
.isInstanceOf(GlueSchemaRegistryInputStreamDeserializer.class);
}

@Test
public void testDefaultAwsCredentialsProvider() throws Exception {
void testDefaultAwsCredentialsProvider() throws Exception {
GlueSchemaRegistryInputStreamDeserializer glueSchemaRegistryInputStreamDeserializer =
new GlueSchemaRegistryInputStreamDeserializer(configs);

Expand All @@ -119,7 +116,7 @@ public void testDefaultAwsCredentialsProvider() throws Exception {
}

@Test
public void testAwsCredentialsProviderFromConfig() throws Exception {
void testAwsCredentialsProviderFromConfig() throws Exception {
Map<String, Object> config = new HashMap<>(configs);
config.put(AWS_ACCESS_KEY_ID, "ak");
config.put(AWS_SECRET_ACCESS_KEY, "sk");
Expand All @@ -139,7 +136,7 @@ public void testAwsCredentialsProviderFromConfig() throws Exception {

/** Test whether constructor works with AWS de-serializer input. */
@Test
public void testConstructor_withDeserializer_succeeds() {
void testConstructor_withDeserializer_succeeds() {
GlueSchemaRegistryInputStreamDeserializer glueSchemaRegistryInputStreamDeserializer =
new GlueSchemaRegistryInputStreamDeserializer(
glueSchemaRegistryDeserializationFacade);
Expand All @@ -149,8 +146,7 @@ public void testConstructor_withDeserializer_succeeds() {

/** Test whether getSchemaAndDeserializedStream method when compression is not enabled works. */
@Test
public void testGetSchemaAndDeserializedStream_withoutCompression_succeeds()
throws IOException {
void testGetSchemaAndDeserializedStream_withoutCompression_succeeds() throws IOException {
compressionByte = COMPRESSION_DEFAULT_BYTE;
compressionHandler = new GlueSchemaRegistryDefaultCompression();

Expand Down Expand Up @@ -179,7 +175,7 @@ public void testGetSchemaAndDeserializedStream_withoutCompression_succeeds()

/** Test whether getSchemaAndDeserializedStream method when compression is enabled works. */
@Test
public void testGetSchemaAndDeserializedStream_withCompression_succeeds() throws IOException {
void testGetSchemaAndDeserializedStream_withCompression_succeeds() throws IOException {
COMPRESSION compressionType = COMPRESSION.ZLIB;
compressionByte = AWSSchemaRegistryConstants.COMPRESSION_BYTE;
compressionHandler = new GlueSchemaRegistryDefaultCompression();
Expand Down Expand Up @@ -211,8 +207,7 @@ public void testGetSchemaAndDeserializedStream_withCompression_succeeds() throws

/** Test whether getSchemaAndDeserializedStream method throws exception with invalid schema. */
@Test
public void testGetSchemaAndDeserializedStream_withWrongSchema_throwsException()
throws IOException {
void testGetSchemaAndDeserializedStream_withWrongSchema_throwsException() {
String schemaDefinition =
"{"
+ "\"type\":\"record\","
Expand All @@ -235,11 +230,14 @@ public void testGetSchemaAndDeserializedStream_withWrongSchema_throwsException()
new GlueSchemaRegistryInputStreamDeserializer(
glueSchemaRegistryDeserializationFacade);

thrown.expect(AWSSchemaRegistryException.class);
thrown.expectMessage(
"Error occurred while parsing schema, see inner exception for details.");
awsSchemaRegistryInputStreamDeserializer.getSchemaAndDeserializedStream(
mutableByteArrayInputStream);
assertThatThrownBy(
() ->
awsSchemaRegistryInputStreamDeserializer
.getSchemaAndDeserializedStream(
mutableByteArrayInputStream))
.isInstanceOf(AWSSchemaRegistryException.class)
.hasMessage(
"Error occurred while parsing schema, see inner exception for details.");
}

private ByteArrayOutputStream buildByteArrayOutputStream(byte headerByte, byte compressionByte)
Expand Down
Loading

0 comments on commit 6692ff3

Please sign in to comment.