diff --git a/flink-end-to-end-tests/flink-queryable-state-test/pom.xml b/flink-end-to-end-tests/flink-queryable-state-test/pom.xml
new file mode 100644
index 0000000000000..a32deea3ca49e
--- /dev/null
+++ b/flink-end-to-end-tests/flink-queryable-state-test/pom.xml
@@ -0,0 +1,134 @@
+
+
+
+
+ flink-end-to-end-tests
+ org.apache.flink
+ 1.6-SNAPSHOT
+
+ 4.0.0
+
+ flink-queryable-state-test_${scala.binary.version}
+ flink-queryable-state-test
+
+ jar
+
+
+
+ org.apache.flink
+ flink-core
+ ${project.version}
+
+
+ org.apache.flink
+ flink-streaming-java_${scala.binary.version}
+ ${project.version}
+
+
+ org.apache.flink
+ flink-statebackend-rocksdb_${scala.binary.version}
+ ${project.version}
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-jar-plugin
+ 2.4
+
+
+
+ QsStateProducer
+ package
+
+ jar
+
+
+ QsStateProducer
+
+
+
+ org.apache.flink.streaming.tests.queryablestate.QsStateProducer
+
+
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+
+
+
+ QsStateClient
+ package
+
+ shade
+
+
+ false
+ false
+ false
+
+
+
+ org.apache.flink.streaming.tests.queryablestate.QsStateClient
+
+
+
+ QsStateClient
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-antrun-plugin
+ 1.7
+
+
+ rename
+ package
+
+ run
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/Email.java b/flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/Email.java
new file mode 100644
index 0000000000000..c98ac6d7eae31
--- /dev/null
+++ b/flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/Email.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.queryablestate;
+
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+
+/**
+ * Toy email resentation.
+ */
+public class Email {
+
+ private EmailId emailId;
+ private Instant timestamp;
+ private String foo;
+ private LabelSurrogate label;
+
+ public Email(EmailId emailId, Instant timestamp, String foo, LabelSurrogate label) {
+ this.emailId = emailId;
+ this.timestamp = timestamp;
+ this.foo = foo;
+ this.label = label;
+ }
+
+ public EmailId getEmailId() {
+ return emailId;
+ }
+
+ public void setEmailId(EmailId emailId) {
+ this.emailId = emailId;
+ }
+
+ public Instant getTimestamp() {
+ return timestamp;
+ }
+
+ public void setTimestamp(Instant timestamp) {
+ this.timestamp = timestamp;
+ }
+
+ public String getFoo() {
+ return foo;
+ }
+
+ public void setFoo(String foo) {
+ this.foo = foo;
+ }
+
+ public LabelSurrogate getLabel() {
+ return label;
+ }
+
+ public void setLabel(LabelSurrogate label) {
+ this.label = label;
+ }
+
+ public String getDate() {
+ DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneId.of("UTC"));
+ return formatter.format(timestamp);
+ }
+}
diff --git a/flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/EmailId.java b/flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/EmailId.java
new file mode 100644
index 0000000000000..63c627602f267
--- /dev/null
+++ b/flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/EmailId.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.queryablestate;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * POJO representing an EmailId.
+ */
+public class EmailId implements Serializable {
+
+ private static final long serialVersionUID = -5001464312464872467L;
+
+ private String emailId;
+
+ public EmailId() {
+
+ }
+
+ public EmailId(String emailId) {
+ this.emailId = Objects.requireNonNull(emailId);
+ }
+
+ public void setEmailId(String emailId) {
+ this.emailId = emailId;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ EmailId emailId1 = (EmailId) o;
+
+ return Objects.equals(emailId, emailId1.emailId);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(emailId);
+ }
+
+ public String getEmailId() {
+ return emailId;
+ }
+
+ @Override
+ public String toString() {
+ return "EmailId{" +
+ "emailId='" + emailId + '\'' +
+ '}';
+ }
+}
diff --git a/flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/EmailInformation.java b/flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/EmailInformation.java
new file mode 100644
index 0000000000000..b1d100aa89631
--- /dev/null
+++ b/flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/EmailInformation.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.queryablestate;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * POJO representing some information about an email.
+ */
+public class EmailInformation implements Serializable {
+
+ private static final long serialVersionUID = -8956979869800484909L;
+
+ public void setEmailId(EmailId emailId) {
+ this.emailId = emailId;
+ }
+
+ private EmailId emailId;
+
+ public void setStuff(List stuff) {
+ this.stuff = stuff;
+ }
+
+ private List stuff;
+
+ public void setAsdf(Long asdf) {
+ this.asdf = asdf;
+ }
+
+ private Long asdf = 0L;
+
+ private transient LabelSurrogate label;
+
+ public EmailInformation() {
+
+ }
+
+ public EmailInformation(Email email) {
+ emailId = email.getEmailId();
+ stuff = new ArrayList<>();
+ stuff.add("1");
+ stuff.add("2");
+ stuff.add("3");
+ label = email.getLabel();
+ }
+
+ public EmailId getEmailId() {
+ return emailId;
+ }
+
+ public List getStuff() {
+ return stuff;
+ }
+
+ public Long getAsdf() {
+ return asdf;
+ }
+
+ public LabelSurrogate getLabel() {
+ return label;
+ }
+
+ public void setLabel(LabelSurrogate label) {
+ this.label = label;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ EmailInformation that = (EmailInformation) o;
+ return Objects.equals(emailId, that.emailId) &&
+ Objects.equals(stuff, that.stuff) &&
+ Objects.equals(asdf, that.asdf) &&
+ Objects.equals(label, that.label);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(emailId, stuff, asdf, label);
+ }
+
+ @Override
+ public String toString() {
+ return "EmailInformation{" +
+ "emailId=" + emailId +
+ ", stuff=" + stuff +
+ ", asdf=" + asdf +
+ ", label=" + label +
+ '}';
+ }
+}
diff --git a/flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/LabelSurrogate.java b/flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/LabelSurrogate.java
new file mode 100644
index 0000000000000..0977ef01b54c5
--- /dev/null
+++ b/flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/LabelSurrogate.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.queryablestate;
+
+/**
+ * A label surrogate.
+ */
+public class LabelSurrogate {
+
+ private Type type;
+ private String foo;
+
+ public LabelSurrogate(Type type, String foo) {
+ this.type = type;
+ this.foo = foo;
+ }
+
+ public Type getType() {
+ return type;
+ }
+
+ public void setType(Type type) {
+ this.type = type;
+ }
+
+ public String getFoo() {
+ return foo;
+ }
+
+ public void setFoo(String foo) {
+ this.foo = foo;
+ }
+
+ @Override
+ public String toString() {
+ return "LabelSurrogate{" +
+ "type=" + type +
+ ", foo='" + foo + '\'' +
+ '}';
+ }
+
+ /**
+ * An exemplary enum.
+ */
+ public enum Type {
+ FOO,
+ BAR,
+ BAZ
+ }
+}
diff --git a/flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/QsConstants.java b/flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/QsConstants.java
new file mode 100644
index 0000000000000..7b26226cab5d1
--- /dev/null
+++ b/flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/QsConstants.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.queryablestate;
+
+/**
+ * A class containing the constants used in the end-to-end test.
+ */
+public class QsConstants {
+
+ public static final String QUERY_NAME = "state";
+ public static final String STATE_NAME = "state";
+
+ public static final String KEY = "";
+}
diff --git a/flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/QsStateClient.java b/flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/QsStateClient.java
new file mode 100644
index 0000000000000..b0b8cedffbfd2
--- /dev/null
+++ b/flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/QsStateClient.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.queryablestate;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.queryablestate.client.QueryableStateClient;
+import org.apache.flink.queryablestate.exceptions.UnknownKeyOrNamespaceException;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * A simple implementation of a queryable state client.
+ * This client queries the state for a while (~2.5 mins) and prints
+ * out the values that it found in the map state
+ *
+ *
Usage: java -jar QsStateClient.jar --host HOST --port PORT --job-id JOB_ID
+ */
+public class QsStateClient {
+
+ private static final int BOOTSTRAP_RETRIES = 240;
+
+ public static void main(final String[] args) throws Exception {
+
+ ParameterTool parameters = ParameterTool.fromArgs(args);
+
+ // setup values
+ String jobId = parameters.getRequired("job-id");
+ String host = parameters.get("host", "localhost");
+ int port = parameters.getInt("port", 9069);
+ int numIterations = parameters.getInt("iterations", 1500);
+
+ QueryableStateClient client = new QueryableStateClient(host, port);
+ client.setExecutionConfig(new ExecutionConfig());
+
+ MapStateDescriptor stateDescriptor =
+ new MapStateDescriptor<>(
+ QsConstants.STATE_NAME,
+ TypeInformation.of(new TypeHint() {
+
+ }),
+ TypeInformation.of(new TypeHint() {
+
+ })
+ );
+
+ // wait for state to exist
+ for (int i = 0; i < BOOTSTRAP_RETRIES; i++) { // ~120s
+ try {
+ getMapState(jobId, client, stateDescriptor);
+ break;
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof UnknownKeyOrNamespaceException) {
+ System.err.println("State does not exist yet; sleeping 500ms");
+ Thread.sleep(500L);
+ } else {
+ throw e;
+ }
+ }
+
+ if (i == (BOOTSTRAP_RETRIES - 1)) {
+ throw new RuntimeException("Timeout: state doesn't exist after 120s");
+ }
+ }
+
+ // query state
+ for (int iterations = 0; iterations < numIterations; iterations++) {
+
+ MapState mapState =
+ getMapState(jobId, client, stateDescriptor);
+
+ int counter = 0;
+ for (Map.Entry entry: mapState.entries()) {
+ // this is to force deserialization
+ entry.getKey();
+ entry.getValue();
+ counter++;
+ }
+ System.out.println("MapState has " + counter + " entries"); // we look for it in the test
+
+ Thread.sleep(100L);
+ }
+ }
+
+ private static MapState getMapState(
+ String jobId,
+ QueryableStateClient client,
+ MapStateDescriptor stateDescriptor) throws InterruptedException, ExecutionException {
+
+ CompletableFuture> resultFuture =
+ client.getKvState(
+ JobID.fromHexString(jobId),
+ QsConstants.QUERY_NAME,
+ QsConstants.KEY, // which key of the keyed state to access
+ BasicTypeInfo.STRING_TYPE_INFO,
+ stateDescriptor);
+
+ return resultFuture.get();
+ }
+}
diff --git a/flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/QsStateProducer.java b/flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/QsStateProducer.java
new file mode 100644
index 0000000000000..14eaac3f21df5
--- /dev/null
+++ b/flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/QsStateProducer.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.queryablestate;
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.util.Collector;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Random;
+
+/**
+ * Streaming application that creates an {@link Email} pojo with random ids and increasing
+ * timestamps and passes it to a stateful {@link org.apache.flink.api.common.functions.FlatMapFunction},
+ * where it is exposed as queryable state.
+ */
+public class QsStateProducer {
+
+ public static void main(final String[] args) throws Exception {
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ ParameterTool tool = ParameterTool.fromArgs(args);
+ String tmpPath = tool.getRequired("tmp-dir");
+ String stateBackendType = tool.getRequired("state-backend");
+
+ StateBackend stateBackend;
+ switch (stateBackendType) {
+ case "rocksdb":
+ stateBackend = new RocksDBStateBackend(tmpPath);
+ break;
+ case "fs":
+ stateBackend = new FsStateBackend(tmpPath);
+ break;
+ case "memory":
+ stateBackend = new MemoryStateBackend();
+ break;
+ default:
+ throw new RuntimeException("Unsupported state backend " + stateBackendType);
+ }
+
+ env.setStateBackend(stateBackend);
+ env.enableCheckpointing(1000L);
+ env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
+ env.getCheckpointConfig().setMinPauseBetweenCheckpoints(0);
+
+ env.addSource(new EmailSource())
+ .keyBy(new KeySelector() {
+
+ private static final long serialVersionUID = -1480525724620425363L;
+
+ @Override
+ public String getKey(Email value) throws Exception {
+ return QsConstants.KEY;
+ }
+ })
+ .flatMap(new TestFlatMap());
+
+ env.execute();
+ }
+
+ private static class EmailSource extends RichSourceFunction {
+
+ private static final long serialVersionUID = -7286937645300388040L;
+
+ private transient volatile boolean isRunning;
+
+ private transient Random random;
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ this.random = new Random();
+ this.isRunning = true;
+ }
+
+ @Override
+ public void run(SourceContext ctx) throws Exception {
+ // Sleep for 10 seconds on start to allow time to copy jobid
+ Thread.sleep(10000L);
+
+ int types = LabelSurrogate.Type.values().length;
+
+ while (isRunning) {
+ int r = random.nextInt(100);
+
+ final EmailId emailId = new EmailId(Integer.toString(random.nextInt()));
+ final Instant timestamp = Instant.now().minus(Duration.ofDays(1L));
+ final String foo = String.format("foo #%d", r);
+ final LabelSurrogate label = new LabelSurrogate(LabelSurrogate.Type.values()[r % types], "bar");
+
+ synchronized (ctx.getCheckpointLock()) {
+ ctx.collect(new Email(emailId, timestamp, foo, label));
+ }
+
+ Thread.sleep(30L);
+ }
+ }
+
+ @Override
+ public void cancel() {
+ isRunning = false;
+ }
+ }
+
+ private static class TestFlatMap extends RichFlatMapFunction implements CheckpointedFunction {
+
+ private static final long serialVersionUID = 7821128115999005941L;
+
+ private transient MapState state;
+ private transient int count;
+
+ @Override
+ public void open(Configuration parameters) {
+ MapStateDescriptor stateDescriptor =
+ new MapStateDescriptor<>(
+ QsConstants.STATE_NAME,
+ TypeInformation.of(new TypeHint() {
+
+ }),
+ TypeInformation.of(new TypeHint() {
+
+ })
+ );
+ stateDescriptor.setQueryable(QsConstants.QUERY_NAME);
+ state = getRuntimeContext().getMapState(stateDescriptor);
+ count = -1;
+ }
+
+ @Override
+ public void flatMap(Email value, Collector