Skip to content

Commit

Permalink
[FLINK-16262][connectors/kafka] Set the context classloader for parallel stream in FlinkKafkaProducer
Browse files Browse the repository at this point in the history

This closes apache#11247
  • Loading branch information
guoweiM committed Mar 28, 2020
1 parent 4f6efed commit ff0d0c9
Showing 1 changed file with 18 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.TemporaryClassLoaderContext;

import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;

Expand Down Expand Up @@ -1095,18 +1096,24 @@ private void resetAvailableTransactionalIdsPool(Collection<String> transactional
// ----------------------------------- Utilities --------------------------

private void abortTransactions(final Set<String> transactionalIds) {
final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
transactionalIds.parallelStream().forEach(transactionalId -> {
// don't mess with the original configuration or any other properties of the
// original object
// -> create an internal kafka producer on our own and do not rely on
// initTransactionalProducer().
final Properties myConfig = new Properties();
myConfig.putAll(producerConfig);
initTransactionalProducerConfig(myConfig, transactionalId);
try (FlinkKafkaInternalProducer<byte[], byte[]> kafkaProducer =
new FlinkKafkaInternalProducer<>(myConfig)) {
// it suffices to call initTransactions - this will abort any lingering transactions
kafkaProducer.initTransactions();
// The parallelStream executes the consumer in a separated thread pool.
// Because the consumer(e.g. Kafka) uses the context classloader to construct some class
// we should set the correct classloader for it.
try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(classLoader)) {
// don't mess with the original configuration or any other properties of the
// original object
// -> create an internal kafka producer on our own and do not rely on
// initTransactionalProducer().
final Properties myConfig = new Properties();
myConfig.putAll(producerConfig);
initTransactionalProducerConfig(myConfig, transactionalId);
try (FlinkKafkaInternalProducer<byte[], byte[]> kafkaProducer =
new FlinkKafkaInternalProducer<>(myConfig)) {
// it suffices to call initTransactions - this will abort any lingering transactions
kafkaProducer.initTransactions();
}
}
});
}
Expand Down

0 comments on commit ff0d0c9

Please sign in to comment.