[hotfix][examples] Update StateMachineExample to use KafkaSource
lindong28 authored and becketqin committed Mar 29, 2021
1 parent 3abf555 commit e079aef
Showing 1 changed file with 36 additions and 33 deletions.
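
Summary of the change: the example previously read from Kafka with the legacy FlinkKafkaConsumer, a SourceFunction wired in through env.addSource(...). This commit switches it to the unified KafkaSource connector, wired in through env.fromSource(...), and moves the environment and state-backend setup ahead of source creation so the environment already exists when the source is attached. The standalone sketch below condenses the before/after pattern under stated assumptions: the broker address and topic name are placeholders, while the group id, deserializer, and offset initializer mirror the diff.

    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
    import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.examples.statemachine.event.Event;
    import org.apache.flink.streaming.examples.statemachine.kafka.EventDeSerializer;

    /** Minimal sketch of the migration; broker and topic below are placeholders. */
    public class KafkaSourceMigrationSketch {

        public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();

            // Old pattern (removed by this commit): a SourceFunction-based consumer.
            //   FlinkKafkaConsumer<Event> kafka =
            //           new FlinkKafkaConsumer<>(topic, new EventDeSerializer(), props);
            //   kafka.setStartFromLatest();
            //   DataStream<Event> events = env.addSource(kafka);

            // New pattern (introduced by this commit): the unified KafkaSource.
            KafkaSource<Event> source =
                    KafkaSource.<Event>builder()
                            .setBootstrapServers("localhost:9092") // placeholder broker
                            .setTopics("state-machine-events") // placeholder topic
                            .setGroupId("stateMachineExample")
                            .setDeserializer(
                                    KafkaRecordDeserializationSchema.valueOnly(
                                            new EventDeSerializer()))
                            // matches the old kafka.setStartFromLatest()
                            .setStartingOffsets(OffsetsInitializer.latest())
                            .build();

            DataStream<Event> events =
                    env.fromSource(
                            source, WatermarkStrategy.noWatermarks(), "KafkaSourceSketch");

            events.print();
            env.execute("KafkaSource migration sketch");
        }
    }

Because KafkaSource is a Source rather than a SourceFunction, the diff also replaces the shared SourceFunction<Event> variable with a DataStream<Event> that each branch assigns directly.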
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java
@@ -18,27 +18,27 @@
 
 package org.apache.flink.streaming.examples.statemachine;
 
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.kafka.source.KafkaSource;
+import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
+import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
 import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
 import org.apache.flink.streaming.examples.statemachine.dfa.State;
 import org.apache.flink.streaming.examples.statemachine.event.Alert;
 import org.apache.flink.streaming.examples.statemachine.event.Event;
 import org.apache.flink.streaming.examples.statemachine.generator.EventsGeneratorSource;
 import org.apache.flink.streaming.examples.statemachine.kafka.EventDeSerializer;
 import org.apache.flink.util.Collector;
 
-import java.util.Properties;
-
 /**
  * Main class of the state machine example. This class implements the streaming application that
  * receives the stream of events and evaluates a state machine (per originating address) to validate
@@ -69,9 +69,26 @@ public static void main(String[] args) throws Exception {
 
         // ---- determine whether to use the built-in source, or read from Kafka ----
 
-        final SourceFunction<Event> source;
+        final DataStream<Event> events;
         final ParameterTool params = ParameterTool.fromArgs(args);
 
+        // create the environment to create streams and configure execution
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(2000L);
+
+        final String stateBackend = params.get("backend", "memory");
+        if ("hashmap".equals(stateBackend)) {
+            final String checkpointDir = params.get("checkpoint-dir");
+            boolean asyncCheckpoints = params.getBoolean("async-checkpoints", false);
+            env.setStateBackend(new HashMapStateBackend(asyncCheckpoints));
+            env.getCheckpointConfig().setCheckpointStorage(checkpointDir);
+        } else if ("rocks".equals(stateBackend)) {
+            final String checkpointDir = params.get("checkpoint-dir");
+            boolean incrementalCheckpoints = params.getBoolean("incremental-checkpoints", false);
+            env.setStateBackend(new EmbeddedRocksDBStateBackend(incrementalCheckpoints));
+            env.getCheckpointConfig().setCheckpointStorage(checkpointDir);
+        }
+
         if (params.has("kafka-topic")) {
             // set up the Kafka reader
             String kafkaTopic = params.get("kafka-topic");
@@ -80,14 +97,19 @@ public static void main(String[] args) throws Exception {
             System.out.printf("Reading from kafka topic %s @ %s\n", kafkaTopic, brokers);
             System.out.println();
 
-            Properties kafkaProps = new Properties();
-            kafkaProps.setProperty("bootstrap.servers", brokers);
-
-            FlinkKafkaConsumer<Event> kafka =
-                    new FlinkKafkaConsumer<>(kafkaTopic, new EventDeSerializer(), kafkaProps);
-            kafka.setStartFromLatest();
-            kafka.setCommitOffsetsOnCheckpoints(false);
-            source = kafka;
+            KafkaSource<Event> source =
+                    KafkaSource.<Event>builder()
+                            .setBootstrapServers(brokers)
+                            .setGroupId("stateMachineExample")
+                            .setTopics(kafkaTopic)
+                            .setDeserializer(
+                                    KafkaRecordDeserializationSchema.valueOnly(
+                                            new EventDeSerializer()))
+                            .setStartingOffsets(OffsetsInitializer.latest())
+                            .build();
+            events =
+                    env.fromSource(
+                            source, WatermarkStrategy.noWatermarks(), "StateMachineExampleSource");
         } else {
             double errorRate = params.getDouble("error-rate", 0.0);
             int sleep = params.getInt("sleep", 1);
@@ -97,35 +119,16 @@ public static void main(String[] args) throws Exception {
                     errorRate, sleep);
             System.out.println();
 
-            source = new EventsGeneratorSource(errorRate, sleep);
+            events = env.addSource(new EventsGeneratorSource(errorRate, sleep));
         }
 
         // ---- main program ----
 
-        // create the environment to create streams and configure execution
-        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.enableCheckpointing(2000L);
-
-        final String stateBackend = params.get("backend", "memory");
-        if ("hashmap".equals(stateBackend)) {
-            final String checkpointDir = params.get("checkpoint-dir");
-            boolean asyncCheckpoints = params.getBoolean("async-checkpoints", false);
-            env.setStateBackend(new HashMapStateBackend(asyncCheckpoints));
-            env.getCheckpointConfig().setCheckpointStorage(checkpointDir);
-        } else if ("rocks".equals(stateBackend)) {
-            final String checkpointDir = params.get("checkpoint-dir");
-            boolean incrementalCheckpoints = params.getBoolean("incremental-checkpoints", false);
-            env.setStateBackend(new EmbeddedRocksDBStateBackend(incrementalCheckpoints));
-            env.getCheckpointConfig().setCheckpointStorage(checkpointDir);
-        }
-
         final String outputFile = params.get("output");
 
         // make parameters available in the web interface
         env.getConfig().setGlobalJobParameters(params);
 
-        DataStream<Event> events = env.addSource(source);
-
         DataStream<Alert> alerts =
                 events
                         // partition on the address to make sure equal addresses
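A behavioral nuance of the migration: the removed code explicitly called kafka.setCommitOffsetsOnCheckpoints(false), while the new builder chain configures no equivalent, so the KafkaSource keeps its default of committing offsets back to Kafka as checkpoints complete. This does not affect correctness, since the source relies on offsets stored in checkpointed state rather than on those committed to Kafka. If the old behavior were wanted, a minimal sketch, assuming the connector's "commit.offsets.on.checkpoint" option key and reusing the variables and imports from the diff above:

    // Hypothetical variant, not part of this commit: disables offset commits
    // on checkpoint to mirror the removed setCommitOffsetsOnCheckpoints(false).
    // The option key is assumed from the connector's KafkaSourceOptions.
    KafkaSource<Event> source =
            KafkaSource.<Event>builder()
                    .setBootstrapServers(brokers)
                    .setGroupId("stateMachineExample")
                    .setTopics(kafkaTopic)
                    .setDeserializer(
                            KafkaRecordDeserializationSchema.valueOnly(new EventDeSerializer()))
                    .setStartingOffsets(OffsetsInitializer.latest())
                    .setProperty("commit.offsets.on.checkpoint", "false")
                    .build();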
