Skip to content

Commit

Permalink
[streaming] Refactored db state connectors and added their license
Browse files Browse the repository at this point in the history
Conflicts:
	flink-addons/flink-streaming/flink-streaming-connectors/pom.xml
  • Loading branch information
mbalassi authored and StephanEwen committed Aug 29, 2014
1 parent d5f8a69 commit 4d407ca
Show file tree
Hide file tree
Showing 21 changed files with 220 additions and 571 deletions.
3 changes: 2 additions & 1 deletion DEPENDENCIES
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ under BSD-style licenses

(New BSD License)
- Kryo (https://github.com/EsotericSoftware/kryo) - Copyright (c) 2008, Nathan Sweet

- LevelDB JNI (https://github.com/fusesource/leveldbjni/) - Copyright (c) 2011, FuseSource Corp.

(BSD-like License)
- Scala Library (http:https://www.scala-lang.org/) - Copyright (c) 2002-2014 EPFL, Copyright (c) 2011-2014 Typesafe, Inc.
- Scala Compiler (BSD-like) - (http:https://www.scala-lang.org/) - Copyright (c) 2002-2014 EPFL, Copyright (c) 2011-2014 Typesafe, Inc.
Expand Down
18 changes: 18 additions & 0 deletions LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -251,8 +251,26 @@ THE SOFTWARE.
BSD-style Licenses
-----------------------------------------------------------------------

The Apache Flink project depends on and/or bundles the following components
under BSD-style licenses

(3-clause BSD license)
- D3 (http:https://d3js.org/) - Copyright (c) 2010-2014, Michael Bostock
- Kryo (https://github.com/EsotericSoftware/kryo) - Copyright (c) 2008, Nathan Sweet
- LevelDB (http:https://code.google.com/p/leveldb/) - Copyright (c) 2011, The LevelDB Authors
- Memcached (https://github.com/memcached/memcached) - Copyright (c) 2003, Danga Interactive, Inc.
- Redis (http:https://redis.io/) - Copyright (c) 2009, Salvatore Sanfilippo and Pieter Noordhuis

(BSD-style License)
- Hamcrest (https://code.google.com/p/hamcrest/) - Copyright (c) 2000-2006, www.hamcrest.org

(BSD-like License)
- Scala Library (http:https://www.scala-lang.org/) - Copyright (c) 2002-2014 EPFL, Copyright (c) 2011-2014 Typesafe, Inc.
- Scala Compiler (BSD-like) - (http:https://www.scala-lang.org/) - Copyright (c) 2002-2014 EPFL, Copyright (c) 2011-2014 Typesafe, Inc.
- Scala Compiler Reflect (BSD-like) - (http:https://www.scala-lang.org/) - Copyright (c) 2002-2014 EPFL, Copyright (c) 2011-2014 Typesafe, Inc.
- ASM (BSD-like) - (http:https://asm.ow2.org/) - Copyright (c) 2000-2011 INRIA, France Telecom

(Below is the 3-clause BSD license)

All rights reserved.

Expand Down
22 changes: 21 additions & 1 deletion flink-addons/flink-streaming/flink-streaming-connectors/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,27 @@ under the License.
<version>2.2.0</version>
</dependency>

</dependencies>
<dependency>
<groupId>org.fusesource.leveldbjni</groupId>
<artifactId>leveldbjni-all</artifactId>
<version>1.8</version>
</dependency>

<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.4.2</version>
<type>jar</type>
<scope>compile</scope>
</dependency>

<dependency>
<groupId>com.google.code.simple-spring-memcached</groupId>
<artifactId>spymemcached</artifactId>
<version>2.8.4</version>
</dependency>

</dependencies>

<build>
<plugins>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.flink.streaming.connectors.db;

public interface DBState {

//TODO: consider more general parameters
public void put(String key, String value);

public String get(String key);

public void remove(String key);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.flink.streaming.connectors.db;

public interface DBStateIterator {

public boolean hasNext();

public String getNextKey();

public String getNextValue();

public void next();

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*
*/

package org.apache.flink.streaming.state.database;
package org.apache.flink.streaming.connectors.db;

import static org.fusesource.leveldbjni.JniDBFactory.asString;
import static org.fusesource.leveldbjni.JniDBFactory.bytes;
Expand All @@ -29,40 +29,41 @@
import org.iq80.leveldb.DB;
import org.iq80.leveldb.Options;

public class LeveldbState {
public class LeveldbState implements DBState{

private DB database;
public LeveldbState(String dbName){

public LeveldbState(String dbName) {
Options options = new Options();
options.createIfMissing(true);
try {
database = factory.open(new File(dbName), options);
} catch (IOException e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}
public void close(){

public void close() {
try {
database.close();
} catch (IOException e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}
public void setTuple(String key, String value){

public void put(String key, String value) {
database.put(bytes(key), bytes(value));
}
public String getTuple(String key){

public String get(String key) {
return asString(database.get(bytes(key)));
}
public void deleteTuple(String key){

public void remove(String key) {
database.delete(bytes(key));
}
public LeveldbStateIterator getIterator(){

public LeveldbStateIterator getIterator() {
return new LeveldbStateIterator(database.iterator());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,32 +17,33 @@
*
*/

package org.apache.flink.streaming.state.database;
package org.apache.flink.streaming.connectors.db;

import static org.fusesource.leveldbjni.JniDBFactory.asString;

import org.iq80.leveldb.DBIterator;

public class LeveldbStateIterator {
public class LeveldbStateIterator implements DBStateIterator {
private DBIterator iterator;
public LeveldbStateIterator(DBIterator iter){
this.iterator=iter;

public LeveldbStateIterator(DBIterator iter) {
this.iterator = iter;
this.iterator.seekToFirst();
}
public boolean hasNext(){

public boolean hasNext() {
return iterator.hasNext();
}
public String getNextKey(){

public String getNextKey() {
return asString(iterator.peekNext().getKey());
}
public String getNextValue(){

public String getNextValue() {
return asString(iterator.peekNext().getValue());
}
public void next(){

public void next() {
iterator.next();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,46 +17,49 @@
*
*/

package org.apache.flink.streaming.state.database;
package org.apache.flink.streaming.connectors.db;

import java.io.IOException;
import java.net.InetSocketAddress;

import net.spy.memcached.MemcachedClient;

public class MemcachedState {

//Needs running Memcached service
public class MemcachedState implements DBState {

private MemcachedClient memcached;
public MemcachedState(){

public MemcachedState() {
try {
memcached=new MemcachedClient(new InetSocketAddress("localhost", 11211));
memcached = new MemcachedClient(new InetSocketAddress("localhost",
11211));
} catch (IOException e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}
public MemcachedState(String hostname, int portNum){

public MemcachedState(String hostname, int portNum) {
try {
memcached=new MemcachedClient(new InetSocketAddress(hostname, portNum));
memcached = new MemcachedClient(new InetSocketAddress(hostname,
portNum));
} catch (IOException e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}
public void close(){

public void close() {
memcached.shutdown();
}
public void setTuple(String key, Object value){

public void put(String key, String value) {
memcached.set(key, 0, value);
}
public Object getTuple(String key){
return memcached.get(key);

public String get(String key) {
return (String) memcached.get(key);
}
public void deleteTuple(String key){

public void remove(String key) {
memcached.delete(key);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@
*
*/

package org.apache.flink.streaming.state.database;
package org.apache.flink.streaming.connectors.db;

import redis.clients.jedis.Jedis;

//this is the redis-supported state. To use this state, the users are required to boot their redis server first.
public class RedisState {
//Needs running Redis service
public class RedisState implements DBState {

private Jedis jedis;

Expand All @@ -34,15 +34,15 @@ public void close(){
jedis.close();
}

public void setTuple(String key, String value){
public void put(String key, String value){
jedis.set(key, value);
}

public String getTuple(String key){
public String get(String key){
return jedis.get(key);
}

public void deleteTuple(String key){
public void remove(String key){
jedis.del(key);
}

Expand Down
Loading

0 comments on commit 4d407ca

Please sign in to comment.