Skip to content

Commit

Permalink
[FLINK-16393][kinesis] Skip record emitter thread creation w/o source…
Browse files Browse the repository at this point in the history
… sync
  • Loading branch information
tweise committed Mar 4, 2020
1 parent b098ce5 commit de5f8fa
Showing 1 changed file with 19 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -490,29 +490,31 @@ public void runFetcher() throws Exception {
getConsumerConfiguration().getProperty(ConsumerConfigConstants.WATERMARK_LOOKAHEAD_MILLIS,
Long.toString(0)));
recordEmitter.setMaxLookaheadMillis(Math.max(lookaheadMillis, watermarkSyncMillis * 3));

// record emitter depends on periodic watermark
// it runs in a separate thread since main thread is used for discovery
Runnable recordEmitterRunnable = new Runnable() {
@Override
public void run() {
try {
recordEmitter.run();
} catch (Throwable error) {
// report the error that terminated the emitter loop to source thread
stopWithError(error);
}
}
};

Thread thread = new Thread(recordEmitterRunnable);
thread.setName("recordEmitter-" + runtimeContext.getTaskNameWithSubtasks());
thread.setDaemon(true);
thread.start();
}
}
this.shardIdleIntervalMillis = Long.parseLong(
getConsumerConfiguration().getProperty(ConsumerConfigConstants.SHARD_IDLE_INTERVAL_MILLIS,
Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_IDLE_INTERVAL_MILLIS)));

// run record emitter in separate thread since main thread is used for discovery
Runnable recordEmitterRunnable = new Runnable() {
@Override
public void run() {
try {
recordEmitter.run();
} catch (Throwable error) {
// report the error that terminated the emitter loop to source thread
stopWithError(error);
}
}
};

Thread thread = new Thread(recordEmitterRunnable);
thread.setName("recordEmitter-" + runtimeContext.getTaskNameWithSubtasks());
thread.setDaemon(true);
thread.start();
}

// ------------------------------------------------------------------------
Expand Down

0 comments on commit de5f8fa

Please sign in to comment.