[FLINK-18075] Wrap the SerializationSchema in KafkaSerializationSchema in Kafka connector
dawidwys committed Jun 8, 2020
1 parent bebe503 commit 74fd291
Showing 14 changed files with 479 additions and 58 deletions.
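
To illustrate the intent of the change, here is a minimal sketch of the user-facing effect: a plain SerializationSchema can be handed to the producer and its open() hook is invoked, because the connector now wraps it in a KafkaSerializationSchema internally. The class name, broker address, and topic below are placeholders, not part of this commit:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;

public class SerializationSchemaWrappingSketch {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		DataStream<String> stream = env.fromElements("a", "b", "c");

		// A plain SerializationSchema is passed directly to the producer; the connector
		// wraps it so that open(InitializationContext) is called when the sink opens.
		FlinkKafkaProducer010<String> producer = new FlinkKafkaProducer010<>(
				"localhost:9092",          // placeholder broker list
				"test-topic",              // placeholder topic
				new SimpleStringSchema());

		stream.addSink(producer);
		env.execute("serialization-schema-wrapping-sketch");
	}
}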
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;

import org.junit.Test;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertThat;

/**
* Tests for {@link FlinkKafkaProducer010}.
*/
public class FlinkKafkaProducerTest {
@Test
public void testOpenProducer() throws Exception {

OpenTestingSerializationSchema schema = new OpenTestingSerializationSchema();
FlinkKafkaProducer010<Integer> kafkaProducer = new FlinkKafkaProducer010<>(
"localhost:9092",
"test-topic",
schema
);

OneInputStreamOperatorTestHarness<Integer, Object> testHarness = new OneInputStreamOperatorTestHarness<>(
new StreamSink<>(kafkaProducer),
1,
1,
0,
IntSerializer.INSTANCE,
new OperatorID(1, 1));

testHarness.open();

assertThat(schema.openCalled, equalTo(true));
}

private static class OpenTestingSerializationSchema implements SerializationSchema<Integer> {
private boolean openCalled;

@Override
public void open(InitializationContext context) throws Exception {
openCalled = true;
}

@Override
public byte[] serialize(Integer element) {
return new byte[0];
}
}
}
@@ -17,6 +17,7 @@

package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.networking.NetworkFailuresProxy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
@@ -154,7 +155,11 @@ public <K, V> Collection<ConsumerRecord<K, V>> getAllRecordsFromTopic(Properties
}

@Override
public <T> StreamSink<T> getProducerSink(String topic, KeyedSerializationSchema<T> serSchema, Properties props, FlinkKafkaPartitioner<T> partitioner) {
public <T> StreamSink<T> getProducerSink(
String topic,
SerializationSchema<T> serSchema,
Properties props,
FlinkKafkaPartitioner<T> partitioner) {
FlinkKafkaProducer010<T> prod = new FlinkKafkaProducer010<>(topic, serSchema, props, partitioner);
prod.setFlushOnCheckpoint(true);
return new StreamSink<>(prod);
@@ -167,6 +172,18 @@ public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic
return stream.addSink(prod);
}

@Override
public <T> DataStreamSink<T> produceIntoKafka(
DataStream<T> stream,
String topic,
SerializationSchema<T> serSchema,
Properties props,
FlinkKafkaPartitioner<T> partitioner) {
FlinkKafkaProducer010<T> prod = new FlinkKafkaProducer010<>(topic, serSchema, props, partitioner);
prod.setFlushOnCheckpoint(true);
return stream.addSink(prod);
}

@Override
public KafkaOffsetHandler createOffsetHandler() {
return new KafkaOffsetHandlerImpl();
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;

import org.junit.Test;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertThat;

/**
* Tests for {@link FlinkKafkaProducer011}.
*/
public class FlinkKafkaProducerTest {
@Test
public void testOpenProducer() throws Exception {

OpenTestingSerializationSchema schema = new OpenTestingSerializationSchema();
FlinkKafkaProducer011<Integer> kafkaProducer = new FlinkKafkaProducer011<>(
"localhost:9092",
"test-topic",
schema
);

OneInputStreamOperatorTestHarness<Integer, Object> testHarness = new OneInputStreamOperatorTestHarness<>(
new StreamSink<>(kafkaProducer),
1,
1,
0,
IntSerializer.INSTANCE,
new OperatorID(1, 1));

testHarness.open();

assertThat(schema.openCalled, equalTo(true));
}

private static class OpenTestingSerializationSchema implements SerializationSchema<Integer> {
private boolean openCalled;

@Override
public void open(InitializationContext context) throws Exception {
openCalled = true;
}

@Override
public byte[] serialize(Integer element) {
return new byte[0];
}
}
}
@@ -17,10 +17,12 @@

package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.networking.NetworkFailuresProxy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSerializer;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
@@ -267,18 +269,27 @@ public <K, V> Collection<ConsumerRecord<K, V>> getAllRecordsFromTopic(Properties
}

@Override
public <T> StreamSink<T> getProducerSink(String topic, KeyedSerializationSchema<T> serSchema, Properties props, FlinkKafkaPartitioner<T> partitioner) {
public <T> StreamSink<T> getProducerSink(
String topic,
SerializationSchema<T> serSchema,
Properties props,
FlinkKafkaPartitioner<T> partitioner) {
return new StreamSink<>(new FlinkKafkaProducer011<>(
topic,
serSchema,
new KeyedSerializationSchemaWrapper<>(serSchema),
props,
Optional.ofNullable(partitioner),
producerSemantic,
FlinkKafkaProducer011.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE));
}

@Override
public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic, KeyedSerializationSchema<T> serSchema, Properties props, FlinkKafkaPartitioner<T> partitioner) {
public <T> DataStreamSink<T> produceIntoKafka(
DataStream<T> stream,
String topic,
KeyedSerializationSchema<T> serSchema,
Properties props,
FlinkKafkaPartitioner<T> partitioner) {
return stream.addSink(new FlinkKafkaProducer011<>(
topic,
serSchema,
@@ -288,6 +299,22 @@ public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic
FlinkKafkaProducer011.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE));
}

@Override
public <T> DataStreamSink<T> produceIntoKafka(
DataStream<T> stream,
String topic,
SerializationSchema<T> serSchema,
Properties props,
FlinkKafkaPartitioner<T> partitioner) {
return stream.addSink(new FlinkKafkaProducer011<>(
topic,
new KeyedSerializationSchemaWrapper<>(serSchema),
props,
Optional.ofNullable(partitioner),
producerSemantic,
FlinkKafkaProducer011.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE));
}

@Override
public KafkaOffsetHandler createOffsetHandler() {
return new KafkaOffsetHandlerImpl();
@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.connectors.kafka.internals;

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.connectors.kafka.KafkaContextAware;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

import org.apache.kafka.clients.producer.ProducerRecord;

import javax.annotation.Nullable;

/**
* An adapter from old style interfaces such as {@link org.apache.flink.api.common.serialization.SerializationSchema},
* {@link org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner} to the
* {@link KafkaSerializationSchema}.
*/
public class KafkaSerializationSchemaWrapper<T> implements KafkaSerializationSchema<T>, KafkaContextAware<T> {

private final FlinkKafkaPartitioner<T> partitioner;
private final SerializationSchema<T> serializationSchema;
private final String topic;
private boolean writeTimestamp;

private int[] partitions;

public KafkaSerializationSchemaWrapper(
String topic,
FlinkKafkaPartitioner<T> partitioner,
boolean writeTimestamp,
SerializationSchema<T> serializationSchema) {
this.partitioner = partitioner;
this.serializationSchema = serializationSchema;
this.topic = topic;
this.writeTimestamp = writeTimestamp;
}

@Override
public void open(SerializationSchema.InitializationContext context) throws Exception {
serializationSchema.open(context);
}

@Override
public ProducerRecord<byte[], byte[]> serialize(
T element,
@Nullable Long timestamp) {
byte[] serialized = serializationSchema.serialize(element);
final Integer partition;
if (partitioner != null) {
partition = partitioner.partition(element, null, serialized, topic, partitions);
} else {
partition = null;
}

final Long timestampToWrite;
if (writeTimestamp) {
timestampToWrite = timestamp;
} else {
timestampToWrite = null;
}

return new ProducerRecord<>(topic, partition, timestampToWrite, null, serialized);
}

@Override
public String getTargetTopic(T element) {
return topic;
}

@Override
public void setPartitions(int[] partitions) {
this.partitions = partitions;
}

public void setWriteTimestamp(boolean writeTimestamp) {
this.writeTimestamp = writeTimestamp;
}
}
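
As an illustration only (not part of the commit), a minimal sketch of driving this adapter directly; the class name, topic, and input value are hypothetical, and the partitioner is left null so Kafka assigns the partition:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaSerializationSchemaWrapper;

import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaSerializationSchemaWrapperSketch {
	public static void main(String[] args) {
		// Adapt a plain SerializationSchema to the KafkaSerializationSchema interface.
		// Passing null for the partitioner lets Kafka pick the partition; 'false'
		// means no timestamp is attached to the produced record.
		KafkaSerializationSchemaWrapper<String> wrapper = new KafkaSerializationSchemaWrapper<>(
				"example-topic",
				null,
				false,
				new SimpleStringSchema());

		// In a real job the producer calls open() and setPartitions() before serializing.
		ProducerRecord<byte[], byte[]> record = wrapper.serialize("hello", null);
		System.out.println(record.topic() + " -> " + new String(record.value()));
	}
}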