[FLINK-8982][E2E Tests] Add test for known failure of queryable state #5807
b177a5a to 1b77d82
Hi @florianschmidt1994! Thanks for the work. I had some initial comments. Please integrate them and I will keep looking at this PR.
private EmailId emailId;

// public void setTimestamp(Instant timestamp) {
Remove commented methods.
// this.timestamp = timestamp;
// }

//private Instant timestamp;
same here (remove commented field).
return emailId;
}

// //public Instant getTimestamp() {
remove.
public EmailInformation(Email email) {
    emailId = email.getEmailId();
    // timestamp = email.getTimestamp();
remove.
}
EmailInformation that = (EmailInformation) o;
return Objects.equals(emailId, that.emailId) &&
// Objects.equals(timestamp, that.timestamp) &&
remove.
/**
 * Javadoc.
 */
This is just a placeholder comment so that the checkstyle verification passes. Please write a real comment.
The same holds for other places.
/**
 * Javadoc.
 */
Same here.
@@ -37,6 +37,14 @@ echo "Flink distribution directory: $FLINK_DIR"

EXIT_CODE=0
I would recommend moving it to the nightly tests. Queryable state is not a core component and the normal builds are already timing out.
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

RocksDBStateBackend rocksDb = new RocksDBStateBackend("file:///tmp/deleteme-rocksdb");
env.setStateBackend(rocksDb);
The checkpoint directory can be a parameter, and here it should be a path in the TEST_DIR of the test itself. In addition, everything should be explicitly cleaned up, e.g. checkpoints, potential output/input data, etc.
Also check different backends, i.e. file and rocks. You can have a look at test_ha.sh.
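As a rough illustration of the two comments above, the sketch below derives a checkpoint location from a per-test directory for each backend name and deletes the directory afterwards. It is a minimal sketch using only the JDK: the class `CheckpointDirSketch`, its method names, and the `checkpoints-<backend>` layout are hypothetical and not taken from the PR, and the returned URI would still have to be handed to the actual Flink state backend.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

/** Hypothetical helper; names and directory layout are illustrative only. */
final class CheckpointDirSketch {

    /** Creates a fresh directory that plays the role of the test's TEST_DIR. */
    static Path newTestDir() {
        try {
            return Files.createTempDirectory("qs-test");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Builds a checkpoint URI below the test directory for one backend ("file" or "rocks"). */
    static String checkpointUri(Path testDir, String backend) {
        if (!backend.equals("file") && !backend.equals("rocks")) {
            throw new IllegalArgumentException("Unknown state backend: " + backend);
        }
        return testDir.resolve("checkpoints-" + backend).toAbsolutePath().toUri().toString();
    }

    /** Recursively deletes the test directory so no checkpoints or data are left behind. */
    static void cleanUp(Path testDir) {
        if (!Files.exists(testDir)) {
            return;
        }
        try (Stream<Path> paths = Files.walk(testDir)) {
            // Reverse order deletes children before their parent directories.
            paths.sorted(Comparator.reverseOrder()).forEach(p -> {
                try {
                    Files.delete(p);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Path testDir = newTestDir();
        try {
            for (String backend : new String[] {"file", "rocks"}) {
                System.out.println(backend + " -> " + checkpointUri(testDir, backend));
            }
        } finally {
            cleanUp(testDir);
        }
    }
}
```

Running the cleanup in a `finally` block means the directory is removed even when a backend run fails, which matches the request to clean up explicitly.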
Thanks! I addressed the changes in the last two commits.
69e4cd1 to 97ae2d2
final Instant timestamp = Instant.now().minus(Duration.ofDays(1L));
final String foo = String.format("foo #%d", r);
final LabelSurrogate label = new LabelSurrogate(LabelSurrogate.Type.values()[r % types], "bar");
You have to take the checkpointLock before emitting:
synchronized (ctx.getCheckpointLock()) {
    ctx.collect(new Email(emailId, timestamp, foo, label));
}
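To make the locking pattern concrete without pulling in Flink, here is a minimal sketch in which a local `SourceContext` interface stands in for Flink's `SourceFunction.SourceContext` (only `collect` and `getCheckpointLock` are modelled). `CheckpointLockSketch` and `CountingSource` are hypothetical names. The point it illustrates is that the emit and the update of the source's own state happen inside the same `synchronized` block, so a checkpoint taken under that lock would not observe one without the other.

```java
import java.util.ArrayList;
import java.util.List;

final class CheckpointLockSketch {

    /** Local stand-in for Flink's SourceFunction.SourceContext; not the real interface. */
    interface SourceContext<T> {
        void collect(T element);

        Object getCheckpointLock();
    }

    /** Emits records while holding the checkpoint lock for each emit and state update. */
    static final class CountingSource {
        private long emitted; // state that a checkpoint would snapshot

        void run(SourceContext<String> ctx, int count) {
            for (int i = 0; i < count; i++) {
                String record = "foo #" + i;
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(record);
                    emitted++;
                }
            }
        }

        long getEmitted() {
            return emitted;
        }
    }

    public static void main(String[] args) {
        final List<String> out = new ArrayList<>();
        final Object lock = new Object();
        CountingSource source = new CountingSource();
        source.run(new SourceContext<String>() {
            @Override
            public void collect(String element) {
                out.add(element);
            }

            @Override
            public Object getCheckpointLock() {
                return lock;
            }
        }, 3);
        System.out.println(out + " emitted=" + source.getEmitted());
    }
}
```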
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
    <version>1.6-SNAPSHOT</version>
the version should be ${project.version}
clean_stdout_files # to ensure there are no files accidentally left behind by previous tests
link_queryable_state_lib
start_ha_cluster
We do not need an HA cluster. A normal cluster would be enough.
wait_job_running ${JOB_ID}

sleep 20 # sleep a little to have some state accumulated
Instead of sleeping, it is safer to ask through REST for the number of successful checkpoints for the job right after killing the TM, and then expect to see more successful checkpoints after the new TM is up. This guarantees that the backend is initialized properly, and it can be done similarly to how it is done in test_ha.sh.
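The idea of comparing checkpoint counts instead of sleeping can be sketched as follows. This is a hedged illustration, not the script's implementation: `CheckpointWaitSketch` and its methods are hypothetical, the count is supplied through an `IntSupplier` rather than a real HTTP call, and the JSON shape (a numeric `"completed"` field) is an assumption about what the job's checkpoint statistics response contains.

```java
import java.util.function.IntSupplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class CheckpointWaitSketch {

    // Assumed response shape: a numeric "completed" counter somewhere in the JSON.
    private static final Pattern COMPLETED = Pattern.compile("\"completed\"\\s*:\\s*(\\d+)");

    /** Extracts the first numeric "completed" value from a checkpoint statistics response. */
    static int completedCheckpoints(String json) {
        Matcher m = COMPLETED.matcher(json);
        if (!m.find()) {
            throw new IllegalArgumentException("No completed count in: " + json);
        }
        return Integer.parseInt(m.group(1));
    }

    /**
     * Polls until the count reaches baseline + required, or the timeout expires.
     * Returns true on success and false on timeout or interruption.
     */
    static boolean waitForMoreCheckpoints(
            IntSupplier currentCount, int baseline, int required, long timeoutMillis, long pollMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (true) {
            if (currentCount.getAsInt() >= baseline + required) {
                return true;
            }
            if (System.currentTimeMillis() >= deadline) {
                return false;
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) {
        String sample = "{\"counts\":{\"total\":5,\"in_progress\":1,\"completed\":4,\"failed\":0}}";
        int baseline = completedCheckpoints(sample);
        int[] fake = {baseline}; // simulated counter that grows on every poll
        boolean ok = waitForMoreCheckpoints(() -> fake[0]++, baseline, 2, 2000L, 10L);
        System.out.println("baseline=" + baseline + " moreCheckpoints=" + ok);
    }
}
```

In a test, the baseline would be read right after the TM is killed and the wait would run once the new TM is up, so success implies that checkpointing resumed after recovery.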
start_and_wait_for_tm

wait_job_running ${JOB_ID}
Instead of just waiting for the job to be running, it is safer to ask through REST for the number of successful checkpoints for the job right after killing the TM, and then expect to see more successful checkpoints after the new TM is up. This guarantees that the backend is initialized properly, and it can be done similarly to how it is done in test_ha.sh.
private static final long serialVersionUID = -7286937645300388040L;

private Random random;
The random field should be transient.
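A small JDK-only sketch of what a transient field implies for a serializable function: the field is skipped during serialization and has to be recreated on the receiving side, typically in an initialization hook. `TransientRandomSketch`, `EmailSource`, and the `open()` method here are hypothetical stand-ins (in Flink the hook would presumably be a rich function's `open` method), and plain Java serialization is used only to demonstrate the effect.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Random;

final class TransientRandomSketch {

    /** Hypothetical source-like class; only the transient handling is of interest. */
    static final class EmailSource implements Serializable {
        private static final long serialVersionUID = 1L;

        // Not shipped with the serialized object; recreated in open().
        private transient Random random;

        void open() {
            random = new Random();
        }

        boolean isOpen() {
            return random != null;
        }

        int nextType(int types) {
            if (random == null) {
                throw new IllegalStateException("open() must be called first");
            }
            return random.nextInt(types);
        }
    }

    /** Serializes and deserializes an object, as shipping it to a worker would. */
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        EmailSource source = new EmailSource();
        source.open();
        EmailSource copy = roundTrip(source);
        System.out.println("open after round trip: " + copy.isOpen());
        copy.open();
        System.out.println("type index: " + copy.nextType(3));
    }
}
```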
Thanks for the work @florianschmidt1994 and also for discovering https://issues.apache.org/jira/browse/FLINK-9336.
I had some comments that I could also integrate when merging, but I would feel more comfortable merging this PR after #5993 is in.
05ea391 to 2454080
32eeb15 to ec6d311
@kl0u can you have a look at this?
Adds one test for normal execution and one with a TM failure scenario. This closes apache#5807
Add javadoc and remove dead code
Move from precommit to nightly tests
Add parameters for state backend and tmp dir
Remove dead code and add option for num iterations
Add utility functions to queryable state
Refactor test_queryable_state
Minor fixed and improvements to common.sh
Some fixes
Add a test case where a TM gets restarted
Some code cleanup
K comments
Better naming for queryable state producer and consumer
Address some PR comments
Wait for checkpoints instead of sleep
cleanup log files
iterm
ec6d311 to ddf5163
Thanks for the work @florianschmidt1994!
I will merge as soon as Travis gives the green light.
Adds one test for normal execution and one with a TM failure scenario. This closes apache#5807.
What is the purpose of the change
Add an end-to-end test to verify that the changes that @kl0u introduced in #5691 fix a known issue with concurrent access to queryable state, by verifying that access to queryable state works as expected.
Brief change log
Verifying this change
This change added tests and can be verified as follows:
./run-pre-commit-tests.sh
Does this pull request potentially affect one of the following parts:
@Public(Evolving): no
Documentation