Skip to content

Commit

Permalink
[Spark] Skip reading log entries beyond endOffset, if specified while…
Browse files Browse the repository at this point in the history
… getting file changes for CDC in streaming queries (#3110)

#### Which Delta project/connector is this regarding?
- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description
Skip reading log entries beyond endOffset, if specified while getting
file changes for CDC in streaming queries

## How was this patch tested?
Existing unit tests

Also verified using logs to ensure that additional Delta logs are not
read

```

24/05/16 01:21:01 INFO StateStore: StateStore stopped
Run completed in 54 seconds, 237 milliseconds.
Total number of tests run: 1
Suites: completed 1, aborted 0
Tests: succeeded 1, failed 0, canceled 0, ignored 0, pending 0
All tests passed.
```

Before:
```
10457:24/05/16 01:38:37 INFO DeltaSource: [queryId = 199ce] [batchId = 0] Getting CDC dataFrame for delta_log_path=file:/tmp/spark-c3309e79-80cf-4819-8b03-2fc607cc2679/_delta_log with startVersion=0, startIndex=-100, isInitialSnapshot=false, endOffset={"sourceVersion":1,"reservoirId":"75194ee3-4ff6-431e-8e7c-32006684d5ad","reservoirVersion":1,"index":-1,"isStartingVersion":false} took timeMs=52 ms
11114:24/05/16 01:38:39 INFO DeltaSource: [queryId = 199ce] [batchId = 1] Getting CDC dataFrame for delta_log_path=file:/tmp/spark-c3309e79-80cf-4819-8b03-2fc607cc2679/_delta_log with startVersion=1, startIndex=-100, isInitialSnapshot=false, endOffset={"sourceVersion":1,"reservoirId":"75194ee3-4ff6-431e-8e7c-32006684d5ad","reservoirVersion":2,"index":-1,"isStartingVersion":false} took timeMs=25 ms
11518:24/05/16 01:38:39 INFO DeltaSource: [queryId = 199ce] [batchId = 2] Getting CDC dataFrame for delta_log_path=file:/tmp/spark-c3309e79-80cf-4819-8b03-2fc607cc2679/_delta_log with startVersion=2, startIndex=-100, isInitialSnapshot=false, endOffset={"sourceVersion":1,"reservoirId":"75194ee3-4ff6-431e-8e7c-32006684d5ad","reservoirVersion":3,"index":-1,"isStartingVersion":false} took timeMs=24 ms
```

After:
```
10498:24/05/16 01:32:10 INFO DeltaSource: [queryId = ede3f] [batchId = 0] Getting CDC dataFrame for delta_log_path=file:/tmp/spark-270c3d6e-40df-4e6f-b1da-c293af5d6741/_delta_log with startVersion=0, startIndex=-100, isInitialSnapshot=false, endOffset={"sourceVersion":1,"reservoirId":"516bafe0-e0ea-4380-afcb-44e416302a07","reservoirVersion":1,"index":-1,"isStartingVersion":false} took timeMs=39 ms
11155:24/05/16 01:32:11 INFO DeltaSource: [queryId = ede3f] [batchId = 1] Getting CDC dataFrame for delta_log_path=file:/tmp/spark-270c3d6e-40df-4e6f-b1da-c293af5d6741/_delta_log with startVersion=1, startIndex=-100, isInitialSnapshot=false, endOffset={"sourceVersion":1,"reservoirId":"516bafe0-e0ea-4380-afcb-44e416302a07","reservoirVersion":2,"index":-1,"isStartingVersion":false} took timeMs=14 ms
11579:24/05/16 01:32:12 INFO DeltaSource: [queryId = ede3f] [batchId = 2] Getting CDC dataFrame for delta_log_path=file:/tmp/spark-270c3d6e-40df-4e6f-b1da-c293af5d6741/_delta_log with startVersion=2, startIndex=-100, isInitialSnapshot=false, endOffset={"sourceVersion":1,"reservoirId":"516bafe0-e0ea-4380-afcb-44e416302a07","reservoirVersion":3,"index":-1,"isStartingVersion":false} took timeMs=13 ms
```

Difference is even more if we are processing/reading through large
number of backlog versions.

In Cx setup, before the change - batches are taking > 300s. After the
change, batches complete is < 15s.

## Does this PR introduce _any_ user-facing changes?
No
  • Loading branch information
anishshri-db authored May 22, 2024
1 parent 699df38 commit 0ee9fd0
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ trait DeltaSourceBase extends Source
}
logInfo(s"Getting dataFrame for delta_log_path=${deltaLog.logPath} with " +
s"startVersion=$startVersion, startIndex=$startIndex, " +
s"isInitialSnapshot=$isInitialSnapshot took timeMs=$duration ms")
s"isInitialSnapshot=$isInitialSnapshot, endOffset=$endOffset took timeMs=$duration ms")
result
} finally {
fileActionsIter.close()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,8 @@ trait DeltaSourceCDCSupport { self: DeltaSource =>
cdcInfo.fileChangeDf
}
logInfo(s"Getting CDC dataFrame for delta_log_path=${deltaLog.logPath} with " +
s"startVersion=$startVersion, startIndex=$startIndex, isInitialSnapshot=$isInitialSnapshot " +
s"took timeMs=$duration ms")
s"startVersion=$startVersion, startIndex=$startIndex, " +
s"isInitialSnapshot=$isInitialSnapshot, endOffset=$endOffset took timeMs=$duration ms")
result
}

