[FLINK-3054] Remove R (return) type variable from SerializationSchema
This closes apache#1406
rmetzger committed Nov 27, 2015
1 parent 7691368 commit 09ea3dd
Showing 23 changed files with 60 additions and 177 deletions.
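The heart of the change is the interface diff at the bottom of this commit: the `R` type parameter is dropped and `serialize()` returns `byte[]` directly. Excerpted before/after:

```java
// Before this commit:
public interface SerializationSchema<T, R> extends Serializable {
    R serialize(T element);
}

// After this commit:
public interface SerializationSchema<T> extends Serializable {
    byte[] serialize(T element);
}
```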
2 changes: 1 addition & 1 deletion docs/apis/streaming_guide.md
@@ -3463,7 +3463,7 @@ properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "test");
stream = env
-     .addSource(new KafkaSource[String]("topic", new SimpleStringSchema(), properties))
+     .addSource(new FlinkKafkaConsumer082[String]("topic", new SimpleStringSchema(), properties))
.print
{% endhighlight %}
</div>
@@ -39,9 +39,9 @@ public class FlumeSink<IN> extends RichSinkFunction<IN> {
boolean initDone = false;
String host;
int port;
- SerializationSchema<IN, byte[]> schema;
+ SerializationSchema<IN> schema;

- public FlumeSink(String host, int port, SerializationSchema<IN, byte[]> schema) {
+ public FlumeSink(String host, int port, SerializationSchema<IN> schema) {
this.host = host;
this.port = port;
this.schema = schema;
@@ -113,7 +113,7 @@ public class FlinkKafkaProducer<IN> extends RichSinkFunction<IN> {
* @param serializationSchema
* User defined (keyless) serialization schema.
*/
- public FlinkKafkaProducer(String brokerList, String topicId, SerializationSchema<IN, byte[]> serializationSchema) {
+ public FlinkKafkaProducer(String brokerList, String topicId, SerializationSchema<IN> serializationSchema) {
this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), null);
}

@@ -128,7 +128,7 @@ public FlinkKafkaProducer(String brokerList, String topicId, SerializationSchema
* @param producerConfig
* Properties with the producer configuration.
*/
- public FlinkKafkaProducer(String topicId, SerializationSchema<IN, byte[]> serializationSchema, Properties producerConfig) {
+ public FlinkKafkaProducer(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig) {
this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, null);
}

@@ -140,7 +140,7 @@ public FlinkKafkaProducer(String topicId, SerializationSchema<IN, byte[]> serial
* @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
* @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
*/
- public FlinkKafkaProducer(String topicId, SerializationSchema<IN, byte[]> serializationSchema, Properties producerConfig, KafkaPartitioner customPartitioner) {
+ public FlinkKafkaProducer(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner customPartitioner) {
this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);

}
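For context, a minimal sketch of constructing the keyless producer after this change; the broker address, topic name, and input data below are hypothetical placeholders:

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class ProducerSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> stream = env.fromElements("a", "b", "c"); // hypothetical input

        // SimpleStringSchema now satisfies SerializationSchema<String> directly;
        // there is no second type argument to spell out at the call site.
        stream.addSink(new FlinkKafkaProducer<String>("localhost:9092", "my-topic", new SimpleStringSchema()));
        env.execute();
    }
}
```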
@@ -30,7 +30,7 @@
*/
@Deprecated
public class KafkaSink<IN> extends FlinkKafkaProducer<IN> {
- public KafkaSink(String brokerList, String topicId, SerializationSchema<IN, byte[]> serializationSchema) {
+ public KafkaSink(String brokerList, String topicId, SerializationSchema<IN> serializationSchema) {
super(brokerList, topicId, serializationSchema);
}
}
@@ -21,7 +21,7 @@
import org.apache.commons.collections.map.LinkedMap;

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
- import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
+ import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.junit.Ignore;
import org.junit.Test;
@@ -132,7 +132,7 @@ public void testCreateSourceWithoutCluster() {
props.setProperty("bootstrap.servers", "localhost:11111, localhost:22222");
props.setProperty("group.id", "non-existent-group");

- new FlinkKafkaConsumer<>("no op topic", new JavaDefaultStringSchema(), props,
+ new FlinkKafkaConsumer<>("no op topic", new SimpleStringSchema(), props,
FlinkKafkaConsumer.OffsetStore.FLINK_ZOOKEEPER,
FlinkKafkaConsumer.FetcherType.LEGACY_LOW_LEVEL);
}
@@ -64,7 +64,7 @@
import org.apache.flink.streaming.connectors.kafka.testutils.Tuple2Partitioner;
import org.apache.flink.streaming.connectors.kafka.testutils.ValidatingExactlyOnceSink;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
- import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
+ import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.streaming.util.serialization.TypeInformationKeyValueSerializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
@@ -129,7 +129,7 @@ protected abstract <T> FlinkKafkaConsumer<T> getConsumer(
public void runCheckpointingTest() throws Exception {
createTestTopic("testCheckpointing", 1, 1);

- FlinkKafkaConsumer<String> source = getConsumer("testCheckpointing", new JavaDefaultStringSchema(), standardProps);
+ FlinkKafkaConsumer<String> source = getConsumer("testCheckpointing", new SimpleStringSchema(), standardProps);
Field pendingCheckpointsField = FlinkKafkaConsumer.class.getDeclaredField("pendingCheckpoints");
pendingCheckpointsField.setAccessible(true);
LinkedMap pendingCheckpoints = (LinkedMap) pendingCheckpointsField.get(source);
@@ -577,7 +577,7 @@ public void run() {
env.enableCheckpointing(100);
env.getConfig().disableSysoutLogging();

- FlinkKafkaConsumer<String> source = getConsumer(topic, new JavaDefaultStringSchema(), standardProps);
+ FlinkKafkaConsumer<String> source = getConsumer(topic, new SimpleStringSchema(), standardProps);

env.addSource(source).addSink(new DiscardingSink<String>());

@@ -642,7 +642,7 @@ public void run() {
env.enableCheckpointing(100);
env.getConfig().disableSysoutLogging();

- FlinkKafkaConsumer<String> source = getConsumer(topic, new JavaDefaultStringSchema(), standardProps);
+ FlinkKafkaConsumer<String> source = getConsumer(topic, new SimpleStringSchema(), standardProps);

env.addSource(source).addSink(new DiscardingSink<String>());

@@ -20,7 +20,7 @@

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.connectors.kafka.testutils.MockRuntimeContext;
- import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
+ import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.TestLogger;

import org.apache.kafka.clients.producer.Callback;
@@ -78,7 +78,7 @@ public Future<RecordMetadata> answer(InvocationOnMock invocation) throws Throwab
// (1) producer that propagates errors

FlinkKafkaProducer<String> producerPropagating = new FlinkKafkaProducer<String>(
"mock_topic", new JavaDefaultStringSchema(), new Properties(), null);
"mock_topic", new SimpleStringSchema(), new Properties(), null);

producerPropagating.setRuntimeContext(new MockRuntimeContext(17, 3));
producerPropagating.open(new Configuration());
@@ -97,7 +97,7 @@ public Future<RecordMetadata> answer(InvocationOnMock invocation) throws Throwab
// (2) producer that only logs errors

FlinkKafkaProducer<String> producerLogging = new FlinkKafkaProducer<String>(
"mock_topic", new JavaDefaultStringSchema(), new Properties(), null);
"mock_topic", new SimpleStringSchema(), new Properties(), null);
producerLogging.setLogFailuresOnly(true);

producerLogging.setRuntimeContext(new MockRuntimeContext(17, 3));
@@ -28,7 +28,7 @@
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
- import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
+ import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;

import java.util.Random;
@@ -168,7 +168,7 @@ public void run() {
// we manually feed data into the Kafka sink
FlinkKafkaProducer<String> producer = null;
try {
- producer = new FlinkKafkaProducer<>(kafkaConnectionString, topic, new JavaDefaultStringSchema());
+ producer = new FlinkKafkaProducer<>(kafkaConnectionString, topic, new SimpleStringSchema());
producer.setRuntimeContext(new MockRuntimeContext(1,0));
producer.open(new Configuration());

@@ -39,9 +39,9 @@ public class RMQSink<IN> extends RichSinkFunction<IN> {
private transient ConnectionFactory factory;
private transient Connection connection;
private transient Channel channel;
- private SerializationSchema<IN, byte[]> schema;
+ private SerializationSchema<IN> schema;

- public RMQSink(String HOST_NAME, String QUEUE_NAME, SerializationSchema<IN, byte[]> schema) {
+ public RMQSink(String HOST_NAME, String QUEUE_NAME, SerializationSchema<IN> schema) {
this.HOST_NAME = HOST_NAME;
this.QUEUE_NAME = QUEUE_NAME;
this.schema = schema;
@@ -19,7 +19,6 @@

import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class RMQTopology {
@@ -30,23 +29,14 @@ public static void main(String[] args) throws Exception {

@SuppressWarnings("unused")
DataStreamSink<String> dataStream1 = env.addSource(
- new RMQSource<String>("localhost", "hello", new SimpleStringSchema())).print();
+ new RMQSource<>("localhost", "hello", new SimpleStringSchema())).print();

@SuppressWarnings("unused")
DataStreamSink<String> dataStream2 = env.fromElements("one", "two", "three", "four", "five",
"q").addSink(
- new RMQSink<String>("localhost", "hello", new StringToByteSerializer()));
+ new RMQSink<>("localhost", "hello", new SimpleStringSchema()));

env.execute();
}

- public static class StringToByteSerializer implements SerializationSchema<String, byte[]> {
-
-     private static final long serialVersionUID = 1L;
-
-     @Override
-     public byte[] serialize(String element) {
-         return element.getBytes();
-     }
- }
}
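The deleted `StringToByteSerializer` is redundant now that `SimpleStringSchema` satisfies the single-parameter interface; it also implements `DeserializationSchema<String>`, which is why it appears on both the consumer and the sink side of this commit. A rough sketch of what such a schema boils down to, not the actual Flink source:

```java
import org.apache.flink.streaming.util.serialization.SerializationSchema;

// Hypothetical stand-in illustrating the new contract; use SimpleStringSchema in practice.
public class StringSchemaSketch implements SerializationSchema<String> {

    private static final long serialVersionUID = 1L;

    @Override
    public byte[] serialize(String element) {
        // Same behavior as the deleted StringToByteSerializer.
        return element.getBytes();
    }
}
```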
@@ -976,7 +976,7 @@ public <X extends Tuple> DataStreamSink<T> writeAsCsv(String path, WriteMode wri
* schema for serialization
* @return the closed DataStream
*/
- public DataStreamSink<T> writeToSocket(String hostName, int port, SerializationSchema<T, byte[]> schema) {
+ public DataStreamSink<T> writeToSocket(String hostName, int port, SerializationSchema<T> schema) {
DataStreamSink<T> returnStream = addSink(new SocketClientSink<T>(hostName, port, schema, 0));
returnStream.setParallelism(1); // It would not work if multiple instances would connect to the same port
return returnStream;
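A minimal sketch of the updated `writeToSocket` call site; host, port, and data are hypothetical placeholders:

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class WriteToSocketSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> lines = env.fromElements("hello", "world"); // hypothetical data

        // The schema argument is now SerializationSchema<String> rather than
        // SerializationSchema<String, byte[]>.
        lines.writeToSocket("localhost", 9999, new SimpleStringSchema());
        env.execute();
    }
}
```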
@@ -51,7 +51,7 @@ public class SocketClientSink<IN> extends RichSinkFunction<IN> {


private final SerializableObject lock = new SerializableObject();
- private final SerializationSchema<IN, byte[]> schema;
+ private final SerializationSchema<IN> schema;
private final String hostName;
private final int port;
private final int maxNumRetries;
@@ -72,7 +72,7 @@ public class SocketClientSink<IN> extends RichSinkFunction<IN> {
* @param port Port of the server.
* @param schema Schema used to serialize the data into bytes.
*/
- public SocketClientSink(String hostName, int port, SerializationSchema<IN, byte[]> schema) {
+ public SocketClientSink(String hostName, int port, SerializationSchema<IN> schema) {
this(hostName, port, schema, 0);
}

@@ -86,7 +86,7 @@ public SocketClientSink(String hostName, int port, SerializationSchema<IN, byte[
* @param schema Schema used to serialize the data into bytes.
* @param maxNumRetries The maximum number of retries after a message send failed.
*/
- public SocketClientSink(String hostName, int port, SerializationSchema<IN, byte[]> schema, int maxNumRetries) {
+ public SocketClientSink(String hostName, int port, SerializationSchema<IN> schema, int maxNumRetries) {
this(hostName, port, schema, maxNumRetries, false);
}

@@ -100,7 +100,7 @@ public SocketClientSink(String hostName, int port, SerializationSchema<IN, byte[
* @param maxNumRetries The maximum number of retries after a message send failed.
* @param autoflush Flag to indicate whether the socket stream should be flushed after each message.
*/
- public SocketClientSink(String hostName, int port, SerializationSchema<IN, byte[]> schema,
+ public SocketClientSink(String hostName, int port, SerializationSchema<IN> schema,
int maxNumRetries, boolean autoflush)
{
checkArgument(port > 0 && port < 65536, "port is out of range");

This file was deleted.

@@ -25,9 +25,9 @@ public class KeyedSerializationSchemaWrapper<T> implements KeyedSerializationSch

private static final long serialVersionUID = 1351665280744549933L;

- private final SerializationSchema<T, byte[]> serializationSchema;
+ private final SerializationSchema<T> serializationSchema;

- public KeyedSerializationSchemaWrapper(SerializationSchema<T, byte[]> serializationSchema) {
+ public KeyedSerializationSchemaWrapper(SerializationSchema<T> serializationSchema) {
this.serializationSchema = serializationSchema;
}

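The hunk shows only the wrapper's field and constructor. A plausible sketch of the delegation it implies, assuming `KeyedSerializationSchema` exposes `serializeKey`/`serializeValue` methods (an assumption; they are not shown in this diff):

```java
// Hypothetical completion of KeyedSerializationSchemaWrapper<T>, not verbatim from the commit:

@Override
public byte[] serializeKey(T element) {
    // Wrapped schemas are keyless; Kafka accepts records without a key.
    return null;
}

@Override
public byte[] serializeValue(T element) {
    // Delegate to the wrapped single-parameter schema.
    return serializationSchema.serialize(element);
}
```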

This file was deleted.

@@ -25,9 +25,8 @@
* to them in a specific format (for example as byte strings).
*
* @param <T> The type to be serialized.
- * @param <R> The serialized representation type.
*/
- public interface SerializationSchema<T, R> extends Serializable {
+ public interface SerializationSchema<T> extends Serializable {

/**
* Serializes the incoming element to a specified type.
@@ -36,5 +35,5 @@ public interface SerializationSchema<T, R> extends Serializable {
* The incoming element to be serialized
* @return The serialized element.
*/
- R serialize(T element);
+ byte[] serialize(T element);
}
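For maintainers of custom schemas, the migration this commit forces is mechanical: drop the second type argument from the `implements` clause and keep the method body. A hedged sketch with a hypothetical `MyEvent` type, defined only to make the example self-contained:

```java
import org.apache.flink.streaming.util.serialization.SerializationSchema;

// Hypothetical user type.
class MyEvent {
    final String payload;
    MyEvent(String payload) { this.payload = payload; }
}

// Before: class MyEventSchema implements SerializationSchema<MyEvent, byte[]>
// After: only the implements clause changes; serialize() is untouched.
public class MyEventSchema implements SerializationSchema<MyEvent> {

    private static final long serialVersionUID = 1L;

    @Override
    public byte[] serialize(MyEvent element) {
        return element.payload.getBytes(); // hypothetical encoding
    }
}
```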