Skip to content

Commit

Permalink
[FLINK-3034] Redis Sink Connector
Browse files Browse the repository at this point in the history
This closes apache#1813
  • Loading branch information
subhankar authored and rmetzger committed Jul 7, 2016
1 parent cb78245 commit 3ab9e36
Show file tree
Hide file tree
Showing 28 changed files with 2,945 additions and 0 deletions.
1 change: 1 addition & 0 deletions docs/apis/streaming/connectors/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ Currently these systems are supported:
* [Twitter Streaming API](https://dev.twitter.com/docs/streaming-apis) (source)
* [Apache NiFi](https://nifi.apache.org) (sink/source)
* [Apache Cassandra](https://cassandra.apache.org/) (sink)
* [Redis](https://redis.io/) (sink)

To run an application using one of these connectors, additional third party
components are usually required to be installed and launched, e.g. the servers
Expand Down
177 changes: 177 additions & 0 deletions docs/apis/streaming/connectors/redis.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
---
title: "Redis Connector"

# Sub-level navigation
sub-nav-group: streaming
sub-nav-parent: connectors
sub-nav-pos: 6
sub-nav-title: Redis
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

This connector provides a Sink that can write to
[Redis](https://redis.io/) and also can publish data to [Redis PubSub](https://redis.io/topics/pubsub). To use this connector, add the
following dependency to your project:
{% highlight xml %}
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-redis{{ site.scala_version_suffix }}</artifactId>
<version>{{site.version }}</version>
</dependency>
{% endhighlight %}
Version Compatibility: This module is compatible with Redis 2.8.5.

Note that the streaming connectors are currently not part of the binary distribution. You need to link them for cluster execution [explicitly]({{site.baseurl}}/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution).

#### Installing Redis
Follow the instructions from the [Redis download page](https://redis.io/download).

#### Redis Sink
A class providing an interface for sending data to Redis.
The sink can use three different methods for communicating with different type of Redis environments:
1. Single Redis Server
2. Redis Cluster
3. Redis Sentinel

This code shows how to create a sink that communicate to a single redis server:

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
public static class RedisExampleMapper implements RedisMapper<Tuple2<String, String>>{

@Override
public RedisCommandDescription getCommandDescription() {
return new RedisCommandDescription(RedisCommand.HSET, "HASH_NAME");
}

@Override
public String getKeyFromData(Tuple2<String, String> data) {
return data.f0;
}

@Override
public String getValueFromData(Tuple2<String, String> data) {
return data.f1;
}
}
FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").build();

DataStream<String> stream = ...;
stream.addSink(new RedisSink<Tuple2<String, String>>(conf, new RedisExampleMapper());
{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
{% highlight scala %}
class RedisExampleMapper extends RedisMapper[(String, String)]{
override def getCommandDescription: RedisCommandDescription = {
new RedisCommandDescription(RedisCommand.HSET, "HASH_NAME")
}

override def getKeyFromData(data: (String, String)): String = data._1

override def getValueFromData(data: (String, String)): String = data._2
}
val conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").build()
stream.addSink(new RedisSink[(String, String)](conf, new RedisExampleMapper))
{% endhighlight %}
</div>
</div>

This example code does the same, but for Redis Cluster:

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}

FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder()
.setNodes(new HashSet<InetSocketAddress>(Arrays.asList(new InetSocketAddress(5601)))).build();

DataStream<String> stream = ...;
stream.addSink(new RedisSink<Tuple2<String, String>>(conf, new RedisExampleMapper());
{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
{% highlight scala %}
val conf = new FlinkJedisPoolConfig.Builder().setNodes(...).build()
stream.addSink(new RedisSink[(String, String)](conf, new RedisExampleMapper))
{% endhighlight %}
</div>
</div>

This example shows when the Redis environment is with Sentinels:

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}

FlinkJedisSentinelConfig conf = new FlinkJedisSentinelConfig.Builder()
.setMasterName("master").setSentinels(...).build();

DataStream<String> stream = ...;
stream.addSink(new RedisSink<Tuple2<String, String>>(conf, new RedisExampleMapper());
{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
{% highlight scala %}
val conf = new FlinkJedisSentinelConfig.Builder().setMasterName("master").setSentinels(...).build()
stream.addSink(new RedisSink[(String, String)](conf, new RedisExampleMapper))
{% endhighlight %}
</div>
</div>

This section gives a description of all the available data types and what Redis command used for that.

<table class="table table-bordered" style="width: 75%">
<thead>
<tr>
<th class="text-center" style="width: 20%">Data Type</th>
<th class="text-center" style="width: 25%">Redis Command [Sink]</th>
<th class="text-center" style="width: 25%">Redis Command [Source]</th>
</tr>
</thead>
<tbody>
<tr>
<td>HASH</td><td><a href="https://redis.io/commands/hset">HSET</a></td><td>--NA--</td>
</tr>
<tr>
<td>LIST</td><td>
<a href="https://redis.io/commands/rpush">RPUSH</a>,
<a href="https://redis.io/commands/lpush">LPUSH</a>
</td><td>--NA--</td>
</tr>
<tr>
<td>SET</td><td><a href="https://redis.io/commands/rpush">SADD</a></td><td>--NA--</td>
</tr>
<tr>
<td>PUBSUB</td><td><a href="https://redis.io/commands/publish">PUBLISH</a></td><td>--NA--</td>
</tr>
<tr>
<td>STRING</td><td><a href="https://redis.io/commands/set">SET</a></td><td>--NA--</td>
</tr>
<tr>
<td>HYPER_LOG_LOG</td><td><a href="https://redis.io/commands/pfadd">PFADD</a></td><td>--NA--</td>
</tr>
<tr>
<td>SORTED_SET</td><td><a href="https://redis.io/commands/zadd">ZADD</a></td><td>--NA--</td>
</tr>
</tbody>
</table>
More about Redis can be found [here](https://redis.io/).
5 changes: 5 additions & 0 deletions docs/apis/streaming/fault_tolerance.md
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,11 @@ state updates) of Flink coupled with bundled sinks:
<td>at least once</td>
<td></td>
</tr>
<tr>
<td>Redis sink</td>
<td>at least once</td>
<td></td>
</tr>
</tbody>
</table>

Expand Down
79 changes: 79 additions & 0 deletions flink-streaming-connectors/flink-connector-redis/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="https://maven.apache.org/POM/4.0.0"
xmlns:xsi="https://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="https://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">

<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-connectors</artifactId>
<version>1.1-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>

<artifactId>flink-connector-redis_2.10</artifactId>
<name>flink-connector-redis</name>

<packaging>jar</packaging>

<properties>
<jedis.version>2.8.0</jedis.version>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.10</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>${jedis.version}</version>
</dependency>

<dependency>
<groupId>com.github.kstyrc</groupId>
<artifactId>embedded-redis</artifactId>
<version>0.6</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.10</artifactId>
<version>${project.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_2.10</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

</project>
Loading

0 comments on commit 3ab9e36

Please sign in to comment.