[FLINK-4019][kinesis-connector] Use Kinesis records' approximateArrivalTimestamp

Used in the following:
1) Exposed through the KinesisDeserializationSchema for users to obtain
2) Attached to records as the default event time

This closes apache#2214
tzulitai authored and rmetzger committed Jul 9, 2016
1 parent 6c5eebb commit 68277a4
Showing 7 changed files with 71 additions and 9 deletions.
46 changes: 45 additions & 1 deletion docs/apis/streaming/connectors/kinesis.md
@@ -83,7 +83,7 @@ kinesisConsumerConfig.put(KinesisConfigConstants.CONFIG_STREAM_INIT_POSITION_TYP
StreamExecutionEnvironment env = StreamExecutionEnvironment.getEnvironment();

DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
"kinesis_stream_name", new SimpleStringSchema(), kinesisConsumerConfig))
"kinesis_stream_name", new SimpleStringSchema(), kinesisConsumerConfig));
{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
@@ -146,6 +146,50 @@ Also note that Flink can only restart the topology if enough processing slots ar
Therefore, if the topology fails due to loss of a TaskManager, there must still be enough slots available afterwards.
Flink on YARN supports automatic restart of lost YARN containers.

#### Event Time for Consumed Records

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
{% highlight scala %}
val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
{% endhighlight %}
</div>
</div>

If a streaming topology uses the [event time notion]({{site.baseurl}}/apis/streaming/event_time.html) for record
timestamps, an *approximate arrival timestamp* is used by default. Kinesis attaches this timestamp to each record once the
stream has successfully received and stored it. Note that this timestamp is typically referred to as a Kinesis server-side
timestamp, and there are no guarantees about its accuracy or ordering (i.e., the timestamps may not always be
ascending).

Users can override this default with a custom timestamp assigner, as described [here]({{ site.baseurl }}/apis/streaming/event_timestamps_watermarks.html),
or use one of the [predefined ones]({{ site.baseurl }}/apis/streaming/event_timestamp_extractors.html). The assigner
is then applied to the stream produced by the consumer in the following way:

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
"kinesis_stream_name", new SimpleStringSchema(), kinesisConsumerConfig));
kinesis = kinesis.assignTimestampsAndWatermarks(new CustomTimestampAssigner());
{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
{% highlight scala %}
val kinesis = env.addSource(new FlinkKinesisConsumer[String](
"kinesis_stream_name", new SimpleStringSchema, kinesisConsumerConfig))
.assignTimestampsAndWatermarks(new CustomTimestampAssigner)
{% endhighlight %}
</div>
</div>
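
Because the approximate arrival timestamps may not be ascending, a custom assigner usually has to tolerate some out-of-orderness when it derives watermarks. The following is a minimal plain-Java sketch of that idea only. The class name `BoundedLatenessTracker` and the tolerance parameter are hypothetical and not part of this commit; a real `CustomTimestampAssigner` would implement one of Flink's timestamp assigner interfaces, which is omitted here so the sketch stays self-contained.

```java
// Plain-Java sketch with no Flink dependency. All names are hypothetical.
final class BoundedLatenessTracker {
    // Assumed tolerance for late records, chosen by the user.
    private final long maxOutOfOrdernessMillis;
    // Highest timestamp observed so far; MIN_VALUE means "nothing seen yet".
    private long maxSeenTimestamp = Long.MIN_VALUE;

    BoundedLatenessTracker(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
    }

    /** Records an element's timestamp and returns it unchanged. */
    long observe(long timestampMillis) {
        maxSeenTimestamp = Math.max(maxSeenTimestamp, timestampMillis);
        return timestampMillis;
    }

    /** Highest timestamp seen minus the tolerance; never decreases between calls. */
    long currentWatermark() {
        if (maxSeenTimestamp == Long.MIN_VALUE) {
            return Long.MIN_VALUE;
        }
        return maxSeenTimestamp - maxOutOfOrdernessMillis;
    }
}
```

Tracking the maximum rather than the latest timestamp keeps the watermark from moving backwards when a record with an older arrival timestamp shows up.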

#### Threading Model

The Flink Kinesis Consumer uses multiple threads for shard discovery and data consumption.
@@ -131,7 +131,7 @@ public class KinesisDataFetcher<T> {
* <ul>
* <li>{@link KinesisDataFetcher#registerNewSubscribedShardState(KinesisStreamShardState)}</li>
* <li>{@link KinesisDataFetcher#updateState(int, SequenceNumber)}</li>
* <li>{@link KinesisDataFetcher#emitRecordAndUpdateState(T, int, SequenceNumber)}</li>
* <li>{@link KinesisDataFetcher#emitRecordAndUpdateState(T, long, int, SequenceNumber)}</li>
* </ul>
*/
private final List<KinesisStreamShardState> subscribedShardsState;
@@ -491,15 +491,16 @@ protected KinesisDeserializationSchema<T> getClonedDeserializationSchema() {
* This method is called by {@link ShardConsumer}s.
*
* @param record the record to collect
* @param recordTimestamp timestamp to attach to the collected record
* @param shardStateIndex index of the shard to update in subscribedShardsState;
* this index should be the returned value from
* {@link KinesisDataFetcher#registerNewSubscribedShardState(KinesisStreamShardState)}, called
* when the shard state was registered.
* @param lastSequenceNumber the last sequence number value to update
*/
protected void emitRecordAndUpdateState(T record, int shardStateIndex, SequenceNumber lastSequenceNumber) {
protected void emitRecordAndUpdateState(T record, long recordTimestamp, int shardStateIndex, SequenceNumber lastSequenceNumber) {
synchronized (checkpointLock) {
sourceContext.collect(record);
sourceContext.collectWithTimestamp(record, recordTimestamp);
updateState(shardStateIndex, lastSequenceNumber);
}
}
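
The hunk above emits the record together with its timestamp and updates the shard state inside one `synchronized (checkpointLock)` block. A rough sketch of why that pairing matters follows, under the assumption that a snapshot taken under the same lock should never see an emitted record without its matching sequence number. `LockedEmitter` and its fields are hypothetical stand-ins, not the connector's classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the fetcher; plain Java, no Flink dependency.
final class LockedEmitter {
    private final Object checkpointLock = new Object();
    private final List<String> emitted = new ArrayList<>();
    private final List<Long> timestamps = new ArrayList<>();
    private long lastSequenceNumber = -1L;

    /** Emits the record with its timestamp and advances the state in one critical section. */
    void emitRecordAndUpdateState(String record, long recordTimestamp, long sequenceNumber) {
        synchronized (checkpointLock) {
            emitted.add(record);
            timestamps.add(recordTimestamp);
            lastSequenceNumber = sequenceNumber;
        }
    }

    /** Returns {number of emitted records, last sequence number} as a consistent pair. */
    long[] snapshot() {
        synchronized (checkpointLock) {
            return new long[] {emitted.size(), lastSequenceNumber};
        }
    }
}
```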
@@ -21,6 +21,7 @@
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.model.ShardIteratorType;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
@@ -185,6 +186,14 @@ private boolean isRunning() {
return !Thread.interrupted();
}

/**
* Deserializes a record for collection, and accordingly updates the shard state in the fetcher.
* Note that the server-side Kinesis timestamp is attached to the record when collected. When the
* user program uses {@link TimeCharacteristic#EventTime}, this timestamp will be used by default.
*
* @param record
* @throws IOException
*/
private void deserializeRecordForCollectionAndUpdateState(UserRecord record)
throws IOException {
ByteBuffer recordData = record.getData();
@@ -194,17 +203,21 @@ private void deserializeRecordForCollectionAndUpdateState(UserRecord record)

byte[] keyBytes = record.getPartitionKey().getBytes();

final T value = deserializer.deserialize(keyBytes, dataBytes, subscribedShard.getStreamName(),
record.getSequenceNumber());
final long approxArrivalTimestamp = record.getApproximateArrivalTimestamp().getTime();

final T value = deserializer.deserialize(
keyBytes, dataBytes, subscribedShard.getStreamName(), record.getSequenceNumber(), approxArrivalTimestamp);

if (record.isAggregated()) {
fetcherRef.emitRecordAndUpdateState(
value,
approxArrivalTimestamp,
subscribedShardStateIndex,
new SequenceNumber(record.getSequenceNumber(), record.getSubSequenceNumber()));
} else {
fetcherRef.emitRecordAndUpdateState(
value,
approxArrivalTimestamp,
subscribedShardStateIndex,
new SequenceNumber(record.getSequenceNumber()));
}
@@ -39,10 +39,11 @@ public interface KinesisDeserializationSchema<T> extends Serializable, ResultTyp
* @param recordValue the record's value as a byte array
* @param stream the name of the Kinesis stream that this record was sent to
* @param seqNum the sequence number of this record in the Kinesis shard
* @param approxArrivalTimestamp the server-side timestamp of when Kinesis received and stored the record
* @return the deserialized message as a Java object
* @throws IOException
*/
T deserialize(byte[] recordKey, byte[] recordValue, String stream, String seqNum) throws IOException;
T deserialize(byte[] recordKey, byte[] recordValue, String stream, String seqNum, long approxArrivalTimestamp) throws IOException;

/**
* Method to decide whether the element signals the end of the stream. If
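
With the extra `approxArrivalTimestamp` argument, a user-defined schema can carry the server-side timestamp into the deserialized value. The sketch below illustrates this with a local stand-in interface that only mirrors the five-argument `deserialize` signature from the hunk above. `ArrivalAwareSchema`, `TimestampedString`, and `TimestampedStringSchema` are hypothetical names, and the real `KinesisDeserializationSchema` has further members that are left out here.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in mirroring only the new deserialize(...) signature.
interface ArrivalAwareSchema<T> {
    T deserialize(byte[] recordKey, byte[] recordValue, String stream, String seqNum,
                  long approxArrivalTimestamp) throws IOException;
}

// Simple value type pairing the payload with its approximate arrival time.
final class TimestampedString {
    final String value;
    final long arrivalMillis;

    TimestampedString(String value, long arrivalMillis) {
        this.value = value;
        this.arrivalMillis = arrivalMillis;
    }
}

// Decodes the payload as UTF-8 and keeps the server-side timestamp alongside it.
final class TimestampedStringSchema implements ArrivalAwareSchema<TimestampedString> {
    @Override
    public TimestampedString deserialize(byte[] recordKey, byte[] recordValue, String stream,
                                         String seqNum, long approxArrivalTimestamp) {
        return new TimestampedString(
            new String(recordValue, StandardCharsets.UTF_8), approxArrivalTimestamp);
    }
}
```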
@@ -37,7 +37,8 @@ public KinesisDeserializationSchemaWrapper(DeserializationSchema<T> deserializat
}

@Override
public T deserialize(byte[] recordKey, byte[] recordValue, String stream, String seqNu) throws IOException {
public T deserialize(byte[] recordKey, byte[] recordValue, String stream, String seqNum, long approxArrivalTimestamp)
throws IOException {
return deserializationSchema.deserialize(recordValue);
}

@@ -25,6 +25,7 @@
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;

import java.nio.ByteBuffer;
import java.util.Date;
import java.util.HashMap;
import java.util.ArrayList;
import java.util.LinkedList;
@@ -139,6 +140,7 @@ public static List<Record> createRecordBatchWithRange(int min, int max) {
new Record()
.withData(ByteBuffer.wrap(String.valueOf(i).getBytes()))
.withPartitionKey(UUID.randomUUID().toString())
.withApproximateArrivalTimestamp(new Date(System.currentTimeMillis()))
.withSequenceNumber(String.valueOf(i)));
}
return batch;
@@ -74,7 +74,7 @@ protected KinesisDeserializationSchema<String> getClonedDeserializationSchema()
}

@Override
protected void emitRecordAndUpdateState(String record, int shardStateIndex, SequenceNumber lastSequenceNumber) {
protected void emitRecordAndUpdateState(String record, long recordTimestamp, int shardStateIndex, SequenceNumber lastSequenceNumber) {
synchronized (fakeCheckpointLock) {
this.numElementsCollected++;
updateState(shardStateIndex, lastSequenceNumber);