diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConfluentSchemaRegistryDeserializerProvider.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConfluentSchemaRegistryDeserializerProvider.java
index fc10de131dea8..d5c2d72449ba6 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConfluentSchemaRegistryDeserializerProvider.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConfluentSchemaRegistryDeserializerProvider.java
@@ -27,6 +27,7 @@
 import io.confluent.kafka.serializers.KafkaAvroDeserializer;
 import java.io.IOException;
 import java.util.Map;
+import javax.annotation.Nullable;
 import org.apache.avro.Schema;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
@@ -48,12 +49,14 @@ public class ConfluentSchemaRegistryDeserializerProvider<T> implements Deseriali
   private final SerializableFunction<Void, SchemaRegistryClient> schemaRegistryClientProviderFn;
   private final String schemaRegistryUrl;
   private final String subject;
+  private final @Nullable Integer version;
 
   @VisibleForTesting
   ConfluentSchemaRegistryDeserializerProvider(
       SerializableFunction<Void, SchemaRegistryClient> schemaRegistryClientProviderFn,
       String schemaRegistryUrl,
-      String subject) {
+      String subject,
+      @Nullable Integer version) {
     checkArgument(
         schemaRegistryClientProviderFn != null,
         "You should provide a schemaRegistryClientProviderFn.");
@@ -62,15 +65,22 @@ public class ConfluentSchemaRegistryDeserializerProvider<T> implements Deseriali
     this.schemaRegistryClientProviderFn = schemaRegistryClientProviderFn;
     this.schemaRegistryUrl = schemaRegistryUrl;
     this.subject = subject;
+    this.version = version;
   }
 
   public static <T> ConfluentSchemaRegistryDeserializerProvider<T> of(
       String schemaRegistryUrl, String subject) {
+    return of(schemaRegistryUrl, subject, null);
+  }
+
+  public static <T> ConfluentSchemaRegistryDeserializerProvider<T> of(
+      String schemaRegistryUrl, String subject, @Nullable Integer version) {
     return new ConfluentSchemaRegistryDeserializerProvider(
         (SerializableFunction<Void, SchemaRegistryClient>)
            input -> new CachedSchemaRegistryClient(schemaRegistryUrl, Integer.MAX_VALUE),
         schemaRegistryUrl,
-        subject);
+        subject,
+        version);
   }
 
   @Override
