-
Notifications
You must be signed in to change notification settings - Fork 13.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-7343] Simulate network failures in kafka at-least-once test #4470
Conversation
1be136f
to
4b5a7a2
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall very good work! I had just a few minor comments.
On a more high level comment, I noticed that now some boolean flags like secureMode
or hideBehindProxy
start to pile up in the KafkaTestEnvironmentImpl
that are also evaluated in different places and influence the behaviour of the code. I wonder if there could be a different way to abstract this configuration aspect out of the environment (setting up the properties, calculating proper timeouts, ...)? What do you think?
Nevertheless, this should be good to merge.
|
||
@Override | ||
public void exceptionCaught(ChannelHandlerContext context, ExceptionEvent event) throws Exception { | ||
event.getCause().printStackTrace(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suggest to replace the printStackTrace()
with logging.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ops, missed that when cleaning up a copied example code.
ChannelBuffer msg = (ChannelBuffer) event.getMessage(); | ||
Channel targetChannel = sourceToTargetChannels.get(event.getChannel()); | ||
if (targetChannel == null) { | ||
throw new IllegalStateException(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A cause message could be added.
@Override | ||
public void exceptionCaught(ChannelHandlerContext context, ExceptionEvent event) | ||
throws Exception { | ||
event.getCause().printStackTrace(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same suggestion to use logging over printStackTrace ()
.
We shouldn't fail KafkaServers directly, because they might not be able to flush the data. Since we don't want to test how well Kafka implements at-least-once/exactly-once semantic, we just simulate network failure between Flink and Kafka in our at-least-once tests.
4b5a7a2
to
7121a5d
Compare
Yes I also didn't like adding this new flag, but didn't have enough motivation to change it. I have done some refactoring extracting those dynamically set in However it helps only a little bit. Those tests would need a more comprehensive refactor in the future. I particularly don't like that this |
Merging. |
Please close the commit and the Jira issue. |
Thanks :) |
We shouldn't fail KafkaServers directly, because they might not be able to flush the data (
log.flush.interval.***
properties). Since we don't want to test how well Kafka implements at-least-once/exactly-once semantic, it is a better idea (and hopefully more reliable) to just simulate network failure between Flink and Kafka in our at-least-once tests. To achieve that I have implementedNetworkFailuresProxy
class.First two commits are from #4456