[FLINK-1638] [streaming] Kafka low level API example, documentation and fixes
mbalassi authored and StephanEwen committed Mar 10, 2015
1 parent 5327d56 commit ed5ba95
Showing 9 changed files with 216 additions and 71 deletions.
KafkaConsumerExample.java
@@ -20,7 +20,11 @@
import org.apache.flink.runtime.state.OperatorState;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.api.KafkaSource;
import org.apache.flink.streaming.connectors.util.JavaDefaultStringSchema;

public class KafkaConsumerExample {
@@ -37,15 +41,10 @@ public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment().setDegreeOfParallelism(4);

@SuppressWarnings("unused")
DataStream<String> stream1 = env
.addSource(
// new KafkaSource<String>(host + ":" + port, topic, new JavaDefaultStringSchema()))
// new SimpleKafkaSource<String>(topic, host, port, new JavaDefaultStringSchema()))
new PersistentKafkaSource<String>(topic, host, port, 10L, new JavaDefaultStringSchema()))
.registerState("kafka", new OperatorState<Long>(null))
.setParallelism(3)
.print().setParallelism(3);
DataStream<String> kafkaStream = env
.addSource(new KafkaSource<String>(host + ":" + port, topic, new JavaDefaultStringSchema()));

kafkaStream.print();

env.execute();
}
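For reference, a minimal sketch of the resulting high level consumer pipeline, mirroring the example above; the class name and the host, port, and topic values are hypothetical placeholders:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.api.KafkaSource;
import org.apache.flink.streaming.connectors.util.JavaDefaultStringSchema;

public class KafkaHighLevelSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment().setDegreeOfParallelism(4);

        // KafkaSource takes a "host:port" connection string, the topic name,
        // and a schema turning the raw bytes into records ("localhost:9092"
        // and "myTopic" are placeholder values)
        DataStream<String> kafkaStream = env.addSource(
                new KafkaSource<String>("localhost:9092", "myTopic", new JavaDefaultStringSchema()));

        kafkaStream.print();

        env.execute();
    }
}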
KafkaSimpleConsumerExample.java
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.api.simple.PersistentKafkaSource;
import org.apache.flink.streaming.connectors.util.JavaDefaultStringSchema;

public class KafkaSimpleConsumerExample {

private static String host;
private static int port;
private static String topic;
private static int partition;
private static long offset;

public static void main(String[] args) throws Exception {

if (!parseParameters(args)) {
return;
}

StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment().setDegreeOfParallelism(4);

DataStream<String> kafkaStream = env
.addSource(new PersistentKafkaSource<String>(topic, host, port, partition, offset, new JavaDefaultStringSchema()));

kafkaStream.print();

env.execute();
}

private static boolean parseParameters(String[] args) {
if (args.length == 5) {
host = args[0];
port = Integer.parseInt(args[1]);
topic = args[2];
partition = Integer.parseInt(args[3]);
offset = Long.parseLong(args[4]);
return true;
} else {
System.err.println("Usage: KafkaConsumerExample <host> <port> <topic> <partition> <offset>");
return false;
}
}

}
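Assuming the five-argument usage above, a hypothetical invocation of the example (all values are placeholders) reads:

KafkaSimpleConsumerExample localhost 9092 myTopic 0 0

which connects to the broker at localhost:9092 and prints messages of topic myTopic from partition 0, starting at offset 0.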
KafkaSource.java
@@ -35,7 +35,7 @@
import org.apache.flink.util.Collector;

/**
* Source that listens to a Kafka topic.
* Source that listens to a Kafka topic using the high level Kafka API.
*
* @param <OUT>
* Type of the messages on the topic.
KafkaConsumerIterator.java
@@ -36,11 +36,18 @@
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Iterates the records received from a partition of a Kafka topic as byte arrays.
*/
public class KafkaConsumerIterator {

private static final long serialVersionUID = 1L;

private static final long DEFAULT_WAIT_ON_EMPTY_FETCH = 1000L;

private List<String> hosts;
private String topic;
private int port;
@@ -54,11 +61,21 @@ public class KafkaConsumerIterator {
private transient Iterator<MessageAndOffset> iter;
private transient FetchResponse fetchResponse;

public KafkaConsumerIterator(String host, int port, String topic, int partition,
/**
* Constructor with configurable wait time on empty fetch. To connect to the Kafka service
* we use the so-called simple, or low level, Kafka API, thus connecting directly to one of the brokers.
*
* @param hostName Hostname of a known Kafka broker
* @param port Port of the known Kafka broker
* @param topic Name of the topic to listen to
* @param partition Partition in the chosen topic
* @param waitOnEmptyFetch Wait time in milliseconds before retrying when a fetch returns no records
*/
public KafkaConsumerIterator(String hostName, int port, String topic, int partition,
long waitOnEmptyFetch) {

this.hosts = new ArrayList<String>();
hosts.add(host);
hosts.add(hostName);
this.port = port;

this.topic = topic;
@@ -68,14 +85,37 @@ public KafkaConsumerIterator(String host, int port, String topic, int partition,
replicaBrokers = new ArrayList<String>();
}

private void initialize() {
/**
* Constructor without configurable wait time on empty fetch, defaulting to
* 1000 milliseconds. To connect to the Kafka service we use the so-called
* simple, or low level, Kafka API, thus connecting directly to one of the brokers.
*
* @param hostName Hostname of a known Kafka broker
* @param port Port of the known Kafka broker
* @param topic Name of the topic to listen to
* @param partition Partition in the chosen topic
*/
public KafkaConsumerIterator(String hostName, int port, String topic, int partition) {
this(hostName, port, topic, partition, DEFAULT_WAIT_ON_EMPTY_FETCH);
}

// --------------------------------------------------------------------------------------------
// Initializing a connection
// --------------------------------------------------------------------------------------------

/**
* Initializes the connection by detecting the leading broker of
* the topic and establishing a connection to it.
*/
private void initialize() throws InterruptedException {
PartitionMetadata metadata;
do {
metadata = findLeader(hosts, port, topic, partition);
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
if (metadata == null) {
try {
Thread.sleep(waitOnEmptyFetch);
} catch (InterruptedException e) {
throw new InterruptedException("Establishing connection to Kafka failed");
}
}
} while (metadata == null);

@@ -90,52 +130,78 @@ private void initialize() {
consumer = new SimpleConsumer(leadBroker, port, 100000, 64 * 1024, clientName);
}

public void initializeFromBeginning() {
/**
* Initializes a connection from the earliest available offset.
*/
public void initializeFromBeginning() throws InterruptedException {
initialize();
readOffset = getLastOffset(consumer, topic, partition,
kafka.api.OffsetRequest.EarliestTime(), clientName);

resetFetchResponse(readOffset);
}

public void initializeFromCurrent() {
/**
* Initializes a connection from the latest available offset.
*/
public void initializeFromCurrent() throws InterruptedException {
initialize();
readOffset = getLastOffset(consumer, topic, partition,
kafka.api.OffsetRequest.LatestTime(), clientName);

resetFetchResponse(readOffset);
}

public void initializeFromOffset(long offset) {
/**
* Initializes a connection from the specified offset.
*
* @param offset Desired Kafka offset
*/
public void initializeFromOffset(long offset) throws InterruptedException {
initialize();
readOffset = offset;
resetFetchResponse(readOffset);
}


// --------------------------------------------------------------------------------------------
// Iterator methods
// --------------------------------------------------------------------------------------------

/**
* Convenience method to emulate iterator behaviour. Always returns true,
* since a Kafka topic is treated as an unbounded stream of records.
*
* @return whether the iterator has a next element (always true)
*/
public boolean hasNext() {
return true;
}

public byte[] next() {
/**
* Returns the next message received from Kafka as a
* byte array.
*
* @return next message as a byte array.
*/
public byte[] next() throws InterruptedException {
return nextWithOffset().getMessage();
}

private void resetFetchResponse(long offset) {
FetchRequest req = new FetchRequestBuilder().clientId(clientName)
.addFetch(topic, partition, offset, 100000).build();
fetchResponse = consumer.fetch(req);
iter = fetchResponse.messageSet(topic, partition).iterator();
}

public MessageWithOffset nextWithOffset() {
/**
* Returns the next message and its offset received from
* Kafka encapsulated in a POJO.
*
* @return next message and its offset.
*/
public MessageWithOffset nextWithOffset() throws InterruptedException {

synchronized (fetchResponse) {
while (!iter.hasNext()) {
resetFetchResponse(readOffset);
try {
Thread.sleep(waitOnEmptyFetch);
} catch (InterruptedException e) {
e.printStackTrace();
throw new InterruptedException("Fetching from Kafka was interrupted");
}
}

@@ -152,17 +218,37 @@ public MessageWithOffset nextWithOffset() {

byte[] bytes = new byte[payload.limit()];
payload.get(bytes);

return new MessageWithOffset(messageAndOffset.offset(), bytes);
}
}

/**
* Resets the iterator to a given offset.
*
* @param offset Desired Kafka offset.
*/
public void reset(long offset) {
synchronized (fetchResponse) {
readOffset = offset;
resetFetchResponse(offset);
}
}

// --------------------------------------------------------------------------------------------
// Internal utilities
// --------------------------------------------------------------------------------------------

private void resetFetchResponse(long offset) {
FetchRequest req = new FetchRequestBuilder().clientId(clientName)
.addFetch(topic, partition, offset, 100000).build();
fetchResponse = consumer.fetch(req);

//TODO deal with broker failures

iter = fetchResponse.messageSet(topic, partition).iterator();
}

private PartitionMetadata findLeader(List<String> a_hosts, int a_port, String a_topic,
int a_partition) {
PartitionMetadata returnMetaData = null;
@@ -212,7 +298,7 @@ private static long getLastOffset(SimpleConsumer consumer, String topic, int par
OffsetResponse response = consumer.getOffsetsBefore(request);

if (response.hasError()) {
throw new RuntimeException("Error fetching data Offset Data the Broker. Reason: "
throw new RuntimeException("Error fetching data from Kafka broker. Reason: "
+ response.errorCode(topic, partition));
}
long[] offsets = response.offsets(topic, partition);
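Taken together, the iterator methods above support a straightforward consumption loop. A minimal sketch, assuming a local broker, a surrounding method that declares throws InterruptedException, and getter methods on the MessageWithOffset POJO (all connection values are hypothetical):

KafkaConsumerIterator iterator = new KafkaConsumerIterator("localhost", 9092, "myTopic", 0);
iterator.initializeFromBeginning(); // start from the earliest available offset

while (iterator.hasNext()) { // always true: the topic is an unbounded stream
    MessageWithOffset msg = iterator.nextWithOffset();
    // getOffset() and getMessage() are assumed accessors on the POJO
    System.out.println(msg.getOffset() + ": " + new String(msg.getMessage()));
}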
KafkaDeserializingConsumerIterator.java
@@ -23,12 +23,13 @@ public class KafkaDeserializingConsumerIterator<IN> extends KafkaConsumerIterato

private DeserializationSchema<IN> deserializationSchema;

public KafkaDeserializingConsumerIterator(String host, int port, String topic, int partition, long waitOnEmptyFetch, DeserializationSchema<IN> deserializationSchema) {
public KafkaDeserializingConsumerIterator(String host, int port, String topic, int partition, long waitOnEmptyFetch,
DeserializationSchema<IN> deserializationSchema) {
super(host, port, topic, partition, waitOnEmptyFetch);
this.deserializationSchema = deserializationSchema;
}

public IN nextRecord() {
public IN nextRecord() throws InterruptedException {
return deserializationSchema.deserialize(next());
}

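A sketch of the deserializing variant, reusing the string schema from the examples above; the connection values are hypothetical, and we assume JavaDefaultStringSchema serves as a DeserializationSchema<String> here, as it does for the sources:

KafkaDeserializingConsumerIterator<String> iterator =
        new KafkaDeserializingConsumerIterator<String>(
                "localhost", 9092, "myTopic", 0, 1000L, new JavaDefaultStringSchema());
iterator.initializeFromCurrent(); // begin at the latest available offset
String record = iterator.nextRecord(); // the next message, run through the schema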
KafkaTopicFactory.java
@@ -26,6 +26,9 @@

import kafka.admin.AdminUtils;

/**
* Factory for creating custom Kafka topics.
*/
public class KafkaTopicFactory {

public static void createTopic(String zookeeperServer, String topicName, int numOfPartitions, int replicationFactor) {
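Given the signature above, a hypothetical invocation (the ZooKeeper connection string, partition count, and replication factor are placeholders):

KafkaTopicFactory.createTopic("localhost:2181", "myTopic", 2, 1);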
MessageWithOffset.java
@@ -17,6 +17,9 @@

package org.apache.flink.streaming.connectors.kafka.api.simple;

/**
* POJO encapsulating records received from Kafka with their offset.
*/
public class MessageWithOffset {
private long offset;
private byte[] message;
