[FLINK-8982][E2E Tests] Add test for known failure of queryable state #5807

florianschmidt1994

What is the purpose of the change

Add an end-to-end test verifying that the changes @kl0u introduced in #5691 fix a known issue with concurrent access to queryable state. The test checks that queries against queryable state work as expected.

Brief change log

  • Add Flink app with queryable state that continuously updates map state
  • Add queryable state client that periodically queries map state
  • Add end-to-end test that runs client against app and verifies that no unexpected exceptions occur
  • Integrate end-to-end test in test suite

Verifying this change

This change added tests and can be verified as follows:

  • Run ./run-pre-commit-tests.sh

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented? not applicable

@florianschmidt1994 florianschmidt1994 force-pushed the end-to-end-tests-for-queryable-state branch 2 times, most recently from b177a5a to 1b77d82 Compare April 3, 2018 12:55
@kl0u kl0u left a comment

Hi @florianschmidt1994 ! Thanks for the work. I had some initial comments. Please integrate them and I will keep on looking at this PR.


private EmailId emailId;

// public void setTimestamp(Instant timestamp) {

Remove commented methods.

// this.timestamp = timestamp;
// }

//private Instant timestamp;

same here (remove commented field).

return emailId;
}

// //public Instant getTimestamp() {

remove.


public EmailInformation(Email email) {
emailId = email.getEmailId();
// timestamp = email.getTimestamp();

remove.

}
EmailInformation that = (EmailInformation) o;
return Objects.equals(emailId, that.emailId) &&
// Objects.equals(timestamp, that.timestamp) &&

remove.


/**
* Javadoc.
*/

This is just a placeholder comment so that checkstyle verification passes. Please write a real comment.
The same holds in other places.


/**
* Javadoc.
*/

Same here.

@@ -37,6 +37,14 @@ echo "Flink distribution directory: $FLINK_DIR"

EXIT_CODE=0

I would recommend moving it to the nightly tests. Queryable state is not a core component and the normal builds are already timing out.

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

RocksDBStateBackend rocksDb = new RocksDBStateBackend("file:///tmp/deleteme-rocksdb");
env.setStateBackend(rocksDb);

The checkpoint directory can be a parameter, and here it should be a path inside the TEST_DIR of the test itself. In addition, everything should be explicitly cleaned up, e.g. checkpoints, potential output/input data, etc.

Also test with different backends, i.e. file and rocks. You can have a look at test_ha.sh.
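
One way to act on these two comments, sketched in plain Java without any Flink dependency. All names here (`StateBackendArgsSketch`, `parseBackend`, `checkpointUri`, `cleanUp`) are hypothetical and not taken from the PR. The sketch only shows how a backend choice and a checkpoint location could be derived from parameters and how the test directory could be removed afterwards. In the job itself the resulting URI would be handed to whichever backend constructor is used, such as the `RocksDBStateBackend` call quoted above.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.Locale;
import java.util.stream.Stream;

/** Hypothetical helper; names and argument format are illustrative, not taken from the PR. */
class StateBackendArgsSketch {

	/** The two backends the review asks to cover. */
	enum Backend { FILE, ROCKS }

	/** Maps a parameter value such as "rocks" to a backend; unknown values throw IllegalArgumentException. */
	static Backend parseBackend(String value) {
		return Backend.valueOf(value.trim().toUpperCase(Locale.ROOT));
	}

	/** Builds the checkpoint URI below the test's own directory instead of a hard-coded /tmp path. */
	static URI checkpointUri(Path testDir) {
		return testDir.toAbsolutePath().resolve("checkpoints").toUri();
	}

	/** Recursively deletes everything below (and including) the test directory; children go before parents. */
	static void cleanUp(Path testDir) {
		if (!Files.exists(testDir)) {
			return;
		}
		try (Stream<Path> paths = Files.walk(testDir)) {
			paths.sorted(Comparator.reverseOrder()).forEach(p -> {
				try {
					Files.delete(p);
				} catch (IOException e) {
					throw new UncheckedIOException(e);
				}
			});
		} catch (IOException e) {
			throw new UncheckedIOException(e);
		}
	}
}
```

With this shape, the shell test could pass the backend name and its TEST_DIR as arguments and call the cleanup step in its exit handler, so nothing is left behind in a shared temp directory.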

@florianschmidt1994

Thanks! I addressed the comments in the last two commits.

@florianschmidt1994 florianschmidt1994 force-pushed the end-to-end-tests-for-queryable-state branch 2 times, most recently from 69e4cd1 to 97ae2d2 Compare April 24, 2018 09:36
final Instant timestamp = Instant.now().minus(Duration.ofDays(1L));
final String foo = String.format("foo #%d", r);
final LabelSurrogate label = new LabelSurrogate(LabelSurrogate.Type.values()[r % types], "bar");

You have to take the checkpoint lock before emitting:

synchronized (ctx.getCheckpointLock()) {
	ctx.collect(new Email(emailId, timestamp, foo, label));
}

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_2.11</artifactId>
<version>1.6-SNAPSHOT</version>

The version should be ${project.version}.


clean_stdout_files # to ensure there are no files accidentally left behind by previous tests
link_queryable_state_lib
start_ha_cluster

We do not need an HA cluster. A normal cluster would be enough.


wait_job_running ${JOB_ID}

sleep 20 # sleep a little to have some state accumulated

@kl0u kl0u May 11, 2018

Instead of waiting, it is safer to ask through REST for the number of successful checkpoints for the job right after killing the TM, and then expect to see more successful checkpoints after the new TM is up. This is safer because it guarantees that the backend is initialized properly, and it can be done similarly to how it is done in test_ha.sh.
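
The polling idea can be sketched in plain Java as follows. This is only an illustration under stated assumptions: `CheckpointWaitSketch` and its methods are hypothetical names, the REST call itself is left to the caller (passed in as a supplier), and the parser assumes the response body contains a fragment of the form `"completed": <number>`, which should be checked against the actual REST output.

```java
import java.util.function.IntSupplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical polling helper; the JSON fragment it looks for is an assumption about the REST response. */
class CheckpointWaitSketch {

	// Assumed fragment in the response body, e.g. "completed":3 inside a counts object.
	private static final Pattern COMPLETED = Pattern.compile("\"completed\"\\s*:\\s*(\\d+)");

	/** Extracts the completed-checkpoint count from a response body, or -1 if it is absent. */
	static int completedCheckpoints(String json) {
		Matcher m = COMPLETED.matcher(json);
		return m.find() ? Integer.parseInt(m.group(1)) : -1;
	}

	/**
	 * Polls until the count is at least baseline + extra, or the attempts run out.
	 * The supplier would wrap the actual REST request; returns true on success.
	 */
	static boolean waitForNewCheckpoints(
			IntSupplier count, int baseline, int extra, int maxAttempts, long pauseMillis) {
		for (int i = 0; i < maxAttempts; i++) {
			if (count.getAsInt() >= baseline + extra) {
				return true;
			}
			try {
				Thread.sleep(pauseMillis);
			} catch (InterruptedException e) {
				Thread.currentThread().interrupt();
				return false;
			}
		}
		return false;
	}
}
```

The test would record the baseline right after killing the TM and then require a few additional checkpoints once the new TM is up, which replaces a fixed `sleep` with a condition that actually reflects backend progress.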

start_and_wait_for_tm

wait_job_running ${JOB_ID}

Instead of just waiting for the job to be running, it is safer to ask through REST for the number of successful checkpoints for the job right after killing the TM, and then expect to see more successful checkpoints after the new TM is up. This is safer because it guarantees that the backend is initialized properly, and it can be done similarly to how it is done in test_ha.sh.


private static final long serialVersionUID = -7286937645300388040L;

private Random random;

The random should be transient
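
A minimal, Flink-free sketch of what `transient` changes here. `java.util.Random` is itself serializable, so a non-transient field would travel with the serialized function object. Marked `transient`, the field is skipped and comes back as `null`, so it has to be re-created on the receiving side (in a Flink rich function, `open()` would be a typical place). The class `TransientRandomSketch` and its methods are hypothetical and only stand in for the source function in this PR.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Random;

/** Hypothetical stand-in for a serializable source function; not the class from this PR. */
class TransientRandomSketch implements Serializable {

	private static final long serialVersionUID = 1L;

	// transient: excluded from Java serialization, so it is null after deserialization.
	private transient Random random;

	/** Lazily (re-)creates the generator after construction or deserialization. */
	private Random random() {
		if (random == null) {
			random = new Random();
		}
		return random;
	}

	int nextEmailId(int bound) {
		return random().nextInt(bound);
	}

	boolean hasRandom() {
		return random != null;
	}

	/** Serializes and deserializes the instance, roughly what happens when a function is shipped to a worker. */
	static TransientRandomSketch roundTrip(TransientRandomSketch in) {
		try {
			ByteArrayOutputStream bytes = new ByteArrayOutputStream();
			try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
				out.writeObject(in);
			}
			try (ObjectInputStream ois =
					new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
				return (TransientRandomSketch) ois.readObject();
			}
		} catch (IOException | ClassNotFoundException e) {
			throw new IllegalStateException(e);
		}
	}
}
```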

@kl0u kl0u left a comment

Thanks for the work @florianschmidt1994 and also for discovering https://issues.apache.org/jira/browse/FLINK-9336.

I had some comments that I could also integrate when merging, but I would feel more comfortable merging this PR after #5993 is in.

@florianschmidt1994 florianschmidt1994 force-pushed the end-to-end-tests-for-queryable-state branch 4 times, most recently from 05ea391 to 2454080 Compare May 14, 2018 14:29
@florianschmidt1994 florianschmidt1994 force-pushed the end-to-end-tests-for-queryable-state branch 2 times, most recently from 32eeb15 to ec6d311 Compare June 15, 2018 13:20
@florianschmidt1994

@kl0u can you have a look at this?

kl0u pushed a commit to kl0u/flink that referenced this pull request Jun 21, 2018
Adds one test for normal execution and
one with a TM failure scenario.

This closes apache#5807
Add javadoc and remove dead code

Move from precommit to nightly tests

Add parameters for state backend and tmp dir

Remove dead code and add option for num iterations

Add utility functions to queryable state

Refactor test_queryable_state

Minor fixed and improvements to common.sh

Some fixes

Add a test case where a TM gets restarted

Some code cleanup

K comments

Better naming for queryable state producer and consumer

Address some PR comments

Wait for checkpoints instead of sleep

cleanup log files

iterm
@florianschmidt1994 florianschmidt1994 force-pushed the end-to-end-tests-for-queryable-state branch from ec6d311 to ddf5163 Compare June 21, 2018 09:48
kl0u pushed a commit to kl0u/flink that referenced this pull request Jun 21, 2018
Adds one test for normal execution and
one with a TM failure scenario.

This closes apache#5807

@kl0u kl0u left a comment

Thanks for the work @florianschmidt1994 !
I will merge as soon as Travis gives green light.

@asfgit asfgit closed this in 59a58e5 Jun 21, 2018
sampathBhat pushed a commit to sampathBhat/flink that referenced this pull request Jul 26, 2018
Adds one test for normal execution and
one with a TM failure scenario.

This closes apache#5807.