@@ -94,7 +104,9 @@ public Coder<T> getCoder(CoderRegistry coderRegistry) {
 
   private SchemaMetadata getSchemaMetadata() {
     try {
-      return getSchemaRegistryClient().getLatestSchemaMetadata(subject);
+      return (version == null)
+          ? getSchemaRegistryClient().getLatestSchemaMetadata(subject)
+          : getSchemaRegistryClient().getSchemaMetadata(subject, version);
     } catch (IOException | RestClientException e) {
       throw new RuntimeException("Unable to get latest schema metadata for subject: " + subject, e);
     }
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ConfluentSchemaRegistryDeserializerProviderTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ConfluentSchemaRegistryDeserializerProviderTest.java
new file mode 100644
index 0000000000000..ff2d281fbfd92
--- /dev/null
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ConfluentSchemaRegistryDeserializerProviderTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import static org.junit.Assert.assertEquals;
+
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
+import io.confluent.kafka.schemaregistry.testutil.MockSchemaRegistry;
+import java.io.IOException;
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class ConfluentSchemaRegistryDeserializerProviderTest {
+  @Test
+  public void testGetCoder() {
+    String schemaRegistryUrl = "mock://my-scope-name";
+    String subject = "mytopic";
+    SchemaRegistryClient mockRegistryClient = mockSchemaRegistryClient(schemaRegistryUrl, subject);
+    CoderRegistry coderRegistry = CoderRegistry.createDefault();
+
+    AvroCoder coderV0 =
+        (AvroCoder)
+            mockDeserializerProvider(schemaRegistryUrl, subject, null).getCoder(coderRegistry);
+    assertEquals(AVRO_SCHEMA, coderV0.getSchema());
+
+    try {
+      Integer version = mockRegistryClient.register(subject, AVRO_SCHEMA_V1);
+      AvroCoder coderV1 =
+          (AvroCoder)
+              mockDeserializerProvider(schemaRegistryUrl, subject, version).getCoder(coderRegistry);
+      assertEquals(AVRO_SCHEMA_V1, coderV1.getSchema());
+    } catch (IOException | RestClientException e) {
+      throw new RuntimeException("Unable to register schema for subject: " + subject, e);
+    }
+  }
+
+  static <T> DeserializerProvider<T> mockDeserializerProvider(
+      String schemaRegistryUrl, String subject, Integer version) {
+    return new ConfluentSchemaRegistryDeserializerProvider<>(
+        (SerializableFunction<Void, SchemaRegistryClient>)
+            input -> mockSchemaRegistryClient(schemaRegistryUrl, subject),
+        schemaRegistryUrl,
+        subject,
+        version);
+  }
+
+  private static SchemaRegistryClient mockSchemaRegistryClient(
+      String schemaRegistryUrl, String subject) {
+    SchemaRegistryClient mockRegistryClient =
+        MockSchemaRegistry.getClientForScope(schemaRegistryUrl);
+    try {
+      mockRegistryClient.register(subject, AVRO_SCHEMA);
+    } catch (IOException | RestClientException e) {
+      throw new RuntimeException("Unable to register schema for subject: " + subject, e);
+    }
+    return mockRegistryClient;
+  }
+
+  private static final String AVRO_SCHEMA_STRING =
+      "{\"namespace\": \"example.avro\",\n"
+          + " \"type\": \"record\",\n"
+          + " \"name\": \"AvroGeneratedUser\",\n"
+          + " \"fields\": [\n"
+          + " {\"name\": \"name\", \"type\": \"string\"},\n"
+          + " {\"name\": \"favorite_number\", \"type\": [\"int\", \"null\"]},\n"
+          + " {\"name\": \"favorite_color\", \"type\": [\"string\", \"null\"]}\n"
+          + " ]\n"
+          + "}";
+
+  private static final org.apache.avro.Schema AVRO_SCHEMA =
+      new org.apache.avro.Schema.Parser().parse(AVRO_SCHEMA_STRING);
+
+  private static final String AVRO_SCHEMA_V1_STRING =
+      "{\"namespace\": \"example.avro\",\n"
+          + " \"type\": \"record\",\n"
+          + " \"name\": \"AvroGeneratedUser\",\n"
+          + " \"fields\": [\n"
+          + " {\"name\": \"name\", \"type\": \"string\"},\n"
+          + " {\"name\": \"age\", \"type\": \"int\"},\n"
\"type\": \"int\"},\n" + + " {\"name\": \"favorite_number\", \"type\": [\"int\", \"null\"]},\n" + + " {\"name\": \"favorite_color\", \"type\": [\"string\", \"null\"]}\n" + + " ]\n" + + "}"; + private static final org.apache.avro.Schema AVRO_SCHEMA_V1 = + new org.apache.avro.Schema.Parser().parse(AVRO_SCHEMA_V1_STRING); +} diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java index d8f716c55c632..8627b8b4f843b 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.io.kafka; +import static org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProviderTest.mockDeserializerProvider; import static org.apache.beam.sdk.metrics.MetricResultsMatchers.attemptedMetricsResult; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static org.hamcrest.Matchers.containsString; @@ -32,7 +33,6 @@ import static org.junit.internal.matchers.ThrowableMessageMatcher.hasMessage; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; -import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; import io.confluent.kafka.schemaregistry.testutil.MockSchemaRegistry; import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig; import io.confluent.kafka.serializers.KafkaAvroSerializer; @@ -440,8 +440,10 @@ public void testReadAvroGenericRecordsWithConfluentSchemaRegistry() { KafkaIO.read() .withBootstrapServers("localhost:9092") .withTopic(topic) - .withKeyDeserializer(mockDeserializerProvider(schemaRegistryUrl, keySchemaSubject)) - .withValueDeserializer(mockDeserializerProvider(schemaRegistryUrl, valueSchemaSubject)) + .withKeyDeserializer( + mockDeserializerProvider(schemaRegistryUrl, keySchemaSubject, null)) + .withValueDeserializer( + mockDeserializerProvider(schemaRegistryUrl, valueSchemaSubject, null)) .withConsumerFactoryFn( new ConsumerFactoryFn( ImmutableList.of(topic), @@ -475,7 +477,8 @@ public void testReadAvroSpecificRecordsWithConfluentSchemaRegistry() { .withBootstrapServers("localhost:9092") .withTopic(topic) .withKeyDeserializer(IntegerDeserializer.class) - .withValueDeserializer(mockDeserializerProvider(schemaRegistryUrl, valueSchemaSubject)) + .withValueDeserializer( + mockDeserializerProvider(schemaRegistryUrl, valueSchemaSubject, null)) .withConsumerFactoryFn( new ConsumerFactoryFn( ImmutableList.of(topic), @@ -492,41 +495,6 @@ public void testReadAvroSpecificRecordsWithConfluentSchemaRegistry() { p.run(); } - private static DeserializerProvider mockDeserializerProvider( - String schemaRegistryUrl, String subject) { - return new ConfluentSchemaRegistryDeserializerProvider<>( - (SerializableFunction) - input -> mockSchemaRegistryClient(schemaRegistryUrl, subject), - schemaRegistryUrl, - subject); - } - - private static SchemaRegistryClient mockSchemaRegistryClient( - String schemaRegistryUrl, String subject) { - SchemaRegistryClient mockRegistryClient = - MockSchemaRegistry.getClientForScope(schemaRegistryUrl); - try { - mockRegistryClient.register(subject, AVRO_SCHEMA); - } catch (IOException | RestClientException e) { - throw new RuntimeException("Unable to register schema for subject: " + subject, e); - } - return mockRegistryClient; - } - - private static final String AVRO_SCHEMA_STRING = - "{\"namespace\": 
\"example.avro\",\n" - + " \"type\": \"record\",\n" - + " \"name\": \"AvroGeneratedUser\",\n" - + " \"fields\": [\n" - + " {\"name\": \"name\", \"type\": \"string\"},\n" - + " {\"name\": \"favorite_number\", \"type\": [\"int\", \"null\"]},\n" - + " {\"name\": \"favorite_color\", \"type\": [\"string\", \"null\"]}\n" - + " ]\n" - + "}"; - - private static final org.apache.avro.Schema AVRO_SCHEMA = - new org.apache.avro.Schema.Parser().parse(AVRO_SCHEMA_STRING); - @Test public void testUnboundedSource() { int numElements = 1000;