Skip to content

Commit

Permalink
[BEAM-9329] Support request of schemas by version on KafkaIO + CSR
Browse files Browse the repository at this point in the history
  • Loading branch information
iemejia committed Feb 25, 2020
1 parent f7e7bc3 commit e83c3a5
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import java.io.IOException;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.avro.Schema;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
Expand All @@ -48,12 +49,14 @@ public class ConfluentSchemaRegistryDeserializerProvider<T> implements Deseriali
private final SerializableFunction<Void, SchemaRegistryClient> schemaRegistryClientProviderFn;
private final String schemaRegistryUrl;
private final String subject;
private final @Nullable Integer version;

@VisibleForTesting
ConfluentSchemaRegistryDeserializerProvider(
SerializableFunction<Void, SchemaRegistryClient> schemaRegistryClientProviderFn,
String schemaRegistryUrl,
String subject) {
String subject,
@Nullable Integer version) {
checkArgument(
schemaRegistryClientProviderFn != null,
"You should provide a schemaRegistryClientProviderFn.");
Expand All @@ -62,15 +65,22 @@ public class ConfluentSchemaRegistryDeserializerProvider<T> implements Deseriali
this.schemaRegistryClientProviderFn = schemaRegistryClientProviderFn;
this.schemaRegistryUrl = schemaRegistryUrl;
this.subject = subject;
this.version = version;
}

/**
 * Creates a {@link ConfluentSchemaRegistryDeserializerProvider} that resolves the <em>latest</em>
 * registered schema for the given subject.
 *
 * @param schemaRegistryUrl URL of the Confluent Schema Registry
 * @param subject the registry subject (typically derived from the topic name) whose schema is used
 */
public static <T> ConfluentSchemaRegistryDeserializerProvider<T> of(
String schemaRegistryUrl, String subject) {
// Delegates to the versioned factory; a null version means "use the latest schema".
return of(schemaRegistryUrl, subject, null);
}

public static <T> ConfluentSchemaRegistryDeserializerProvider<T> of(
String schemaRegistryUrl, String subject, @Nullable Integer version) {
return new ConfluentSchemaRegistryDeserializerProvider(
(SerializableFunction<Void, SchemaRegistryClient>)
input -> new CachedSchemaRegistryClient(schemaRegistryUrl, Integer.MAX_VALUE),
schemaRegistryUrl,
subject);
subject,
version);
}

