From e079aef0efcd544322a028cc15a24bdaa456af4f Mon Sep 17 00:00:00 2001
From: Dong Lin
Date: Sun, 21 Mar 2021 17:33:39 +0800
Subject: [PATCH] [hotfix][examples] Update StateMachineExample to use KafkaSource

---
 .../statemachine/StateMachineExample.java     | 69 ++++++++++---------
 1 file changed, 36 insertions(+), 33 deletions(-)

diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java
index 3dfd131dea8bf..1bb76adcb6fbd 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java
@@ -18,18 +18,20 @@
 
 package org.apache.flink.streaming.examples.statemachine;
 
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.kafka.source.KafkaSource;
+import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
+import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
 import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
 import org.apache.flink.streaming.examples.statemachine.dfa.State;
 import org.apache.flink.streaming.examples.statemachine.event.Alert;
 import org.apache.flink.streaming.examples.statemachine.event.Event;
@@ -37,8 +39,6 @@
 import org.apache.flink.streaming.examples.statemachine.kafka.EventDeSerializer;
 import org.apache.flink.util.Collector;
 
-import java.util.Properties;
-
 /**
  * Main class of the state machine example. This class implements the streaming application that
  * receives the stream of events and evaluates a state machine (per originating address) to validate
@@ -69,9 +69,26 @@ public static void main(String[] args) throws Exception {
 
         // ---- determine whether to use the built-in source, or read from Kafka ----
 
-        final SourceFunction<Event> source;
+        final DataStream<Event> events;
         final ParameterTool params = ParameterTool.fromArgs(args);
 
+        // create the environment to create streams and configure execution
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(2000L);
+
+        final String stateBackend = params.get("backend", "memory");
+        if ("hashmap".equals(stateBackend)) {
+            final String checkpointDir = params.get("checkpoint-dir");
+            boolean asyncCheckpoints = params.getBoolean("async-checkpoints", false);
+            env.setStateBackend(new HashMapStateBackend(asyncCheckpoints));
+            env.getCheckpointConfig().setCheckpointStorage(checkpointDir);
+        } else if ("rocks".equals(stateBackend)) {
+            final String checkpointDir = params.get("checkpoint-dir");
+            boolean incrementalCheckpoints = params.getBoolean("incremental-checkpoints", false);
+            env.setStateBackend(new EmbeddedRocksDBStateBackend(incrementalCheckpoints));
+            env.getCheckpointConfig().setCheckpointStorage(checkpointDir);
+        }
+
         if (params.has("kafka-topic")) {
             // set up the Kafka reader
             String kafkaTopic = params.get("kafka-topic");
@@ -80,14 +97,19 @@ public static void main(String[] args) throws Exception {
             System.out.printf("Reading from kafka topic %s @ %s\n", kafkaTopic, brokers);
             System.out.println();
 
-            Properties kafkaProps = new Properties();
-            kafkaProps.setProperty("bootstrap.servers", brokers);
-
-            FlinkKafkaConsumer<Event> kafka =
-                    new FlinkKafkaConsumer<>(kafkaTopic, new EventDeSerializer(), kafkaProps);
-            kafka.setStartFromLatest();
-            kafka.setCommitOffsetsOnCheckpoints(false);
-            source = kafka;
+            KafkaSource<Event> source =
+                    KafkaSource.<Event>builder()
+                            .setBootstrapServers(brokers)
+                            .setGroupId("stateMachineExample")
+                            .setTopics(kafkaTopic)
+                            .setDeserializer(
+                                    KafkaRecordDeserializationSchema.valueOnly(
+                                            new EventDeSerializer()))
+                            .setStartingOffsets(OffsetsInitializer.latest())
+                            .build();
+            events =
+                    env.fromSource(
+                            source, WatermarkStrategy.noWatermarks(), "StateMachineExampleSource");
         } else {
             double errorRate = params.getDouble("error-rate", 0.0);
             int sleep = params.getInt("sleep", 1);
@@ -97,35 +119,16 @@ public static void main(String[] args) throws Exception {
                             errorRate, sleep);
             System.out.println();
 
-            source = new EventsGeneratorSource(errorRate, sleep);
+            events = env.addSource(new EventsGeneratorSource(errorRate, sleep));
         }
 
         // ---- main program ----
 
-        // create the environment to create streams and configure execution
-        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.enableCheckpointing(2000L);
-
-        final String stateBackend = params.get("backend", "memory");
-        if ("hashmap".equals(stateBackend)) {
-            final String checkpointDir = params.get("checkpoint-dir");
-            boolean asyncCheckpoints = params.getBoolean("async-checkpoints", false);
-            env.setStateBackend(new HashMapStateBackend(asyncCheckpoints));
-            env.getCheckpointConfig().setCheckpointStorage(checkpointDir);
-        } else if ("rocks".equals(stateBackend)) {
-            final String checkpointDir = params.get("checkpoint-dir");
-            boolean incrementalCheckpoints = params.getBoolean("incremental-checkpoints", false);
-            env.setStateBackend(new EmbeddedRocksDBStateBackend(incrementalCheckpoints));
-            env.getCheckpointConfig().setCheckpointStorage(checkpointDir);
-        }
-
         final String outputFile = params.get("output");
 
         // make parameters available in the web interface
         env.getConfig().setGlobalJobParameters(params);
 
-        DataStream<Event> events = env.addSource(source);
-
         DataStream<Alert> alerts =
                 events
                         // partition on the address to make sure equal addresses
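Note (illustrative sketch, not part of the commit): the patch replaces the
legacy FlinkKafkaConsumer with the unified KafkaSource API, attached via
env.fromSource() instead of env.addSource(). The self-contained sketch below
shows the same pattern with plain String values. The broker address
"localhost:9092", the topic "events", and the class name KafkaSourceSketch
are placeholder assumptions, and setValueOnlyDeserializer(...) is the builder
shorthand for the KafkaRecordDeserializationSchema.valueOnly(...) wrapping
used above.

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaSourceSketch {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(2000L);

        // Build the source; broker, topic, and group id are placeholders.
        KafkaSource<String> source =
                KafkaSource.<String>builder()
                        .setBootstrapServers("localhost:9092")
                        .setGroupId("stateMachineExample")
                        .setTopics("events")
                        .setValueOnlyDeserializer(new SimpleStringSchema())
                        .setStartingOffsets(OffsetsInitializer.latest())
                        .build();

        // fromSource() wires in the new unified source API; the state machine
        // example is processing-time only, so no watermarks are generated.
        DataStream<String> events =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "KafkaSourceSketch");

        events.print();
        env.execute("KafkaSource sketch");
    }
}

One consequence of this migration: offsets now start from latest via
OffsetsInitializer.latest() rather than setStartFromLatest(), and checkpoint
offset committing is governed by the source's Kafka properties instead of
setCommitOffsetsOnCheckpoints(false).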