Skip to content

Commit

Permalink
[FLINK-26736][tests] Migrate flink-avro-confluent-registry to JUnit5
Browse files Browse the repository at this point in the history
  • Loading branch information
RyanSkraba committed Mar 25, 2022
1 parent 7e909ff commit e3b123d
Show file tree
Hide file tree
Showing 7 changed files with 143 additions and 132 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import io.confluent.kafka.schemaregistry.client.rest.RestService;
import io.confluent.kafka.schemaregistry.client.security.basicauth.BasicAuthCredentialProvider;
import io.confluent.kafka.schemaregistry.client.security.bearerauth.BearerAuthCredentialProvider;
import org.junit.Test;
import org.junit.jupiter.api.Test;
import org.powermock.reflect.Whitebox;

import javax.net.ssl.SSLSocketFactory;
Expand All @@ -31,26 +31,24 @@
import java.util.HashMap;
import java.util.Map;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.assertj.core.api.Assertions.assertThat;

/**
* Tests for properties set by {@link RegistryAvroFormatFactory} in {@link
* CachedSchemaCoderProvider}.
*/
public class CachedSchemaCoderProviderTest {
class CachedSchemaCoderProviderTest {

@Test
public void testThatSslIsNotInitializedForNoSslProperties() {
void testThatSslIsNotInitializedForNoSslProperties() {
CachedSchemaCoderProvider provider = initCachedSchemaCoderProvider(new HashMap<>());
SSLSocketFactory sslSocketFactory = getSslSocketFactoryFromProvider(provider);

assertNull(sslSocketFactory);
assertThat(sslSocketFactory).isNull();
}

@Test
public void testThatSslIsInitializedForSslProperties() throws URISyntaxException {
void testThatSslIsInitializedForSslProperties() throws URISyntaxException {
String keystoreFile = getAbsolutePath("/test-keystore.jks");
String keystorePassword = "123456";
Map<String, String> configs = new HashMap<>();
Expand All @@ -62,20 +60,20 @@ public void testThatSslIsInitializedForSslProperties() throws URISyntaxException
CachedSchemaCoderProvider provider = initCachedSchemaCoderProvider(configs);
SSLSocketFactory sslSocketFactory = getSslSocketFactoryFromProvider(provider);

assertNotNull(sslSocketFactory);
assertThat(sslSocketFactory).isNotNull();
}

@Test
public void testThatBasicAuthIsNotInitializedForNoBasicAuthProperties() {
void testThatBasicAuthIsNotInitializedForNoBasicAuthProperties() {
CachedSchemaCoderProvider provider = initCachedSchemaCoderProvider(new HashMap<>());
BasicAuthCredentialProvider basicAuthCredentialProvider =
getBasicAuthFromProvider(provider);

assertNull(basicAuthCredentialProvider);
assertThat(basicAuthCredentialProvider).isNull();
}

@Test
public void testThatBasicAuthIsInitializedForBasicAuthProperties() {
void testThatBasicAuthIsInitializedForBasicAuthProperties() {
String userPassword = "user:pwd";
Map<String, String> configs = new HashMap<>();
configs.put("basic.auth.credentials.source", "USER_INFO");
Expand All @@ -85,21 +83,21 @@ public void testThatBasicAuthIsInitializedForBasicAuthProperties() {
BasicAuthCredentialProvider basicAuthCredentialProvider =
getBasicAuthFromProvider(provider);

assertNotNull(basicAuthCredentialProvider);
assertEquals(basicAuthCredentialProvider.getUserInfo(null), userPassword);
assertThat(basicAuthCredentialProvider).isNotNull();
assertThat(basicAuthCredentialProvider.getUserInfo(null)).isEqualTo(userPassword);
}

@Test
public void testThatBearerAuthIsNotInitializedForNoBearerAuthProperties() {
void testThatBearerAuthIsNotInitializedForNoBearerAuthProperties() {
CachedSchemaCoderProvider provider = initCachedSchemaCoderProvider(new HashMap<>());
BearerAuthCredentialProvider bearerAuthCredentialProvider =
getBearerAuthFromProvider(provider);

assertNull(bearerAuthCredentialProvider);
assertThat(bearerAuthCredentialProvider).isNull();
}

@Test
public void testThatBearerAuthIsInitializedForBearerAuthProperties() {
void testThatBearerAuthIsInitializedForBearerAuthProperties() {
String token = "123456";
Map<String, String> configs = new HashMap<>();
configs.put("bearer.auth.credentials.source", "STATIC_TOKEN");
Expand All @@ -109,8 +107,8 @@ public void testThatBearerAuthIsInitializedForBearerAuthProperties() {
BearerAuthCredentialProvider bearerAuthCredentialProvider =
getBearerAuthFromProvider(provider);

assertNotNull(bearerAuthCredentialProvider);
assertEquals(bearerAuthCredentialProvider.getBearerToken(null), token);
assertThat(bearerAuthCredentialProvider).isNotNull();
assertThat(bearerAuthCredentialProvider.getBearerToken(null)).isEqualTo(token);
}

private String getAbsolutePath(String path) throws URISyntaxException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,21 @@
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.junit.Test;
import org.junit.jupiter.api.Test;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import static org.junit.Assert.assertEquals;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Tests for {@link ConfluentSchemaRegistryCoder}. */
public class ConfluentSchemaRegistryCoderTest {
class ConfluentSchemaRegistryCoderTest {

@Test
public void testSpecificRecordWithConfluentSchemaRegistry() throws Exception {
void testSpecificRecordWithConfluentSchemaRegistry() throws Exception {
MockSchemaRegistryClient client = new MockSchemaRegistryClient();

Schema schema =
Expand All @@ -51,12 +52,12 @@ public void testSpecificRecordWithConfluentSchemaRegistry() throws Exception {
ByteArrayInputStream byteInStream = new ByteArrayInputStream(byteOutStream.toByteArray());
Schema readSchema = registryCoder.readSchema(byteInStream);

assertEquals(schema, readSchema);
assertEquals(0, byteInStream.available());
assertThat(readSchema).isEqualTo(schema);
assertThat(byteInStream).isEmpty();
}

@Test(expected = IOException.class)
public void testMagicByteVerification() throws Exception {
@Test
void testMagicByteVerification() throws Exception {
MockSchemaRegistryClient client = new MockSchemaRegistryClient();
int schemaId = client.register("testTopic", Schema.create(Schema.Type.BOOLEAN));

Expand All @@ -67,9 +68,10 @@ public void testMagicByteVerification() throws Exception {
dataOutputStream.writeInt(schemaId);
dataOutputStream.flush();

ByteArrayInputStream byteInStream = new ByteArrayInputStream(byteOutStream.toByteArray());
coder.readSchema(byteInStream);

// exception is thrown
try (ByteArrayInputStream byteInStream =
new ByteArrayInputStream(byteOutStream.toByteArray())) {
assertThatThrownBy(() -> coder.readSchema(byteInStream))
.isInstanceOf(IOException.class);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,23 +37,20 @@
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;

import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.jupiter.api.Test;

import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

import static org.apache.flink.core.testutils.FlinkMatchers.containsCause;
import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSink;
import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSource;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Tests for the {@link RegistryAvroFormatFactory}. */
public class RegistryAvroFormatFactoryTest {
class RegistryAvroFormatFactoryTest {

private static final ResolvedSchema SCHEMA =
ResolvedSchema.of(
Expand Down Expand Up @@ -81,10 +78,8 @@ public class RegistryAvroFormatFactoryTest {
EXPECTED_OPTIONAL_PROPERTIES.put("bearer.auth.token", "CUSTOM");
}

@Rule public ExpectedException thrown = ExpectedException.none();

@Test
public void testDeserializationSchema() {
void testDeserializationSchema() {
final AvroRowDataDeserializationSchema expectedDeser =
new AvroRowDataDeserializationSchema(
ConfluentRegistryAvroDeserializationSchema.forGeneric(
Expand All @@ -93,19 +88,19 @@ public void testDeserializationSchema() {
InternalTypeInfo.of(ROW_TYPE));

final DynamicTableSource actualSource = createTableSource(SCHEMA, getDefaultOptions());
assertThat(actualSource, instanceOf(TestDynamicTableFactory.DynamicTableSourceMock.class));
assertThat(actualSource).isInstanceOf(TestDynamicTableFactory.DynamicTableSourceMock.class);
TestDynamicTableFactory.DynamicTableSourceMock scanSourceMock =
(TestDynamicTableFactory.DynamicTableSourceMock) actualSource;

DeserializationSchema<RowData> actualDeser =
scanSourceMock.valueFormat.createRuntimeDecoder(
ScanRuntimeProviderContext.INSTANCE, SCHEMA.toPhysicalRowDataType());

assertEquals(expectedDeser, actualDeser);
assertThat(actualDeser).isEqualTo(expectedDeser);
}

@Test
public void testSerializationSchema() {
void testSerializationSchema() {
final AvroRowDataSerializationSchema expectedSer =
new AvroRowDataSerializationSchema(
ROW_TYPE,
Expand All @@ -116,32 +111,31 @@ public void testSerializationSchema() {
RowDataToAvroConverters.createConverter(ROW_TYPE));

final DynamicTableSink actualSink = createTableSink(SCHEMA, getDefaultOptions());
assertThat(actualSink, instanceOf(TestDynamicTableFactory.DynamicTableSinkMock.class));
assertThat(actualSink).isInstanceOf(TestDynamicTableFactory.DynamicTableSinkMock.class);
TestDynamicTableFactory.DynamicTableSinkMock sinkMock =
(TestDynamicTableFactory.DynamicTableSinkMock) actualSink;

SerializationSchema<RowData> actualSer =
sinkMock.valueFormat.createRuntimeEncoder(null, SCHEMA.toPhysicalRowDataType());

assertEquals(expectedSer, actualSer);
assertThat(actualSer).isEqualTo(expectedSer);
}

@Test
public void testMissingSubjectForSink() {
thrown.expect(ValidationException.class);
thrown.expect(
containsCause(
new ValidationException(
"Option avro-confluent.subject is required for serialization")));

void testMissingSubjectForSink() {
final Map<String, String> options =
getModifiedOptions(opts -> opts.remove("avro-confluent.subject"));

createTableSink(SCHEMA, options);
assertThatThrownBy(() -> createTableSink(SCHEMA, options))
.isInstanceOf(ValidationException.class)
.satisfies(
anyCauseMatches(
ValidationException.class,
"Option avro-confluent.subject is required for serialization"));
}

@Test
public void testDeserializationSchemaWithOptionalProperties() {
void testDeserializationSchemaWithOptionalProperties() {
final AvroRowDataDeserializationSchema expectedDeser =
new AvroRowDataDeserializationSchema(
ConfluentRegistryAvroDeserializationSchema.forGeneric(
Expand All @@ -152,19 +146,19 @@ public void testDeserializationSchemaWithOptionalProperties() {
InternalTypeInfo.of(ROW_TYPE));

final DynamicTableSource actualSource = createTableSource(SCHEMA, getOptionalProperties());
assertThat(actualSource, instanceOf(TestDynamicTableFactory.DynamicTableSourceMock.class));
assertThat(actualSource).isInstanceOf(TestDynamicTableFactory.DynamicTableSourceMock.class);
TestDynamicTableFactory.DynamicTableSourceMock scanSourceMock =
(TestDynamicTableFactory.DynamicTableSourceMock) actualSource;

DeserializationSchema<RowData> actualDeser =
scanSourceMock.valueFormat.createRuntimeDecoder(
ScanRuntimeProviderContext.INSTANCE, SCHEMA.toPhysicalRowDataType());

assertEquals(expectedDeser, actualDeser);
assertThat(actualDeser).isEqualTo(expectedDeser);
}

@Test
public void testSerializationSchemaWithOptionalProperties() {
void testSerializationSchemaWithOptionalProperties() {
final AvroRowDataSerializationSchema expectedSer =
new AvroRowDataSerializationSchema(
ROW_TYPE,
Expand All @@ -176,14 +170,14 @@ public void testSerializationSchemaWithOptionalProperties() {
RowDataToAvroConverters.createConverter(ROW_TYPE));

final DynamicTableSink actualSink = createTableSink(SCHEMA, getOptionalProperties());
assertThat(actualSink, instanceOf(TestDynamicTableFactory.DynamicTableSinkMock.class));
assertThat(actualSink).isInstanceOf(TestDynamicTableFactory.DynamicTableSinkMock.class);
TestDynamicTableFactory.DynamicTableSinkMock sinkMock =
(TestDynamicTableFactory.DynamicTableSinkMock) actualSink;

SerializationSchema<RowData> actualSer =
sinkMock.valueFormat.createRuntimeEncoder(null, SCHEMA.toPhysicalRowDataType());

assertEquals(expectedSer, actualSer);
assertThat(actualSer).isEqualTo(expectedSer);
}

// ------------------------------------------------------------------------
Expand Down
Loading

0 comments on commit e3b123d

Please sign in to comment.