[Kernel][LogReplay] Make a single read request for all checkpoint files #2701
Conversation
@@ -378,6 +378,7 @@ class GoldenTables extends QueryTest with SharedSparkSession {
     val commitInfoFile = CommitInfo(
       version = Some(0L),
+      inCommitTimestamp = None,
This is broken on current master.
what does this mean?
There was a commit in delta-spark that updated the argument list. Our current CI setup only runs the delta-spark tests, not those of its dependents. We need to revisit our CI trigger job to make sure tests run for all dependent modules as well.
I can make a separate PR for this change if you would like.
kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ActionsIterator.java (resolved)
@@ -149,23 +151,22 @@ private void tryEnsureNextActionsIterIsReady() {
     }

     /**
-     * Get the next file from `filesIter` (.json or .checkpoint.parquet), contextualize it
+     * Get the next file from `filesList` (.json or .checkpoint.parquet), contextualize it
do we still contextualize?
No we don't. Updating the comment.
// We can not read multiple JSON files in parallel (like the checkpoint files),
// because each one has a different version, and we need to associate the version
// with actions read from the JSON file for further optimizations later on.

final CloseableIterator<ColumnarBatch> dataIter =
    tableClient.getJsonHandler().readJsonFiles(
Is this an optimization we should consider (not in this PR)? If the returned columnar batches carried the file status of the file they were read from, that would allow parallel JSON reads.
// Read that file
// We can not read multiple JSON files in parallel (like the checkpoint files),
// because each one has a different version, and we need to associate the version
// with actions read from the JSON file for further optimizations later on.
nit: what optimizations need the version for each columnar batch?
It is basically the faster metadata and protocol loading with a snapshot hint, i.e. the optimizations made for faster Flink job start. Added to the comments.
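To make the version-association point concrete, here is a minimal, hypothetical sketch (not the actual Kernel types) of pairing each commit file's actions batch with the version parsed from its file name, which is what lets a consumer record protocol/metadata per version for a snapshot hint:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: `ActionBatch`, `versionFromPath`, and `readWithVersions`
// are stand-ins, not real Kernel APIs.
public class VersionedBatches {
    // Stand-in for a batch of actions read from one commit file.
    static final class ActionBatch {
        final long version;      // version parsed from the delta file name
        final String sourcePath; // file the batch was read from
        ActionBatch(long version, String sourcePath) {
            this.version = version;
            this.sourcePath = sourcePath;
        }
    }

    // Parse the version encoded in a delta commit file name,
    // e.g. "_delta_log/00000000000000000007.json" -> 7.
    static long versionFromPath(String path) {
        String name = path.substring(path.lastIndexOf('/') + 1);
        return Long.parseLong(name.substring(0, name.indexOf('.')));
    }

    // Read files one at a time, tagging each resulting batch with its version.
    // This is why JSON files are read sequentially: the version must stay
    // attached to the actions that came from that file.
    static List<ActionBatch> readWithVersions(List<String> commitFiles) {
        List<ActionBatch> out = new ArrayList<>();
        for (String path : commitFiles) {
            out.add(new ActionBatch(versionFromPath(path), path));
        }
        return out;
    }

    public static void main(String[] args) {
        List<ActionBatch> batches = readWithVersions(
            List.of("_delta_log/00000000000000000007.json",
                    "_delta_log/00000000000000000008.json"));
        for (ActionBatch b : batches) {
            System.out.println(b.version + " <- " + b.sourcePath);
        }
    }
}
```

The reviewer's suggestion above amounts to making this pairing explicit in the returned batches themselves, so the ordering constraint (and hence the sequential read) could be dropped.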
kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ActionsIterator.java (resolved)
kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ActionsIterator.java (outdated)
}

// Remove the files from the list
filesList.removeAll(checkpointFiles);
Why do you need removeAll when you are already doing filesList.pop()?
not needed.
* $ build/sbt
* sbt:delta> project kernelDefaults
* sbt:delta> set fork in run := true
* sbt:delta> test:runMain io.delta.kernel.defaults.benchmarks.BenchmarkParallelCheckpointReading
Can you add the results you got here, so that they are in the code rather than in the GitHub PR, which will be harder to find when browsing code? Add some specs of your machine as well.
just one comment.
Minor comments. LGTM.
FileStatus checkpointFile,
long version) {

// Filter out all the files that are not part of the same checkpoint
What does this mean?
Updated the comment. It should have been: // Find the contiguous parquet files that are part of the same checkpoint
FileStatus peek = filesList.peek();
while (peek != null &&
       peek.getPath().endsWith(".parquet") &&
Can we use FileNames.isCheckpointFile here instead?
Updated here and in the existing code.
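The loop being discussed collects the contiguous run of checkpoint parquet parts for one version from the head of the file list, so a single read request can cover them all. A minimal, self-contained sketch of that idea (the predicate below is a simplified stand-in for FileNames.isCheckpointFile, not the real implementation):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class CheckpointGrouping {
    // Simplified stand-in for FileNames.isCheckpointFile: multi-part checkpoint
    // files look like <version>.checkpoint.<part>.<numParts>.parquet.
    static boolean isCheckpointFile(String fileName) {
        return fileName.contains(".checkpoint.") && fileName.endsWith(".parquet");
    }

    // Version is the zero-padded prefix before the first dot.
    static long checkpointVersion(String fileName) {
        return Long.parseLong(fileName.substring(0, fileName.indexOf('.')));
    }

    // Pop the contiguous checkpoint parts of `version` from the head of the list.
    // Because we pop() each file as we take it, no later removeAll is needed.
    static List<String> takeCheckpointParts(Deque<String> filesList, long version) {
        List<String> parts = new ArrayList<>();
        String peek = filesList.peek();
        while (peek != null
                && isCheckpointFile(peek)
                && checkpointVersion(peek) == version) {
            parts.add(filesList.pop());
            peek = filesList.peek();
        }
        return parts;
    }

    public static void main(String[] args) {
        Deque<String> files = new ArrayDeque<>(List.of(
            "00000000000000000010.checkpoint.0000000001.0000000002.parquet",
            "00000000000000000010.checkpoint.0000000002.0000000002.parquet",
            "00000000000000000011.json"));
        List<String> parts = takeCheckpointParts(files, 10L);
        // Two checkpoint parts grouped; the commit file stays for the next step.
        System.out.println(parts.size() + " parts; next: " + files.peek());
    }
}
```

Checking the checkpoint-file pattern (rather than just `.endsWith(".parquet")`, as the reviewer points out) also guards against grouping unrelated parquet files.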
...ts/src/test/java/io/delta/kernel/defaults/benchmarks/BenchmarkParallelCheckpointReading.java (outdated)
ScanBuilder scanBuilder = snapshot.getScanBuilder(tableClient);
Scan scan = scanBuilder.build();

Row row = scan.getScanState(tableClient);
Unused?
This is fine if it's to mimic the real case but could you add a comment explaining what we're doing in this block/the intention?
This is just to mimic the real case. Added comments.
Description
Currently, the kernel-api reads one file (either a checkpoint or a commit file) at a time. Only once a file is fully read is the read request for the next file issued. This makes reading large checkpoints that are split over multiple files slower. Instead, kernel-api could issue read requests for all checkpoint files at once (in the case of multi-part checkpoints) using ParquetHandler.readParquetFiles, and let implementations of ParquetHandler prefetch or use multiple threads to read the checkpoint parts concurrently.

This PR changes kernel-api to issue one read request for all checkpoint files that need to be read for state reconstruction.

Resolves #2668
Resolves #1965
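The gain described above comes from handing the whole list of checkpoint parts to the handler in a single call, so it can read them concurrently. A hedged sketch of that idea, assuming a toy reader in place of a real ParquetHandler implementation (none of these names are the actual Kernel API):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SingleReadRequest {
    // Stand-in for decoding one parquet checkpoint part; returns its row count.
    static int readPart(String path) {
        return 100;
    }

    // One request for all parts; parts are read on parallelReaderCount threads,
    // mirroring the benchmark's parallelReaderCount knob.
    static int readAllParts(List<String> parts, int parallelReaderCount) {
        ExecutorService pool = Executors.newFixedThreadPool(parallelReaderCount);
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (String p : parts) {
                futures.add(pool.submit(() -> readPart(p)));
            }
            int total = 0;
            for (Future<Integer> f : futures) {
                try {
                    total += f.get(); // results are combined in submission order
                } catch (InterruptedException | ExecutionException e) {
                    throw new RuntimeException(e);
                }
            }
            return total;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> parts =
            List.of("part1.parquet", "part2.parquet", "part3.parquet");
        System.out.println(readAllParts(parts, 2)); // prints 300
    }
}
```

Because the checkpoint parts are independent and order within the checkpoint does not matter for state reconstruction (unlike the JSON commit files, whose per-file versions must be preserved), this batching is safe for the parquet case only.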
How was this patch tested?
Existing tests, plus a benchmark with a test-only parallel Parquet reader. Here are the sample benchmark results with the test-only parallel Parquet reader. Score is the average time to construct the Delta table state; parallelReaderCount is the number of parallel Parquet reading threads used.