
KAFKA-10800 Enhance the test for validation when the state machine creates a snapshot #10593

Merged: 22 commits into apache:trunk on Oct 26, 2021

Conversation

feyman2016
Contributor

@feyman2016 feyman2016 commented Apr 25, 2021

In general, there are two ways of creating a snapshot. One is by the state machine through RaftClient::createSnapshot and SnapshotWriter, this PR mainly adds validation for this case. Another way is by the KafkaRaftClient itself downloading the snapshot from the quorum leader. In the second case, we want to trust the leader's snapshot and not perform the validation.


@feyman2016 feyman2016 closed this Apr 25, 2021
@feyman2016 feyman2016 reopened this Apr 25, 2021
@@ -96,7 +96,7 @@ public BatchBuilder(
}

/**
- * Append a record to this patch. The caller must first verify there is room for the batch
+ * Append a record to this batch. The caller must first verify there is room for the batch
Contributor Author

Side fix

@@ -38,7 +38,7 @@

@Test
public void testWritingSnapshot() throws IOException {
- OffsetAndEpoch id = new OffsetAndEpoch(10L, 3);
+ OffsetAndEpoch id = new OffsetAndEpoch(0L, 1);
Contributor Author

The high-watermark here is 1, so we need to make the snapshotId's endOffset < 1.

assertThrows(KafkaException.class, () -> context.client.createSnapshot(invalidSnapshotId3));
}

private void advanceHighWatermark(RaftClientTestContext context,
Contributor Author

Extracted the functionality to avoid duplication

Member

@jsancio jsancio Apr 25, 2021

Thanks for cleaning up the code duplication.

@@ -2268,6 +2269,20 @@ private Long append(int epoch, List<T> records, boolean isAtomic) {
);
}

private void validateSnapshotId(OffsetAndEpoch snapshotId) {
Optional<LogOffsetMetadata> highWatermarkOpt = quorum().highWatermark();
if (!highWatermarkOpt.isPresent() || highWatermarkOpt.get().offset <= snapshotId.offset) {
Contributor Author

Conceptually, snapshotId.offset == highWatermarkOpt.get().offset is OK, because the record at snapshotId.offset is not included in the snapshot. But I'm not sure if there are other restrictions, because the Jira description says: "The end offset and epoch of the snapshot is less than the high-watermark". Please kindly advise @jsancio

Member

You are correct. I think when I created the Jira I overlooked that both snapshot id's end offset and the high-watermark are exclusive values. Update the Jira's description.

Contributor Author

Thank you, I updated the PR as well

@feyman2016
Contributor Author

@jsancio Could you please help to review? Thanks!
Locally verified, all the failed tests should not be related. See failed tests in https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-10593/2/testReport/?cloudbees-analytics-link=scm-reporting%2Ftests%2Ffailed

Member

@jsancio jsancio left a comment

Thanks for the PR @feyman2016 . Partial review, I wanted to give you early feedback on the PR.

if (snapshotId.epoch > leaderEpoch) {
throw new KafkaException("Trying to creating snapshot with snapshotId: " + snapshotId + " whose epoch is" +
" larger than the current leader epoch: " + leaderEpoch);
}
Member

From the Jira:

The end offset and epoch of the snapshot is valid based on the leader epoch cache.

How about also validating against the leader epoch cache? See https://github.com/apache/kafka/blob/trunk/raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java#L124. This is important because both the snapshot and the leader epoch cache are used to validate offsets. See https://github.com/apache/kafka/blob/trunk/raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java#L85. The term leader epoch cache comes from the variable name leaderEpochCache used in kafka.log.Log.

Contributor Author

@feyman2016 feyman2016 Apr 26, 2021

Thanks, previously I mistakenly thought the quorum epoch was the leader epoch cache~
Updated the PR. In the Jira, one thing I'm not sure about is:

  1. The epoch of the snapshot is equal to the quorum epoch.

I think the snapshotId's epoch <= quorum epoch should be fine?

Member

@jsancio jsancio Apr 26, 2021

Yeah, this is not strictly required for correctness. Oh, I see, the check in 2280 is checking that the epoch > current epoch.

I mistakenly read it as epoch != current epoch. If we perform this check we are basically saying that the caller of createSnapshot needs to catch up to the current quorum epoch before it can generate a snapshot. Yes, I think epoch <= quorum epoch is fine. Let me think about it and I'll update the Jira.
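The rules settled in this thread (the snapshot's end offset may equal the high-watermark since both are exclusive, the snapshot epoch may be at most the quorum epoch, and the id must agree with the leader epoch cache) can be sketched as follows. This is a minimal, self-contained illustration: `OffsetAndEpoch` is a simplified stand-in record and the method shape is an assumption, not the merged Kafka code.

```java
import java.util.OptionalLong;

public class SnapshotIdValidation {
    // Stand-in for org.apache.kafka.raft.OffsetAndEpoch (simplified assumption).
    record OffsetAndEpoch(long offset, int epoch) {}

    static void validateSnapshotId(
            OffsetAndEpoch snapshotId,
            OptionalLong highWatermark,              // empty while the high-watermark is unknown
            int quorumEpoch,
            OffsetAndEpoch endOffsetForSnapshotEpoch // what the leader epoch cache returns for snapshotId.epoch
    ) {
        // Both the snapshot id's end offset and the high-watermark are exclusive,
        // so offset == high-watermark is acceptable; only offsets beyond it fail.
        if (!highWatermark.isPresent() || highWatermark.getAsLong() < snapshotId.offset()) {
            throw new IllegalArgumentException(
                "Snapshot id " + snapshotId + " is beyond the high-watermark " + highWatermark);
        }
        // The caller need not have caught up to the quorum epoch, so epoch <= quorumEpoch is fine.
        if (snapshotId.epoch() > quorumEpoch) {
            throw new IllegalArgumentException(
                "Snapshot epoch " + snapshotId.epoch() + " is newer than quorum epoch " + quorumEpoch);
        }
        // Validate against the leader epoch cache: the epoch must exist in the log,
        // and the snapshot offset must not exceed that epoch's end offset.
        if (endOffsetForSnapshotEpoch.epoch() != snapshotId.epoch()
                || endOffsetForSnapshotEpoch.offset() < snapshotId.offset()) {
            throw new IllegalArgumentException(
                "Snapshot id " + snapshotId + " disagrees with the log: " + endOffsetForSnapshotEpoch);
        }
    }

    public static void main(String[] args) {
        // Valid: offset equals the high-watermark (both exclusive).
        validateSnapshotId(new OffsetAndEpoch(5, 2), OptionalLong.of(5), 3, new OffsetAndEpoch(5, 2));
        boolean threw = false;
        try {
            // Invalid: offset past the high-watermark.
            validateSnapshotId(new OffsetAndEpoch(6, 2), OptionalLong.of(5), 3, new OffsetAndEpoch(6, 2));
        } catch (IllegalArgumentException e) {
            threw = true;
        }
        System.out.println(threw ? "rejected" : "accepted"); // prints "rejected"
    }
}
```

Note this sketch applies only to snapshots created through RaftClient::createSnapshot; snapshots fetched from the quorum leader are trusted and skip validation, as described in the PR summary.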


@feyman2016
Contributor Author

feyman2016 commented Apr 26, 2021

@jsancio Thanks for the review! I updated the PR, and the failed tests should be unrelated, thanks!

Member

@jsancio jsancio left a comment

The change is looking good in general. Just a few suggestions.

if (!highWatermarkOpt.isPresent() || highWatermarkOpt.get().offset < snapshotId.offset) {
throw new KafkaException("Trying to creating snapshot with invalid snapshotId: " + snapshotId + " whose offset is larger than the high-watermark: " +
highWatermarkOpt + ". This may necessarily mean a bug in the caller, since the there should be a minimum " +
"size of records between the latest snapshot and the high-watermark when creating snapshot");
Member

What do you mean by ", since the there should be a minimum size of records between the latest snapshot and the high-watermark when creating snapshot"?

Contributor Author

Based on https://cwiki.apache.org/confluence/display/KAFKA/KIP-630%3A+Kafka+Raft+Snapshot#KIP630:KafkaRaftSnapshot-WhentoSnapshot,

  1. metadata.snapshot.min.new_records.size is the minimum size of the records in the replicated log between the latest snapshot and then high-watermark.

Member

I see. I would remove that last sentence because I don't think this check enforces that, and I don't think that it should.

}
OffsetAndEpoch endOffsetAndEpoch = log.endOffsetForEpoch(snapshotId.epoch);
if (endOffsetAndEpoch.epoch != snapshotId.epoch || endOffsetAndEpoch.offset < snapshotId.offset) {
throw new KafkaException("Trying to creating snapshot with invalid snapshotId: " + snapshotId +
Member

Let's use InvalidArgumentException instead of KafkaException for all of the exceptions thrown in this method.

Contributor Author

Will fix, but I assume you want IllegalArgumentException instead of InvalidArgumentException?

Member

Yes. IllegalArgumentException.

assertThrows(KafkaException.class, () -> context.client.createSnapshot(invalidSnapshotId1));

// 1.2 high watermark must larger than or equal to the snapshotId's endOffset
advanceHighWatermark(context, currentEpoch, currentEpoch, otherNodeId, localId);
Member

I would append a couple of batches after advancing the high-watermark. At this point the HWM equals the LEO.

Contributor Author

Makes sense, fixed

assertThrows(KafkaException.class, () -> context.client.createSnapshot(invalidSnapshotId3));

// 3 the snapshotId should be validated against endOffsetForEpoch
OffsetAndEpoch endOffsetForEpoch = context.log.endOffsetForEpoch(currentEpoch);
Member

Let's use epoch instead of currentEpoch. Since we are using currentEpoch, the endOffsetForEpoch.offset will equal the LEO. If you instead use epoch then the endOffsetForEpoch.offset will be 4, which is less than the LEO (5).

Contributor Author

Fixed

@@ -1335,6 +1313,57 @@ public void testFetchSnapshotRequestClusterIdValidation() throws Exception {
context.assertSentFetchSnapshotResponse(Errors.INCONSISTENT_CLUSTER_ID);
}

@Test
public void testCreateSnapshotWithInvalidSnapshotId() throws Exception {
Member

This test is checking the snapshotId when the KafkaRaftClient is the leader. Let's have a similar test but when the KafkaRaftClient is a follower. Take a look at KafkaRaftClient::testEmptyRecordSetInFetchResponse for a simple example of how you can advance the high-watermark on the follower.

Note that the followers don't need to wait for the "high-watermark" to reach the current epoch to set (and advance) the high-watermark.

Contributor Author

Added the testcase testCreateSnapshotAsFollowerWithInvalidSnapshotId

@@ -38,7 +38,7 @@

@Test
public void testWritingSnapshot() throws IOException {
- OffsetAndEpoch id = new OffsetAndEpoch(10L, 3);
+ OffsetAndEpoch id = new OffsetAndEpoch(1L, 1);
Member

This applies to all of the tests in the suite.

Do you know why the context.client.createSnapshot doesn't throw an exception in these tests? Shouldn't the high-watermark be unknown (Optional.empty) in all of these cases?

Contributor Author

@feyman2016 feyman2016 Apr 30, 2021

I checked, it's because the client becomes leader and appends one leaderChangeMessage, so the high-watermark is LogOffsetMetadata(offset=1, metadata=Optional[MockOffsetMetadata(id=1)]), detailed stacktrace as below:

	  at org.apache.kafka.raft.KafkaRaftClient.appendAsLeader(KafkaRaftClient.java:1162)
	  at org.apache.kafka.raft.KafkaRaftClient.appendLeaderChangeMessage(KafkaRaftClient.java:450)
	  at org.apache.kafka.raft.KafkaRaftClient.onBecomeLeader(KafkaRaftClient.java:406)
	  at org.apache.kafka.raft.KafkaRaftClient.maybeTransitionToLeader(KafkaRaftClient.java:464)
	  at org.apache.kafka.raft.KafkaRaftClient.onBecomeCandidate(KafkaRaftClient.java:473)
	  at org.apache.kafka.raft.KafkaRaftClient.transitionToCandidate(KafkaRaftClient.java:493)
	  at org.apache.kafka.raft.KafkaRaftClient.initialize(KafkaRaftClient.java:375)
	  at org.apache.kafka.raft.RaftClientTestContext$Builder.build(RaftClientTestContext.java:231)
	  at org.apache.kafka.snapshot.SnapshotWriterTest.testWritingSnapshot(SnapshotWriterTest.java:43)
	  

Contributor Author

@feyman2016 feyman2016 Apr 30, 2021

Oops, merged the latest trunk, and it failed because the high-watermark is unknown (Optional.empty), I need to update it

@@ -168,12 +179,12 @@ Builder withMemoryPool(MemoryPool pool) {
return this;
}

- Builder withAppendLingerMs(int appendLingerMs) {
+ public Builder withAppendLingerMs(int appendLingerMs) {
Contributor Author

Marked these methods as public so they can be reused in SnapshotWriterTest, which needs a way to advance the high watermark, otherwise the snapshot validation won't pass

@@ -108,6 +108,17 @@

private final List<RaftResponse.Outbound> sentResponses = new ArrayList<>();

public static void advanceHighWatermark(RaftClientTestContext context,
Contributor Author

This method will be called in both KafkaRaftClientSnapshotTest and SnapshotWriterTest, so it is marked as public

Member

How about making it an object/instance method by removing the static keyword so that the user can use it as follows:

context.advanceLeaderHighWatermarkToEndOffset()

Note that I renamed the method to include leader and removed all of the parameters. I could be wrong, but RaftClientTestContext should have all of the information it needs to implement this. Also note that if you want to generalize this, then a majority of the nodes need to send a FETCH request to the leader for the leader to advance the high-watermark.

@feyman2016
Contributor Author

@jsancio Hi, I addressed the comments and the failed tests should not be related

@feyman2016
Contributor Author

feyman2016 commented May 5, 2021

@jsancio Hi, could you help to take a look? Thanks!

Member

@jsancio jsancio left a comment

Apologies for taking so long to review this. Thanks for the changes and the detailed tests. I think we are close. Just a few more comments.



@@ -135,4 +141,18 @@ public static void assertSnapshot(List<List<String>> batches, SnapshotReader<Str

assertEquals(expected, actual);
}

private RaftClientTestContext initContextAsLeaderAndAdvanceHighWatermark(OffsetAndEpoch snapshotId) throws Exception {
Member

Okay but note that if the quorum is a single node quorum then the high-watermark should advance simply by context.client.poll(). You can simplify this if you want by making the quorum a single member quorum. See the test in KafkaRaftClientTest.testLeaderAppendSingleMemberQuorum().

Contributor Author

Cool, updated :)

@feyman2016
Contributor Author

@jsancio Addressed the comments but just found that I would need to generalize context.advanceLeaderHighWatermarkToEndOffset(), will update later~

@jsancio
Member

jsancio commented May 27, 2021

@jsancio Addressed the comments but just found that I would need to generalize context.advanceLeaderHighWatermarkToEndOffset(), will update later~

I needed something similar in a PR I am currently working on, and this is what I have if you want to adapt it to your PR:

public void advanceLocalLeaderHighWatermarkToLogEndOffset() throws InterruptedException {
    assertEquals(localId, currentLeader());
    long localLogEndOffset = log.endOffset().offset;
    Set<Integer> followers = voters.stream().filter(voter -> voter != localId.getAsInt()).collect(Collectors.toSet());

    // Send a request from every follower
    for (int follower : followers) {
        deliverRequest(
            fetchRequest(currentEpoch(), follower, localLogEndOffset, currentEpoch(), 0)
        );
        pollUntilResponse();
        assertSentFetchPartitionResponse(Errors.NONE, currentEpoch(), localId);
    }

    pollUntil(() -> OptionalLong.of(localLogEndOffset).equals(client.highWatermark()));
}

Member

@jsancio jsancio left a comment

This PR is very close. We just need to improve the method for advancing the high-watermark. @mumrah can you take a look at this PR?

pollUntilResponse();
assertSentFetchPartitionResponse(Errors.NONE, currentEpoch(), localId);
assertEquals(log.endOffset().offset, client.highWatermark().getAsLong());
}
Contributor Author

Thank you, I copy-pasted yours.

@feyman2016
Contributor Author

Thanks for your quick reply, call for review again @jsancio :)
The failed test: org.apache.kafka.connect.integration.BlockingConnectorTest.testBlockInSinkTaskStop is not related

Member

@jsancio jsancio left a comment

LGTM. Thanks for the changes.

@feyman2016
Contributor Author

Are there any concerns with multi-threading here? I think the answer is "no". Any thoughts @feyman2016 or @jsancio?

Good point. There may be some concurrency issue with createSnapshot. How about moving the validation to onSnapshotFrozen?

This conversation also reminded me to create this issue: https://issues.apache.org/jira/browse/KAFKA-12873. It is slightly related to this point.

@jsancio Can you share more details about the possible concurrency scenario with createSnapshot ? BTW, will moving the validation to onSnapshotFrozen imply that before creating the snapshot, there's no validation? I think maybe we can keep the validation here and add some additional check before freeze() which makes the snapshot visible?

@feyman2016
Contributor Author

@mumrah Thanks for the feedback, just addressed, but the potential concurrency issue needs more discussion~

@jsancio
Member

jsancio commented Jun 10, 2021

@jsancio Can you share more details about the possible concurrency scenario with createSnapshot ? BTW, will moving the validation to onSnapshotFrozen imply that before creating the snapshot, there's no validation? I think maybe we can keep the validation here and add some additional check before freeze() which makes the snapshot visible?

@feyman2016, I think it is reasonable to do both. Validate when createSnapshot is called and validate again in onSnapshotFrozen. In both cases this validation should be optional. Validate if it is created through RaftClient.createSnapshot. Don't validate if KafkaRaftClient creates the snapshot internally because of a FetchResponse from the leader.

I have been working on a PR related to this if you want to take a look: #10786. It would be nice to get your PR merged before my PR.
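The suggestion above (validate when createSnapshot is called and again in onSnapshotFrozen, but only for snapshots the state machine created) can be sketched with a hypothetical origin flag. The SnapshotOrigin enum, Validator interface, and freeze signature below are illustrative assumptions, not Kafka's actual API:

```java
public class OriginGatedValidation {
    // Hypothetical origin flag; Kafka's real API does not expose this exact type.
    enum SnapshotOrigin { STATE_MACHINE, LEADER_FETCH }

    @FunctionalInterface
    interface Validator {
        void validate(); // throws IllegalArgumentException on an invalid snapshot id
    }

    static void freeze(SnapshotOrigin origin, Validator validator) {
        // Snapshots downloaded from the quorum leader are trusted as-is;
        // snapshots created through RaftClient.createSnapshot are re-checked
        // before freeze() makes them visible.
        if (origin == SnapshotOrigin.STATE_MACHINE) {
            validator.validate();
        }
        // ... freeze the snapshot file and notify onSnapshotFrozen ...
    }
}
```

Gating on an explicit origin rather than a boolean keeps call sites self-documenting about which creation path they are on.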

@feyman2016
Contributor Author

feyman2016 commented Jun 12, 2021

I have been working on a PR related to this if you want to take a look: #10786. It would be nice to get your PR merged before my PR.

@jsancio Thanks for the reply, now I understand the background. I think the optional validation for when createSnapshot is called has been covered in this PR, while the case for onSnapshotFrozen has not been covered yet. Looking at the code, onSnapshotFrozen is called by FileRawSnapshotWriter#freeze, which may be called by KafkaRaftClient or SnapshotWriter. If we want to do the optional validation, I assume a KIP is needed for tweaking RawSnapshotWriter#freeze() into something like RawSnapshotWriter#freeze(Origin origin); based on the origin, it can decide whether validation is needed. But since this PR is preferable to merge before #10786, would it take too much time to go through the KIP and contain the change in this PR? Or maybe we can make a separate PR to handle the validation for onSnapshotFrozen?

@jsancio
Member

jsancio commented Jun 14, 2021

I assume a KIP is needed.

We don't need a KIP. All of these APIs are internal APIs that are not accessible/published to projects external to Apache Kafka.

@jsancio
Member

jsancio commented Jun 14, 2021

Or maybe we can make a separate PR to handle the validation for onSnapshotFrozen?

I am okay with fixing this in a future PR. Do you want to go ahead and file a sub-task for https://issues.apache.org/jira/browse/KAFKA-10310 and link it here?

@feyman2016
Contributor Author

Or maybe we can make a separate PR to handle the validation for onSnapshotFrozen?

I am okay with fixing this in a future PR. Do you want to go ahead and file a sub-task for https://issues.apache.org/jira/browse/KAFKA-10310 and link it here?

I created https://issues.apache.org/jira/browse/KAFKA-12956 to track it, thanks!

@feyman2016
Contributor Author

feyman2016 commented Jun 17, 2021

Just resolved conflicts, call for review now @jsancio

new OffsetAndEpoch(committedOffset + 1, committedEpoch)
).map(snapshot -> {
final OffsetAndEpoch snapshotId = new OffsetAndEpoch(committedOffset + 1, committedEpoch);
validateSnapshotId(snapshotId);
Member

@feyman2016 Take a look at the implementation for KafkaMetadataLog::createNewSnapshot in trunk. I think that function now implements this check.

The check that it doesn't do explicitly is https://github.com/apache/kafka/pull/10593/files#diff-1da15c51e641ea46ea5c86201ab8f21cfee9e7c575102a39c7bae0d5ffd7de39R2281-R2285

I think it is okay to skip this check because the quorum epoch is always greater than or equal to the largest epoch in the log. I think we already have this check in createNewSnapshot that compares the snapshot id against the log: https://github.com/apache/kafka/pull/10593/files#diff-1da15c51e641ea46ea5c86201ab8f21cfee9e7c575102a39c7bae0d5ffd7de39R2286-R2290

Please confirm, but I think we can remove this check since we have something similar in createNewSnapshot in trunk. We should keep your improvement to the tests.

Apologies once again for coding and merging this functionality, but I needed these changes as part of #10786

Contributor Author

@feyman2016 feyman2016 Jun 18, 2021

@jsancio Totally understood, I just confirmed the validation logic has been covered already as you mentioned, so just kept the test and updated the PR name

@feyman2016 feyman2016 changed the title KAFKA-10800 Validate the snapshot id when the state machine creates a snapshot KAFKA-10800 Enhance the test for validation when the state machine creates a snapshot Jun 18, 2021
Member

@jsancio jsancio left a comment

LGTM

@feyman2016
Contributor Author

@jsancio Thanks!
@mumrah Hi, could you kindly help to review and merge? Thanks!

Member

@jsancio jsancio left a comment

LGTM!

Comment on lines 1606 to 1608
context.fetchResponse(epoch, otherNodeId, batch2, 4L, Errors.NONE));
context.client.poll();
assertEquals(4L, context.client.highWatermark().getAsLong());
Member

This is minor, but so we don't confuse future readers of this code, I think the high watermark is supposed to be 6L instead of 4L. The high watermark should always be at batch boundaries.


Seems like we should use 3L instead. The leader would not have been able to advance the high watermark past the fetch offset of 3L.

Member

You are right @hachikuji . For line 1597 to be true, I think the test needs to do another round of fetch.

// The high watermark advances to be larger than log.endOffsetForEpoch(3), to test the case 3

Line 1614 wants to fail because of an invalid offset and epoch based on the leader epoch cache. Not because it is greater than the high watermark.

assertThrows(IllegalArgumentException.class, () -> context.client.createSnapshot(invalidSnapshotId4.offset, invalidSnapshotId4.epoch));

@hachikuji hachikuji left a comment

LGTM! Note I pushed a small change to address @jsancio's minor comment. Thanks @feyman2016 for the contribution.

Member

@jsancio jsancio left a comment

LGTM

@hachikuji hachikuji merged commit 82d5e1c into apache:trunk Oct 26, 2021
@feyman2016
Contributor Author

@hachikuji @jsancio Just noticed this has been merged, thanks a lot for the review and also the commit!

xdgrulez pushed a commit to xdgrulez/kafka that referenced this pull request Dec 22, 2021
…reates a snapshot (apache#10593)

This patch adds additional test cases covering the validations done when snapshots are created by the state machine.

Reviewers: José Armando García Sancio <[email protected]>, Jason Gustafson <[email protected]>