Skip to content

Commit

Permalink
[FLINK-10440][cassandra] Add CassandraPojoOutputFormat
Browse files Browse the repository at this point in the history
  • Loading branch information
bmeriaux authored and zentol committed Oct 17, 2018
1 parent 953d42e commit b1e3e31
Show file tree
Hide file tree
Showing 4 changed files with 178 additions and 15 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.batch.connectors.cassandra;

import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
import org.apache.flink.streaming.connectors.cassandra.MapperOptions;
import org.apache.flink.util.Preconditions;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;
import com.datastax.driver.mapping.Mapper;
import com.datastax.driver.mapping.MappingManager;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

/**
* OutputFormat to write data to Apache Cassandra and from a custom Cassandra annotated object.
*
* @param <OUT> type of outputClass
*/
public class CassandraPojoOutputFormat<OUT> extends RichOutputFormat<OUT> {

private static final Logger LOG = LoggerFactory.getLogger(CassandraPojoOutputFormat.class);
private static final long serialVersionUID = -1701885135103942460L;

private final ClusterBuilder builder;

private final MapperOptions mapperOptions;
private final Class<OUT> outputClass;

private transient Cluster cluster;
private transient Session session;
private transient Mapper<OUT> mapper;
private transient FutureCallback<Void> callback;
private transient Throwable exception = null;

public CassandraPojoOutputFormat(ClusterBuilder builder, Class<OUT> outputClass) {
this(builder, outputClass, null);
}

public CassandraPojoOutputFormat(ClusterBuilder builder, Class<OUT> outputClass, MapperOptions mapperOptions) {
Preconditions.checkNotNull(outputClass, "OutputClass cannot be null");
Preconditions.checkNotNull(builder, "Builder cannot be null");
this.builder = builder;
this.mapperOptions = mapperOptions;
this.outputClass = outputClass;
}

@Override
public void configure(Configuration parameters) {
this.cluster = builder.getCluster();
}

/**
* Opens a Session to Cassandra and initializes the prepared statement.
*
* @param taskNumber The number of the parallel instance.
*/
@Override
public void open(int taskNumber, int numTasks) {
this.session = cluster.connect();
MappingManager mappingManager = new MappingManager(session);
this.mapper = mappingManager.mapper(outputClass);
if (mapperOptions != null) {
Mapper.Option[] optionsArray = mapperOptions.getMapperOptions();
if (optionsArray != null) {
mapper.setDefaultSaveOptions(optionsArray);
}
}
this.callback = new FutureCallback<Void>() {
@Override
public void onSuccess(Void ignored) {
onWriteSuccess();
}

@Override
public void onFailure(Throwable t) {
onWriteFailure(t);
}
};
}

@Override
public void writeRecord(OUT record) throws IOException {
if (exception != null) {
throw new IOException("write record failed", exception);
}

ListenableFuture<Void> result = mapper.saveAsync(record);
Futures.addCallback(result, callback);
}


/**
* Callback that is invoked after a record is written to Cassandra successfully.
*
* <p>Subclass can override to provide its own logic.
*/
protected void onWriteSuccess() {
}

/**
* Callback that is invoked when failing to write to Cassandra.
* Current implementation will record the exception and fail the job upon next record.
*
* <p>Subclass can override to provide its own failure handling logic.
* @param t the exception
*/
protected void onWriteFailure(Throwable t) {
exception = t;
}

/**
* Closes all resources used.
*/
@Override
public void close() {
mapper = null;
try {
if (session != null) {
session.close();
}
} catch (Exception e) {
LOG.error("Error while closing session.", e);
}

try {
if (cluster != null) {
cluster.close();
}
} catch (Exception e) {
LOG.error("Error while closing cluster.", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,40 +19,40 @@

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.batch.connectors.cassandra.CassandraPojoInputFormat;
import org.apache.flink.batch.connectors.cassandra.CassandraTupleOutputFormat;
import org.apache.flink.batch.connectors.cassandra.CassandraPojoOutputFormat;
import org.apache.flink.batch.connectors.cassandra.CustomCassandraAnnotatedPojo;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.mapping.Mapper;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/**
* This is an example showing the to use the {@link CassandraPojoInputFormat}/{@link CassandraTupleOutputFormat} in the Batch API.
* This is an example showing the to use the {@link CassandraPojoInputFormat}/{@link CassandraPojoOutputFormat} in the Batch API.
*
* <p>The example assumes that a table exists in a local cassandra database, according to the following queries:
* CREATE KEYSPACE IF NOT EXISTS flink WITH replication = {'class': 'SimpleStrategy', 'replication_factor': ‘1’};
* CREATE TABLE IF NOT EXISTS flink.batches (id text, counter int, batch_id int, PRIMARY KEY(id, counter, batchId));
*/
public class BatchPojoExample {
private static final String INSERT_QUERY = "INSERT INTO flink.batches (id, counter, batch_id) VALUES (?,?,?);";
private static final String SELECT_QUERY = "SELECT id, counter, batch_id FROM flink.batches;";

public static void main(String[] args) throws Exception {

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

ArrayList<Tuple3<String, Integer, Integer>> collection = new ArrayList<>(20);
for (int i = 0; i < 20; i++) {
collection.add(new Tuple3<>("string " + i, i, i));
}
List<CustomCassandraAnnotatedPojo> customCassandraAnnotatedPojos = IntStream.range(0, 20)
.mapToObj(x -> new CustomCassandraAnnotatedPojo(UUID.randomUUID().toString(), x, 0))
.collect(Collectors.toList());

DataSet<Tuple3<String, Integer, Integer>> dataSet = env.fromCollection(collection);
DataSet<CustomCassandraAnnotatedPojo> dataSet = env.fromCollection(customCassandraAnnotatedPojos);

ClusterBuilder clusterBuilder = new ClusterBuilder() {
private static final long serialVersionUID = -1754532803757154795L;
Expand All @@ -63,7 +63,7 @@ protected Cluster buildCluster(Cluster.Builder builder) {
}
};

dataSet.output(new CassandraTupleOutputFormat<>(INSERT_QUERY, clusterBuilder));
dataSet.output(new CassandraPojoOutputFormat<>(clusterBuilder, CustomCassandraAnnotatedPojo.class, () -> new Mapper.Option[]{Mapper.Option.saveNullFields(true)}));

env.execute("Write");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.flink.batch.connectors.cassandra.CassandraInputFormat;
import org.apache.flink.batch.connectors.cassandra.CassandraOutputFormat;
import org.apache.flink.batch.connectors.cassandra.CassandraPojoInputFormat;
import org.apache.flink.batch.connectors.cassandra.CassandraPojoOutputFormat;
import org.apache.flink.batch.connectors.cassandra.CassandraRowOutputFormat;
import org.apache.flink.batch.connectors.cassandra.CassandraTupleOutputFormat;
import org.apache.flink.batch.connectors.cassandra.CustomCassandraAnnotatedPojo;
Expand All @@ -51,6 +52,7 @@
import com.datastax.driver.core.QueryOptions;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Session;
import com.datastax.driver.mapping.Mapper;
import org.apache.cassandra.service.CassandraDaemon;
import org.junit.AfterClass;
import org.junit.Assert;
Expand Down Expand Up @@ -486,13 +488,17 @@ public void testCassandraBatchPojoFormat() throws Exception {

session.execute(CREATE_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, CustomCassandraAnnotatedPojo.TABLE_NAME));

CassandraPojoSink<CustomCassandraAnnotatedPojo> sink = new CassandraPojoSink<>(CustomCassandraAnnotatedPojo.class, builder);
OutputFormat<CustomCassandraAnnotatedPojo> sink = new CassandraPojoOutputFormat<>(builder, CustomCassandraAnnotatedPojo.class, () -> new Mapper.Option[]{Mapper.Option.saveNullFields(true)});

List<CustomCassandraAnnotatedPojo> customCassandraAnnotatedPojos = IntStream.range(0, 20)
.mapToObj(x -> new CustomCassandraAnnotatedPojo(UUID.randomUUID().toString(), x, 0))
.collect(Collectors.toList());
try {
sink.open(new Configuration());
customCassandraAnnotatedPojos.forEach(sink::send);
sink.configure(new Configuration());
sink.open(0, 1);
for (CustomCassandraAnnotatedPojo customCassandraAnnotatedPojo : customCassandraAnnotatedPojos) {
sink.writeRecord(customCassandraAnnotatedPojo);
}
} finally {
sink.close();
}
Expand Down
2 changes: 1 addition & 1 deletion tools/maven/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ under the License.

<!-- Cassandra connectors have to use guava directly -->
<suppress
files="AbstractCassandraTupleSink.java|CassandraInputFormat.java|CassandraOutputFormatBase.java|CassandraSinkBase.java|CassandraSinkBaseTest.java|CassandraPojoSink.java|CassandraRowSink.java|CassandraTupleWriteAheadSink.java|CassandraRowWriteAheadSink.java"
files="AbstractCassandraTupleSink.java|CassandraInputFormat.java|CassandraOutputFormatBase.java|CassandraSinkBase.java|CassandraSinkBaseTest.java|CassandraPojoSink.java|CassandraRowSink.java|CassandraTupleWriteAheadSink.java|CassandraRowWriteAheadSink.java|CassandraPojoOutputFormat.java"
checks="IllegalImport"/>
<!-- Kinesis producer has to use guava directly -->
<suppress
Expand Down

0 comments on commit b1e3e31

Please sign in to comment.