Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[Spark] Skip reading log entries beyond endOffset, if specified while…
… getting file changes for CDC in streaming queries (#3110) #### Which Delta project/connector is this regarding? - [x] Spark - [ ] Standalone - [ ] Flink - [ ] Kernel - [ ] Other (fill in here) ## Description Skip reading log entries beyond endOffset, if specified while getting file changes for CDC in streaming queries ## How was this patch tested? Existing unit tests Also verified using logs to ensure that additional Delta logs are not read ``` 24/05/16 01:21:01 INFO StateStore: StateStore stopped Run completed in 54 seconds, 237 milliseconds. Total number of tests run: 1 Suites: completed 1, aborted 0 Tests: succeeded 1, failed 0, canceled 0, ignored 0, pending 0 All tests passed. ``` Before: ``` 10457:24/05/16 01:38:37 INFO DeltaSource: [queryId = 199ce] [batchId = 0] Getting CDC dataFrame for delta_log_path=file:/tmp/spark-c3309e79-80cf-4819-8b03-2fc607cc2679/_delta_log with startVersion=0, startIndex=-100, isInitialSnapshot=false, endOffset={"sourceVersion":1,"reservoirId":"75194ee3-4ff6-431e-8e7c-32006684d5ad","reservoirVersion":1,"index":-1,"isStartingVersion":false} took timeMs=52 ms 11114:24/05/16 01:38:39 INFO DeltaSource: [queryId = 199ce] [batchId = 1] Getting CDC dataFrame for delta_log_path=file:/tmp/spark-c3309e79-80cf-4819-8b03-2fc607cc2679/_delta_log with startVersion=1, startIndex=-100, isInitialSnapshot=false, endOffset={"sourceVersion":1,"reservoirId":"75194ee3-4ff6-431e-8e7c-32006684d5ad","reservoirVersion":2,"index":-1,"isStartingVersion":false} took timeMs=25 ms 11518:24/05/16 01:38:39 INFO DeltaSource: [queryId = 199ce] [batchId = 2] Getting CDC dataFrame for delta_log_path=file:/tmp/spark-c3309e79-80cf-4819-8b03-2fc607cc2679/_delta_log with startVersion=2, startIndex=-100, isInitialSnapshot=false, endOffset={"sourceVersion":1,"reservoirId":"75194ee3-4ff6-431e-8e7c-32006684d5ad","reservoirVersion":3,"index":-1,"isStartingVersion":false} took timeMs=24 ms ``` After: ``` 10498:24/05/16 01:32:10 INFO DeltaSource: [queryId = ede3f] [batchId = 0] Getting CDC dataFrame for delta_log_path=file:/tmp/spark-270c3d6e-40df-4e6f-b1da-c293af5d6741/_delta_log with startVersion=0, startIndex=-100, isInitialSnapshot=false, endOffset={"sourceVersion":1,"reservoirId":"516bafe0-e0ea-4380-afcb-44e416302a07","reservoirVersion":1,"index":-1,"isStartingVersion":false} took timeMs=39 ms 11155:24/05/16 01:32:11 INFO DeltaSource: [queryId = ede3f] [batchId = 1] Getting CDC dataFrame for delta_log_path=file:/tmp/spark-270c3d6e-40df-4e6f-b1da-c293af5d6741/_delta_log with startVersion=1, startIndex=-100, isInitialSnapshot=false, endOffset={"sourceVersion":1,"reservoirId":"516bafe0-e0ea-4380-afcb-44e416302a07","reservoirVersion":2,"index":-1,"isStartingVersion":false} took timeMs=14 ms 11579:24/05/16 01:32:12 INFO DeltaSource: [queryId = ede3f] [batchId = 2] Getting CDC dataFrame for delta_log_path=file:/tmp/spark-270c3d6e-40df-4e6f-b1da-c293af5d6741/_delta_log with startVersion=2, startIndex=-100, isInitialSnapshot=false, endOffset={"sourceVersion":1,"reservoirId":"516bafe0-e0ea-4380-afcb-44e416302a07","reservoirVersion":3,"index":-1,"isStartingVersion":false} took timeMs=13 ms ``` Difference is even more if we are processing/reading through large number of backlog versions. In Cx setup, before the change - batches are taking > 300s. After the change, batches complete is < 15s. ## Does this PR introduce _any_ user-facing changes? No
- Loading branch information