Skip to content

Commit

Permalink
[hotfix][connectors/kafka] Rename EventDeSerializer to EventDeSeriali…
Browse files Browse the repository at this point in the history
…zationSchema in examples
  • Loading branch information
Fabian Paul authored and rmetzger committed Sep 17, 2021
1 parent a83f8f4 commit 8664b36
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.examples.statemachine.event.Event;
import org.apache.flink.streaming.examples.statemachine.generator.EventsGeneratorSource;
import org.apache.flink.streaming.examples.statemachine.kafka.EventDeSerializer;
import org.apache.flink.streaming.examples.statemachine.kafka.EventDeSerializationSchema;

/**
* Job to generate input events that are written to Kafka, for the {@link StateMachineExample} job.
Expand Down Expand Up @@ -55,7 +55,7 @@ public static void main(String[] args) throws Exception {
.setRecordSerializer(
KafkaRecordSerializationSchema.builder()
.setValueSerializationSchema(
new EventDeSerializer())
new EventDeSerializationSchema())
.setTopic(kafkaTopic)
.build())
.build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import org.apache.flink.streaming.examples.statemachine.event.Alert;
import org.apache.flink.streaming.examples.statemachine.event.Event;
import org.apache.flink.streaming.examples.statemachine.generator.EventsGeneratorSource;
import org.apache.flink.streaming.examples.statemachine.kafka.EventDeSerializer;
import org.apache.flink.streaming.examples.statemachine.kafka.EventDeSerializationSchema;
import org.apache.flink.util.Collector;

/**
Expand Down Expand Up @@ -102,7 +102,7 @@ public static void main(String[] args) throws Exception {
.setTopics(kafkaTopic)
.setDeserializer(
KafkaRecordDeserializationSchema.valueOnly(
new EventDeSerializer()))
new EventDeSerializationSchema()))
.setStartingOffsets(OffsetsInitializer.latest())
.build();
events =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@
import java.nio.ByteOrder;

/** A serializer and deserializer for the {@link Event} type. */
public class EventDeSerializer implements DeserializationSchema<Event>, SerializationSchema<Event> {
public class EventDeSerializationSchema
implements DeserializationSchema<Event>, SerializationSchema<Event> {

private static final long serialVersionUID = 1L;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ private static class KafkaCollector implements Collector<Event>, AutoCloseable {

private final KafkaProducer<Object, byte[]> producer;

private final EventDeSerializer serializer;
private final EventDeSerializationSchema serializer;

private final String topic;

Expand All @@ -68,7 +68,7 @@ private static class KafkaCollector implements Collector<Event>, AutoCloseable {
KafkaCollector(String brokerAddress, String topic, int partition) {
this.topic = checkNotNull(topic);
this.partition = partition;
this.serializer = new EventDeSerializer();
this.serializer = new EventDeSerializationSchema();

// create Kafka producer
Properties properties = new Properties();
Expand Down

0 comments on commit 8664b36

Please sign in to comment.