From 5f97ac9281a443cd72b374c293e525ee5e9b4149 Mon Sep 17 00:00:00 2001 From: yew1eb Date: Thu, 10 Aug 2017 23:28:00 +0800 Subject: [PATCH] [FLINK-7415] [cassandra] Add example instructions for creating keyspace This closes #4519. --- .../batch/connectors/cassandra/example/BatchExample.java | 5 +++-- .../cassandra/example/CassandraPojoSinkExample.java | 3 ++- .../cassandra/example/CassandraTupleSinkExample.java | 3 ++- .../example/CassandraTupleWriteAheadSinkExample.java | 3 ++- 4 files changed, 9 insertions(+), 5 deletions(-) diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchExample.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchExample.java index af21f2d4a5eab..20a020802630a 100644 --- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchExample.java +++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchExample.java @@ -34,8 +34,9 @@ /** * This is an example showing the to use the Cassandra Input-/OutputFormats in the Batch API. * - *

The example assumes that a table exists in a local cassandra database, according to the following query: - * CREATE TABLE test.batches (number int, strings text, PRIMARY KEY(number, strings)); + *

The example assumes that a table exists in a local cassandra database, according to the following queries: + * CREATE KEYSPACE IF NOT EXISTS test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': ‘1’}; + * CREATE TABLE IF NOT EXISTS test.batches (number int, strings text, PRIMARY KEY(number, strings)); */ public class BatchExample { private static final String INSERT_QUERY = "INSERT INTO test.batches (number, strings) VALUES (?,?);"; diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraPojoSinkExample.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraPojoSinkExample.java index a38b73b72c991..01cd6e8048f27 100644 --- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraPojoSinkExample.java +++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraPojoSinkExample.java @@ -32,7 +32,8 @@ * *

Pojo's have to be annotated with datastax annotations to work with this sink. * - *

The example assumes that a table exists in a local cassandra database, according to the following query: + *

The example assumes that a table exists in a local cassandra database, according to the following queries: + * CREATE KEYSPACE IF NOT EXISTS test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': ‘1’}; * CREATE TABLE IF NOT EXISTS test.message(body txt PRIMARY KEY) */ public class CassandraPojoSinkExample { diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleSinkExample.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleSinkExample.java index ce2326f4e569c..72013d5141a3f 100644 --- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleSinkExample.java +++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleSinkExample.java @@ -31,7 +31,8 @@ /** * This is an example showing the to use the Tuple Cassandra Sink in the Streaming API. * - *

The example assumes that a table exists in a local cassandra database, according to the following query: + *

The example assumes that a table exists in a local cassandra database, according to the following queries: + * CREATE KEYSPACE IF NOT EXISTS test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': ‘1’}; * CREATE TABLE IF NOT EXISTS test.writetuple(element1 text PRIMARY KEY, element2 int) */ public class CassandraTupleSinkExample { diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleWriteAheadSinkExample.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleWriteAheadSinkExample.java index 38618feaf7771..8cab311be7f90 100644 --- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleWriteAheadSinkExample.java +++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleWriteAheadSinkExample.java @@ -36,7 +36,8 @@ /** * This is an example showing the to use the Cassandra Sink (with write-ahead log) in the Streaming API. * - *

The example assumes that a table exists in a local cassandra database, according to the following query: + *

The example assumes that a table exists in a local cassandra database, according to the following queries: + * CREATE KEYSPACE IF NOT EXISTS example WITH replication = {'class': 'SimpleStrategy', 'replication_factor': ‘1’}; * CREATE TABLE example.values (id text, count int, PRIMARY KEY(id)); * *

Important things to note are that checkpointing is enabled, a StateBackend is set and the enableWriteAheadLog() call