Skip to content

Commit

Permalink
[FLINK-23450][avro-confluent-registry] Set properties map for Debeziu…
Browse files Browse the repository at this point in the history
…mAvroFormat

This closes apache#16565.
  • Loading branch information
twalthr committed Aug 2, 2021
1 parent 4b517a9 commit 78db0e7
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -621,7 +621,7 @@ private SerializationSchema<RowData> createConfluentAvroSerSchema(

private SerializationSchema<RowData> createDebeziumAvroSerSchema(
RowType rowType, String subject) {
return new DebeziumAvroSerializationSchema(rowType, TEST_REGISTRY_URL, subject);
return new DebeziumAvroSerializationSchema(rowType, TEST_REGISTRY_URL, subject, null);
}

// --------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;

import javax.annotation.Nullable;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
Expand Down Expand Up @@ -85,14 +87,10 @@ public DeserializationSchema<RowData> createRuntimeDecoder(
final TypeInformation<RowData> rowDataTypeInfo =
context.createTypeInformation(producedDataType);
return new AvroRowDataDeserializationSchema(
optionalPropertiesMap.isEmpty()
? ConfluentRegistryAvroDeserializationSchema.forGeneric(
AvroSchemaConverter.convertToSchema(rowType),
schemaRegistryURL)
: ConfluentRegistryAvroDeserializationSchema.forGeneric(
AvroSchemaConverter.convertToSchema(rowType),
schemaRegistryURL,
optionalPropertiesMap),
ConfluentRegistryAvroDeserializationSchema.forGeneric(
AvroSchemaConverter.convertToSchema(rowType),
schemaRegistryURL,
optionalPropertiesMap),
AvroToRowDataConverters.createRowConverter(rowType),
rowDataTypeInfo);
}
Expand Down Expand Up @@ -127,16 +125,11 @@ public SerializationSchema<RowData> createRuntimeEncoder(
final RowType rowType = (RowType) consumedDataType.getLogicalType();
return new AvroRowDataSerializationSchema(
rowType,
optionalPropertiesMap.isEmpty()
? ConfluentRegistryAvroSerializationSchema.forGeneric(
subject.get(),
AvroSchemaConverter.convertToSchema(rowType),
schemaRegistryURL)
: ConfluentRegistryAvroSerializationSchema.forGeneric(
subject.get(),
AvroSchemaConverter.convertToSchema(rowType),
schemaRegistryURL,
optionalPropertiesMap),
ConfluentRegistryAvroSerializationSchema.forGeneric(
subject.get(),
AvroSchemaConverter.convertToSchema(rowType),
schemaRegistryURL,
optionalPropertiesMap),
RowDataToAvroConverters.createConverter(rowType));
}

Expand Down Expand Up @@ -175,7 +168,8 @@ public Set<ConfigOption<?>> optionalOptions() {
return options;
}

private Map<String, String> buildOptionalPropertiesMap(ReadableConfig formatOptions) {
public static @Nullable Map<String, String> buildOptionalPropertiesMap(
ReadableConfig formatOptions) {
final Map<String, String> properties = new HashMap<>();

formatOptions.getOptional(PROPERTIES).ifPresent(properties::putAll);
Expand Down Expand Up @@ -205,6 +199,9 @@ private Map<String, String> buildOptionalPropertiesMap(ReadableConfig formatOpti
.getOptional(BEARER_AUTH_TOKEN)
.ifPresent(v -> properties.put("bearer.auth.token", v));

if (properties.isEmpty()) {
return null;
}
return properties;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.Map;
import java.util.Objects;

import static java.lang.String.format;
Expand Down Expand Up @@ -74,15 +77,19 @@ public final class DebeziumAvroDeserializationSchema implements DeserializationS
private final TypeInformation<RowData> producedTypeInfo;

public DebeziumAvroDeserializationSchema(
RowType rowType, TypeInformation<RowData> producedTypeInfo, String schemaRegistryUrl) {
RowType rowType,
TypeInformation<RowData> producedTypeInfo,
String schemaRegistryUrl,
@Nullable Map<String, ?> registryConfigs) {
this.producedTypeInfo = producedTypeInfo;
RowType debeziumAvroRowType = createDebeziumAvroRowType(fromLogicalToDataType(rowType));

this.avroDeserializer =
new AvroRowDataDeserializationSchema(
ConfluentRegistryAvroDeserializationSchema.forGeneric(
AvroSchemaConverter.convertToSchema(debeziumAvroRowType),
schemaRegistryUrl),
schemaRegistryUrl,
registryConfigs),
AvroToRowDataConverters.createRowConverter(debeziumAvroRowType),
producedTypeInfo);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,22 @@
import org.apache.flink.types.RowKind;

import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import static org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions.BASIC_AUTH_CREDENTIALS_SOURCE;
import static org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions.BASIC_AUTH_USER_INFO;
import static org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions.BEARER_AUTH_CREDENTIALS_SOURCE;
import static org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions.BEARER_AUTH_TOKEN;
import static org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions.PROPERTIES;
import static org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions.SSL_KEYSTORE_LOCATION;
import static org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions.SSL_KEYSTORE_PASSWORD;
import static org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions.SSL_TRUSTSTORE_LOCATION;
import static org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions.SSL_TRUSTSTORE_PASSWORD;
import static org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions.SUBJECT;
import static org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions.URL;
import static org.apache.flink.formats.avro.registry.confluent.RegistryAvroFormatFactory.buildOptionalPropertiesMap;

/**
* Format factory for providing configured instances of Debezium Avro to RowData {@link
Expand All @@ -60,6 +71,7 @@ public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(

FactoryUtil.validateFactoryOptions(this, formatOptions);
String schemaRegistryURL = formatOptions.get(URL);
Map<String, ?> optionalPropertiesMap = buildOptionalPropertiesMap(formatOptions);

return new DecodingFormat<DeserializationSchema<RowData>>() {
@Override
Expand All @@ -69,7 +81,7 @@ public DeserializationSchema<RowData> createRuntimeDecoder(
final TypeInformation<RowData> producedTypeInfo =
context.createTypeInformation(producedDataType);
return new DebeziumAvroDeserializationSchema(
rowType, producedTypeInfo, schemaRegistryURL);
rowType, producedTypeInfo, schemaRegistryURL, optionalPropertiesMap);
}

@Override
Expand All @@ -91,6 +103,8 @@ public EncodingFormat<SerializationSchema<RowData>> createEncodingFormat(
FactoryUtil.validateFactoryOptions(this, formatOptions);
String schemaRegistryURL = formatOptions.get(URL);
Optional<String> subject = formatOptions.getOptional(SUBJECT);
Map<String, ?> optionalPropertiesMap = buildOptionalPropertiesMap(formatOptions);

if (!subject.isPresent()) {
throw new ValidationException(
String.format(
Expand All @@ -114,7 +128,7 @@ public SerializationSchema<RowData> createRuntimeEncoder(
DynamicTableSink.Context context, DataType consumedDataType) {
final RowType rowType = (RowType) consumedDataType.getLogicalType();
return new DebeziumAvroSerializationSchema(
rowType, schemaRegistryURL, subject.get());
rowType, schemaRegistryURL, subject.get(), optionalPropertiesMap);
}
};
}
Expand All @@ -135,6 +149,15 @@ public Set<ConfigOption<?>> requiredOptions() {
public Set<ConfigOption<?>> optionalOptions() {
Set<ConfigOption<?>> options = new HashSet<>();
options.add(SUBJECT);
options.add(PROPERTIES);
options.add(SSL_KEYSTORE_LOCATION);
options.add(SSL_KEYSTORE_PASSWORD);
options.add(SSL_TRUSTSTORE_LOCATION);
options.add(SSL_TRUSTSTORE_PASSWORD);
options.add(BASIC_AUTH_CREDENTIALS_SOURCE);
options.add(BASIC_AUTH_USER_INFO);
options.add(BEARER_AUTH_CREDENTIALS_SOURCE);
options.add(BEARER_AUTH_TOKEN);
return options;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.formats.avro.registry.confluent.debezium;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.formats.avro.AvroRowDataSerializationSchema;
Expand All @@ -31,6 +32,9 @@
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;

import javax.annotation.Nullable;

import java.util.Map;
import java.util.Objects;

import static java.lang.String.format;
Expand All @@ -40,6 +44,7 @@
* Serialization schema from Flink Table/SQL internal data structure {@link RowData} to Debezium
* Avro.
*/
@Internal
public class DebeziumAvroSerializationSchema implements SerializationSchema<RowData> {
private static final long serialVersionUID = 1L;

Expand All @@ -54,7 +59,10 @@ public class DebeziumAvroSerializationSchema implements SerializationSchema<RowD
private transient GenericRowData outputReuse;

public DebeziumAvroSerializationSchema(
RowType rowType, String schemaRegistryUrl, String schemaRegistrySubject) {
RowType rowType,
String schemaRegistryUrl,
String schemaRegistrySubject,
@Nullable Map<String, ?> registryConfigs) {
RowType debeziumAvroRowType = createDebeziumAvroRowType(fromLogicalToDataType(rowType));

this.avroSerializer =
Expand All @@ -63,7 +71,8 @@ public DebeziumAvroSerializationSchema(
ConfluentRegistryAvroSerializationSchema.forGeneric(
schemaRegistrySubject,
AvroSchemaConverter.convertToSchema(debeziumAvroRowType),
schemaRegistryUrl),
schemaRegistryUrl,
registryConfigs),
RowDataToAvroConverters.createConverter(debeziumAvroRowType));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,19 @@ public class DebeziumAvroFormatFactoryTest extends TestLogger {
public void testSeDeSchema() {
final Map<String, String> options = getAllOptions();

final Map<String, String> registryConfigs = new HashMap<>();
registryConfigs.put("basic.auth.user.info", "something1");
registryConfigs.put("basic.auth.credentials.source", "something2");

DebeziumAvroDeserializationSchema expectedDeser =
new DebeziumAvroDeserializationSchema(
ROW_TYPE, InternalTypeInfo.of(ROW_TYPE), REGISTRY_URL);
ROW_TYPE, InternalTypeInfo.of(ROW_TYPE), REGISTRY_URL, registryConfigs);
DeserializationSchema<RowData> actualDeser = createDeserializationSchema(options);
assertEquals(expectedDeser, actualDeser);

DebeziumAvroSerializationSchema expectedSer =
new DebeziumAvroSerializationSchema(ROW_TYPE, REGISTRY_URL, SUBJECT);
new DebeziumAvroSerializationSchema(
ROW_TYPE, REGISTRY_URL, SUBJECT, registryConfigs);
SerializationSchema<RowData> actualSer = createSerializationSchema(options);
Assert.assertEquals(expectedSer, actualSer);
}
Expand All @@ -88,6 +93,8 @@ private Map<String, String> getAllOptions() {
options.put("format", DebeziumAvroFormatFactory.IDENTIFIER);
options.put("debezium-avro-confluent.url", REGISTRY_URL);
options.put("debezium-avro-confluent.subject", SUBJECT);
options.put("debezium-avro-confluent.basic-auth.user-info", "something1");
options.put("debezium-avro-confluent.basic-auth.credentials-source", "something2");
return options;
}

Expand Down

0 comments on commit 78db0e7

Please sign in to comment.