@Override
Expand All @@ -94,7 +104,9 @@ public Coder<T> getCoder(CoderRegistry coderRegistry) {

private SchemaMetadata getSchemaMetadata() {
try {
return getSchemaRegistryClient().getLatestSchemaMetadata(subject);
return (version == null)
? getSchemaRegistryClient().getLatestSchemaMetadata(subject)
: getSchemaRegistryClient().getSchemaMetadata(subject, version);
} catch (IOException | RestClientException e) {
throw new RuntimeException("Unable to get latest schema metadata for subject: " + subject, e);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
 *     https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.kafka;

import static org.junit.Assert.assertEquals;

import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.schemaregistry.testutil.MockSchemaRegistry;
import java.io.IOException;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
public class ConfluentSchemaRegistryDeserializerProviderTest {

  /**
   * Verifies coder resolution both without a version (latest schema wins) and with an explicit
   * version (the requested schema wins).
   */
  @Test
  public void testGetCoder() throws IOException, RestClientException {
    String schemaRegistryUrl = "mock:https://my-scope-name";
    String subject = "mytopic";
    SchemaRegistryClient mockRegistryClient = mockSchemaRegistryClient(schemaRegistryUrl, subject);
    CoderRegistry coderRegistry = CoderRegistry.createDefault();

    // No version requested: the provider must resolve the latest registered schema.
    // AvroCoder<?> avoids the raw-type cast; getSchema() is available on the wildcard type.
    AvroCoder<?> coderV0 =
        (AvroCoder<?>)
            mockDeserializerProvider(schemaRegistryUrl, subject, null).getCoder(coderRegistry);
    assertEquals(AVRO_SCHEMA, coderV0.getSchema());

    // Register a second schema version and request it explicitly by its version number.
    // Declaring `throws` on the test method is cleaner than catching and re-wrapping; JUnit
    // reports the exception either way.
    Integer version = mockRegistryClient.register(subject, AVRO_SCHEMA_V1);
    AvroCoder<?> coderV1 =
        (AvroCoder<?>)
            mockDeserializerProvider(schemaRegistryUrl, subject, version).getCoder(coderRegistry);
    assertEquals(AVRO_SCHEMA_V1, coderV1.getSchema());
  }

  /**
   * Builds a {@link DeserializerProvider} backed by the in-memory {@link MockSchemaRegistry}
   * instead of a live Confluent Schema Registry, so tests need no network access.
   *
   * <p>Package-private because {@code KafkaIOTest} reuses it.
   *
   * @param version schema version to pin, or {@code null} for the latest registered version
   */
  static <T> DeserializerProvider<T> mockDeserializerProvider(
      String schemaRegistryUrl, String subject, Integer version) {
    return new ConfluentSchemaRegistryDeserializerProvider<>(
        (SerializableFunction<Void, SchemaRegistryClient>)
            input -> mockSchemaRegistryClient(schemaRegistryUrl, subject),
        schemaRegistryUrl,
        subject,
        version);
  }

  /** Returns the scoped mock registry client with {@link #AVRO_SCHEMA} pre-registered. */
  private static SchemaRegistryClient mockSchemaRegistryClient(
      String schemaRegistryUrl, String subject) {
    SchemaRegistryClient mockRegistryClient =
        MockSchemaRegistry.getClientForScope(schemaRegistryUrl);
    try {
      mockRegistryClient.register(subject, AVRO_SCHEMA);
    } catch (IOException | RestClientException e) {
      throw new RuntimeException("Unable to register schema for subject: " + subject, e);
    }
    return mockRegistryClient;
  }

  // Base schema (version 1 in the registry): name plus optional favorites.
  private static final String AVRO_SCHEMA_STRING =
      "{\"namespace\": \"example.avro\",\n"
          + " \"type\": \"record\",\n"
          + " \"name\": \"AvroGeneratedUser\",\n"
          + " \"fields\": [\n"
          + "     {\"name\": \"name\", \"type\": \"string\"},\n"
          + "     {\"name\": \"favorite_number\", \"type\": [\"int\", \"null\"]},\n"
          + "     {\"name\": \"favorite_color\", \"type\": [\"string\", \"null\"]}\n"
          + " ]\n"
          + "}";

  private static final org.apache.avro.Schema AVRO_SCHEMA =
      new org.apache.avro.Schema.Parser().parse(AVRO_SCHEMA_STRING);

  // Evolved schema (registered second): adds the required "age" field.
  private static final String AVRO_SCHEMA_V1_STRING =
      "{\"namespace\": \"example.avro\",\n"
          + " \"type\": \"record\",\n"
          + " \"name\": \"AvroGeneratedUser\",\n"
          + " \"fields\": [\n"
          + "     {\"name\": \"name\", \"type\": \"string\"},\n"
          + "     {\"name\": \"age\", \"type\": \"int\"},\n"
          + "     {\"name\": \"favorite_number\", \"type\": [\"int\", \"null\"]},\n"
          + "     {\"name\": \"favorite_color\", \"type\": [\"string\", \"null\"]}\n"
          + " ]\n"
          + "}";

  private static final org.apache.avro.Schema AVRO_SCHEMA_V1 =
      new org.apache.avro.Schema.Parser().parse(AVRO_SCHEMA_V1_STRING);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.io.kafka;

import static org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProviderTest.mockDeserializerProvider;
import static org.apache.beam.sdk.metrics.MetricResultsMatchers.attemptedMetricsResult;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
import static org.hamcrest.Matchers.containsString;
Expand All @@ -32,7 +33,6 @@
import static org.junit.internal.matchers.ThrowableMessageMatcher.hasMessage;

import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.schemaregistry.testutil.MockSchemaRegistry;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
Expand Down Expand Up @@ -440,8 +440,10 @@ public void testReadAvroGenericRecordsWithConfluentSchemaRegistry() {
KafkaIO.<GenericRecord, GenericRecord>read()
.withBootstrapServers("localhost:9092")
.withTopic(topic)
.withKeyDeserializer(mockDeserializerProvider(schemaRegistryUrl, keySchemaSubject))
.withValueDeserializer(mockDeserializerProvider(schemaRegistryUrl, valueSchemaSubject))
.withKeyDeserializer(
mockDeserializerProvider(schemaRegistryUrl, keySchemaSubject, null))
.withValueDeserializer(
mockDeserializerProvider(schemaRegistryUrl, valueSchemaSubject, null))
.withConsumerFactoryFn(
new ConsumerFactoryFn(
ImmutableList.of(topic),
Expand Down Expand Up @@ -475,7 +477,8 @@ public void testReadAvroSpecificRecordsWithConfluentSchemaRegistry() {
.withBootstrapServers("localhost:9092")
.withTopic(topic)
.withKeyDeserializer(IntegerDeserializer.class)
.withValueDeserializer(mockDeserializerProvider(schemaRegistryUrl, valueSchemaSubject))
.withValueDeserializer(
mockDeserializerProvider(schemaRegistryUrl, valueSchemaSubject, null))
.withConsumerFactoryFn(
new ConsumerFactoryFn(
ImmutableList.of(topic),
Expand All @@ -492,41 +495,6 @@ public void testReadAvroSpecificRecordsWithConfluentSchemaRegistry() {
p.run();
}

// Builds a DeserializerProvider backed by the in-memory MockSchemaRegistry rather than a live
// Confluent Schema Registry, so the test requires no network access. Always resolves the latest
// registered schema (no version parameter).
private static <T> DeserializerProvider<T> mockDeserializerProvider(
String schemaRegistryUrl, String subject) {
return new ConfluentSchemaRegistryDeserializerProvider<>(
(SerializableFunction<Void, SchemaRegistryClient>)
input -> mockSchemaRegistryClient(schemaRegistryUrl, subject),
schemaRegistryUrl,
subject);
}

// Returns the mock registry client for the given scope URL with AVRO_SCHEMA pre-registered
// under the given subject. Registration failures are wrapped because this helper is called
// from contexts that cannot declare checked exceptions.
private static SchemaRegistryClient mockSchemaRegistryClient(
String schemaRegistryUrl, String subject) {
SchemaRegistryClient mockRegistryClient =
MockSchemaRegistry.getClientForScope(schemaRegistryUrl);
try {
mockRegistryClient.register(subject, AVRO_SCHEMA);
} catch (IOException | RestClientException e) {
throw new RuntimeException("Unable to register schema for subject: " + subject, e);
}
return mockRegistryClient;
}

// Avro record schema used as the registered test schema: a user with a required name and
// optional favorite_number/favorite_color fields.
private static final String AVRO_SCHEMA_STRING =
"{\"namespace\": \"example.avro\",\n"
+ " \"type\": \"record\",\n"
+ " \"name\": \"AvroGeneratedUser\",\n"
+ " \"fields\": [\n"
+ "     {\"name\": \"name\", \"type\": \"string\"},\n"
+ "     {\"name\": \"favorite_number\", \"type\": [\"int\", \"null\"]},\n"
+ "     {\"name\": \"favorite_color\", \"type\": [\"string\", \"null\"]}\n"
+ " ]\n"
+ "}";

// Parsed once at class-load time; org.apache.avro.Schema is immutable and safe to share.
private static final org.apache.avro.Schema AVRO_SCHEMA =
new org.apache.avro.Schema.Parser().parse(AVRO_SCHEMA_STRING);

@Test
public void testUnboundedSource() {
int numElements = 1000;
Expand Down

0 comments on commit e83c3a5

Please sign in to comment.