Expand Down Expand Up @@ -277,6 +277,16 @@ trait DeltaSourceCDCSupport { self: DeltaSource =>
}
}

/** Verifies that provided version is <= endOffset version, if defined. */
def versionLessThanEndOffset(version: Long, endOffset: Option[DeltaSourceOffset]): Boolean = {
endOffset match {
case Some(eo) =>
version <= eo.reservoirVersion
case None =>
true
}
}

val (result, duration) = Utils.timeTakenMs {
val iter: Iterator[(Long, IndexedChangeFileSeq)] = if (isInitialSnapshot) {
// If we are reading change data from the start of the table we need to
Expand All @@ -301,9 +311,11 @@ trait DeltaSourceCDCSupport { self: DeltaSource =>
}

// In this case, filterFiles will consume the available capacity. We use takeWhile
// to stop the iteration when we reach the limit which will save us from reading
// unnecessary log files.
iter.takeWhile(_ => limits.forall(_.hasCapacity)).map { case (version, indexItr) =>
// to stop the iteration when we reach the limit or if endOffset is specified and the
// endVersion is reached which will save us from reading unnecessary log files.
iter.takeWhile { case (version, _) =>
limits.forall(_.hasCapacity) && versionLessThanEndOffset(version, endOffset)
}.map { case (version, indexItr) =>
(version, indexItr.filterFiles(fromVersion, fromIndex, limits, endOffset))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -500,8 +500,11 @@ trait ColumnMappingStreamingBlockedWorkflowSuiteBase extends ColumnMappingStream
val dfStartAtZero = dropCDCFields(dsr
.option(DeltaOptions.STARTING_VERSION_OPTION, "0")
.load(inputDir.getCanonicalPath))
checkStreamStartBlocked(
dfStartAtZero, checkpointDir2, ExpectGenericSchemaIncompatibleFailure)
testStream(dfStartAtZero)(
StartStream(checkpointLocation = checkpointDir2.getCanonicalPath),
ProcessAllAvailableIgnoreError,
ExpectGenericSchemaIncompatibleFailure
)
} else {
// In the trickier case when we rename a column and rename back, we could not
// immediately detect the schema incompatibility at stream start, so we will move on.
Expand Down

0 comments on commit 0ee9fd0

Please sign in to comment.