Skip to content

Commit

Permalink
[FLINK-9691] [kinesis] Modify runloop to try to track a particular ge…
Browse files Browse the repository at this point in the history
…tRecords() frequency.

This closes apache#6290
  • Loading branch information
Jamie Grier authored and StephanEwen committed Jul 10, 2018
1 parent abfdc1a commit 7953a2e
Showing 1 changed file with 7 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ public void run() {
}
}

long lastTimeNanos = 0;
while (isRunning()) {
if (nextShardItr == null) {
fetcherRef.updateState(subscribedShardStateIndex, SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get());
Expand All @@ -207,7 +208,12 @@ public void run() {
break;
} else {
if (fetchIntervalMillis != 0) {
Thread.sleep(fetchIntervalMillis);
long elapsedTimeNanos = System.nanoTime() - lastTimeNanos;
long sleepTimeMillis = fetchIntervalMillis - (elapsedTimeNanos / 1_000_000);
if (sleepTimeMillis > 0) {
Thread.sleep(sleepTimeMillis);
}
lastTimeNanos = System.nanoTime();
}

GetRecordsResult getRecordsResult = getRecords(nextShardItr, maxNumberOfRecordsPerFetch);
Expand Down

0 comments on commit 7953a2e

Please sign in to comment.