append to apache#2135, add
1) fix handling of the NO_TIMESTAMP type in Kafka 0.10;
2) rename the field to 'timestamp'.
mingmxu authored and davorbonaci committed Mar 24, 2017
1 parent 7412427 commit f10509e
Showing 4 changed files with 73 additions and 11 deletions.
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java
@@ -17,10 +17,14 @@
*/
package org.apache.beam.sdk.io.kafka;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Collection;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.expression.Expression;
import org.springframework.expression.ExpressionParser;
import org.springframework.expression.spel.SpelParserConfiguration;
@@ -33,16 +37,28 @@
* to eliminate the method definition differences.
*/
class ConsumerSpEL {
SpelParserConfiguration config = new SpelParserConfiguration(true, true);
ExpressionParser parser = new SpelExpressionParser(config);
private static final Logger LOG = LoggerFactory.getLogger(ConsumerSpEL.class);

private SpelParserConfiguration config = new SpelParserConfiguration(true, true);
private ExpressionParser parser = new SpelExpressionParser(config);

Expression seek2endExpression =
private Expression seek2endExpression =
parser.parseExpression("#consumer.seekToEnd(#tp)");

Expression assignExpression =
private Expression assignExpression =
parser.parseExpression("#consumer.assign(#tp)");

public ConsumerSpEL() {}
private Method timestampMethod;
private boolean hasRecordTimestamp = false;

public ConsumerSpEL() {
try {
timestampMethod = ConsumerRecord.class.getMethod("timestamp", (Class<?>[]) null);
hasRecordTimestamp = timestampMethod.getReturnType().equals(Long.TYPE);
} catch (NoSuchMethodException | SecurityException e) {
LOG.debug("Timestamp for Kafka message is not available.");
}
}

public void evaluateSeek2End(Consumer consumer, TopicPartition topicPartitions) {
StandardEvaluationContext mapContext = new StandardEvaluationContext();
Expand All @@ -57,4 +73,19 @@ public void evaluateAssign(Consumer consumer, Collection<TopicPartition> topicPa
mapContext.setVariable("tp", topicPartitions);
assignExpression.getValue(mapContext);
}

public long getRecordTimestamp(ConsumerRecord<byte[], byte[]> rawRecord) {
long timestamp;
try {
// For Kafka 0.9, there is no record timestamp; use System.currentTimeMillis().
// For Kafka 0.10, also fall back to System.currentTimeMillis() when the record
// carries NO_TIMESTAMP (a non-positive value).
if (!hasRecordTimestamp || (timestamp = (long) timestampMethod.invoke(rawRecord)) <= 0L) {
timestamp = System.currentTimeMillis();
}
} catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
// Not expected. Method timestamp() is already checked.
throw new RuntimeException(e);
}
return timestamp;
}
}
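
The new getRecordTimestamp() is the heart of the change: with a Kafka 0.9 client, ConsumerRecord has no timestamp() method, and with 0.10 a record can still carry NO_TIMESTAMP (a non-positive value); both cases fall back to processing time. A minimal usage sketch, assuming it sits in org.apache.beam.sdk.io.kafka (ConsumerSpEL is package-private) and compiles against a 0.10 client; the topic and payload are illustrative:

package org.apache.beam.sdk.io.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.joda.time.Instant;

public class TimestampSketch {
  public static void main(String[] args) {
    ConsumerSpEL consumerSpEL = new ConsumerSpEL();

    // This constructor leaves the record timestamp unset: NO_TIMESTAMP on a
    // 0.10 client, and no timestamp() method at all on a 0.9 client ...
    ConsumerRecord<byte[], byte[]> rawRecord =
        new ConsumerRecord<>("events", 0, 42L, new byte[0], new byte[0]);

    // ... so getRecordTimestamp() falls back to System.currentTimeMillis().
    Instant eventTime = new Instant(consumerSpEL.getRecordTimestamp(rawRecord));
    System.out.println(eventTime);
  }
}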
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -203,6 +203,14 @@
* {@link ProducerConfig} for sink. E.g. if you would like to enable offset
* <em>auto commit</em> (for external monitoring or other purposes), you can set
* <tt>"group.id"</tt>, <tt>"enable.auto.commit"</tt>, etc.
*
* <h3>Event Timestamp and Watermark</h3>
* By default, the record timestamp and watermark are based on processing time in the KafkaIO
* reader. This can be overridden by providing a {@code WatermarkFn} with
* {@link Read#withWatermarkFn(SerializableFunction)} and a {@code TimestampFn} with
* {@link Read#withTimestampFn(SerializableFunction)}.<br>
* Note that {@link KafkaRecord#getTimestamp()} reflects the timestamp provided by Kafka if
* any; otherwise it is set to processing time.
*/
public class KafkaIO {
/**
@@ -428,6 +436,7 @@ public void validate(PBegin input) {
checkNotNull(getValueCoder(), "Value coder must be set");
}

@Override
public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
// Handles unbounded source to bounded conversion if maxNumRecords or maxReadTime is set.
Unbounded<KafkaRecord<K, V>> unbounded =
@@ -458,6 +467,7 @@ UnboundedSource<KafkaRecord<K, V>, KafkaCheckpointMark> makeSource() {
private static <KeyT, ValueT, OutT> SerializableFunction<KafkaRecord<KeyT, ValueT>, OutT>
unwrapKafkaAndThen(final SerializableFunction<KV<KeyT, ValueT>, OutT> fn) {
return new SerializableFunction<KafkaRecord<KeyT, ValueT>, OutT>() {
@Override
public OutT apply(KafkaRecord<KeyT, ValueT> record) {
return fn.apply(record.getKV());
}
@@ -499,6 +509,7 @@ public OutT apply(KafkaRecord<KeyT, ValueT> record) {
private static final SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>
KAFKA_CONSUMER_FACTORY_FN =
new SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>() {
@Override
public Consumer<byte[], byte[]> apply(Map<String, Object> config) {
return new KafkaConsumer<>(config);
}
@@ -627,6 +638,7 @@ public List<UnboundedKafkaSource<K, V>> generateInitialSplits(
}

Collections.sort(partitions, new Comparator<TopicPartition>() {
@Override
public int compare(TopicPartition tp1, TopicPartition tp2) {
return ComparisonChain
.start()
@@ -750,6 +762,7 @@ private static class UnboundedKafkaReader<K, V> extends UnboundedReader<KafkaRec
/** watermark before any records have been read. */
private static Instant initialWatermark = new Instant(Long.MIN_VALUE);

@Override
public String toString() {
return name;
}
@@ -800,13 +813,14 @@ synchronized long approxBacklogInBytes() {
public UnboundedKafkaReader(
UnboundedKafkaSource<K, V> source,
@Nullable KafkaCheckpointMark checkpointMark) {

this.consumerSpEL = new ConsumerSpEL();
this.source = source;
this.name = "Reader-" + source.id;

List<TopicPartition> partitions = source.spec.getTopicPartitions();
partitionStates = ImmutableList.copyOf(Lists.transform(partitions,
new Function<TopicPartition, PartitionState>() {
@Override
public PartitionState apply(TopicPartition tp) {
return new PartitionState(tp, UNINITIALIZED_OFFSET);
}
@@ -886,7 +900,6 @@ private void nextBatch() {

@Override
public boolean start() throws IOException {
this.consumerSpEL = new ConsumerSpEL();
Read<K, V> spec = source.spec;
consumer = spec.getConsumerFactoryFn().apply(spec.getConsumerConfig());
consumerSpEL.evaluateAssign(consumer, spec.getTopicPartitions());
@@ -909,6 +922,7 @@ public boolean start() throws IOException {
// Note that consumer is not thread safe, should not be accessed out side consumerPollLoop().
consumerPollThread.submit(
new Runnable() {
@Override
public void run() {
consumerPollLoop();
}
@@ -929,6 +943,7 @@ public void run() {

offsetFetcherThread.scheduleAtFixedRate(
new Runnable() {
@Override
public void run() {
updateLatestOffsets();
}
@@ -986,6 +1001,7 @@ public boolean advance() throws IOException {
rawRecord.topic(),
rawRecord.partition(),
rawRecord.offset(),
consumerSpEL.getRecordTimestamp(rawRecord),
decode(rawRecord.key(), source.spec.getKeyCoder()),
decode(rawRecord.value(), source.spec.getValueCoder()));

@@ -1059,6 +1075,7 @@ public CheckpointMark getCheckpointMark() {
return new KafkaCheckpointMark(ImmutableList.copyOf(// avoid lazy (consumedOffset can change)
Lists.transform(partitionStates,
new Function<PartitionState, PartitionMark>() {
@Override
public PartitionMark apply(PartitionState p) {
return new PartitionMark(p.topicPartition.topic(),
p.topicPartition.partition(),
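
The new "Event Timestamp and Watermark" Javadoc above is the user-facing side of this commit: event time defaults to processing time and can be overridden per record. A sketch of that API, assuming the era's Read<K, V> builder shape (withTimestampFn and the coder setters are taken from the docs and validate() code above; the generic read(), broker address, and topic are assumptions):

import com.google.common.collect.ImmutableList;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Instant;

public class KafkaIOTimestampSketch {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    PCollection<KafkaRecord<byte[], byte[]>> records = pipeline.apply(
        KafkaIO.<byte[], byte[]>read()
            .withBootstrapServers("broker-1:9092")   // placeholder
            .withTopics(ImmutableList.of("events"))  // placeholder
            .withKeyCoder(ByteArrayCoder.of())
            .withValueCoder(ByteArrayCoder.of())
            // Override the default processing-time event timestamps:
            .withTimestampFn(new SerializableFunction<KV<byte[], byte[]>, Instant>() {
              @Override
              public Instant apply(KV<byte[], byte[]> kv) {
                // In practice, derive event time from the payload.
                return Instant.now();
              }
            }));

    pipeline.run();
  }
}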
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecord.java
@@ -19,6 +19,7 @@

import java.io.Serializable;
import java.util.Arrays;

import org.apache.beam.sdk.values.KV;

/**
@@ -31,25 +32,28 @@ public class KafkaRecord<K, V> implements Serializable {
private final int partition;
private final long offset;
private final KV<K, V> kv;
private final long timestamp;

public KafkaRecord(
String topic,
int partition,
long offset,
long timestamp,
K key,
V value) {
this(topic, partition, offset, KV.of(key, value));
this(topic, partition, offset, timestamp, KV.of(key, value));
}

public KafkaRecord(
String topic,
int partition,
long offset,
long timestamp,
KV<K, V> kv) {

this.topic = topic;
this.partition = partition;
this.offset = offset;
this.timestamp = timestamp;
this.kv = kv;
}

@@ -69,9 +73,13 @@ public KV<K, V> getKV() {
return kv;
}

public long getTimestamp() {
return timestamp;
}

@Override
public int hashCode() {
return Arrays.deepHashCode(new Object[]{topic, partition, offset, kv});
return Arrays.deepHashCode(new Object[]{topic, partition, offset, timestamp, kv});
}

@Override
@@ -82,6 +90,7 @@ public boolean equals(Object obj) {
return topic.equals(other.topic)
&& partition == other.partition
&& offset == other.offset
&& timestamp == other.timestamp
&& kv.equals(other.kv);
} else {
return false;
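
KafkaRecord now carries the timestamp as a first-class field, and it participates in hashCode() and equals(). A small sketch of the new constructor shape (topic, partition, offset, and timestamp values are illustrative):

import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.values.KV;

public class KafkaRecordSketch {
  public static void main(String[] args) {
    long timestampMillis = 1490000000000L; // e.g. a broker/producer timestamp
    KafkaRecord<String, Long> record =
        new KafkaRecord<>("events", 0, 42L, timestampMillis, KV.of("key", 7L));

    System.out.println(record.getTimestamp()); // 1490000000000

    // The timestamp participates in equality, so two otherwise identical
    // records with different timestamps are no longer equal:
    KafkaRecord<String, Long> later =
        new KafkaRecord<>("events", 0, 42L, timestampMillis + 1, KV.of("key", 7L));
    System.out.println(record.equals(later)); // false
  }
}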
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
@@ -19,10 +19,12 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;

import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.KvCoder;
@@ -66,6 +68,7 @@ public void encode(KafkaRecord<K, V> value, OutputStream outStream, Context cont
stringCoder.encode(value.getTopic(), outStream, nested);
intCoder.encode(value.getPartition(), outStream, nested);
longCoder.encode(value.getOffset(), outStream, nested);
longCoder.encode(value.getTimestamp(), outStream, nested);
kvCoder.encode(value.getKV(), outStream, context);
}

@@ -77,6 +80,7 @@ public KafkaRecord<K, V> decode(InputStream inStream, Context context)
stringCoder.decode(inStream, nested),
intCoder.decode(inStream, nested),
longCoder.decode(inStream, nested),
longCoder.decode(inStream, nested),
kvCoder.decode(inStream, context));
}

@@ -106,6 +110,7 @@ public Object structuralValue(KafkaRecord<K, V> value) throws Exception {
value.getTopic(),
value.getPartition(),
value.getOffset(),
value.getTimestamp(),
(KV<Object, Object>) kvCoder.structuralValue(value.getKV()));
}
}
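
The coder now writes the timestamp as a fifth field between the offset and the KV, so records encoded before this commit cannot be decoded after it (a wire-format change). A round-trip sketch, assuming KafkaRecordCoder's public (keyCoder, valueCoder) constructor and Beam's CoderUtils helpers:

import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.io.kafka.KafkaRecordCoder;
import org.apache.beam.sdk.util.CoderUtils;

public class KafkaRecordCoderSketch {
  public static void main(String[] args) throws Exception {
    KafkaRecordCoder<byte[], byte[]> coder =
        new KafkaRecordCoder<>(ByteArrayCoder.of(), ByteArrayCoder.of());

    KafkaRecord<byte[], byte[]> record = new KafkaRecord<>(
        "events", 0, 42L, 1490000000000L, new byte[] {1}, new byte[] {2});

    // Encode and decode; the timestamp survives the round trip, so equals() holds.
    byte[] bytes = CoderUtils.encodeToByteArray(coder, record);
    KafkaRecord<byte[], byte[]> decoded = CoderUtils.decodeFromByteArray(coder, bytes);
    System.out.println(record.equals(decoded)); // true
  }
}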
