Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement simple delete case #832

Closed
wjones127 opened this issue Sep 21, 2022 · 8 comments · Fixed by #1176
Closed

Implement simple delete case #832

wjones127 opened this issue Sep 21, 2022 · 8 comments · Fixed by #1176
Labels
binding/rust Issues for the Rust crate enhancement New feature or request good first issue Good for newcomers

Comments

@wjones127
Copy link
Collaborator

wjones127 commented Sep 21, 2022

Description

It's not uncommon to delete along partition boundaries. In those cases the delete should be very simple to implement; just commit some remove actions.

I think we should probably design the API so we can eventually support any where clause in deletes, rather than have a separate function for this simple case and the case where we have to rewrite files.

See the write operation implementation for inspiration: https://github.com/delta-io/delta-rs/blob/main/rust/src/operations/write.rs

Use Case

Related Issue(s)

@wjones127 wjones127 added enhancement New feature or request good first issue Good for newcomers binding/rust Issues for the Rust crate labels Sep 21, 2022
@Thelin90
Copy link

I would like this!

@wjones127 wjones127 mentioned this issue Feb 5, 2023
21 tasks
@dudzicp
Copy link

dudzicp commented Feb 7, 2023

Hi,
I could probably contribute to this under some kind of supervision as I am new to rust.

If i understand this correctly, the dummy approach to implement this would be to:

  • iterate over parquet files from current version
  • query each file to check if it contains rows that matches predicate, something like:
let df = ctx.read_parquet(...
let filtered = df.select_columns(columns_from_predicate).filter(predicate).limit(0, Some(1))?;
let count = filtered.count().await?;

Then for each file with matched rows (count > 0), rewrite it with (with filters applied), add proper delete actions and commit.
I guess in the next iteration we would need to look into metadata to check min/max values to reduce the number of files we need to scan.
Does this make sense?

@dudzicp
Copy link

dudzicp commented Feb 7, 2023

Surely, as a first step we would need to check if predicate columns matches table partitioning.

@roeap
Copy link
Collaborator

roeap commented Feb 7, 2023

Hey @dudzicp,

Thanks for offering to help, we are more then happy to support along the way!

Your initial thought is going in the right direction I think. One of the beautiful things about delta though is, that we have a lot of valuable information available in the delta log already that we can leverage to exclude some files a-priori and avoid the expensive IO that comes with it. The good news is, the integration with datafusion can already leverage this.

Without having thought about it too much yet, the way I would get started is to leverage the pruning predicate that is implemented for DeltaTable. We do something similar when we do file skipping in the scan operation.

let pruning_predicate = PruningPredicate::try_new(predicate, schema.clone())?;
let files_to_prune = pruning_predicate.prune(self)?;
self.get_state()
.files()
.iter()
.zip(files_to_prune.into_iter())
.for_each(|(action, keep_file)| {
if keep_file {
let part = partitioned_file_from_action(action, &schema);
file_groups
.entry(part.partition_values.clone())
.or_default()
.push(part);
};
});

So one way to go could be to pre-select the files from the log that might have matches using PruningPredicate, which gives a all candidates. From there on the brute force way would be to read all files with the filter applied and write out the data immediately, and just assuming each file has a hit.

As you already mentioned in this case we might already be doing too much work, however we could leverage a lot of the existing implementation. I.e. we get all the parallelism etc that comes out of the box with datafusion almost for free and don't need to implement that here.

An alternative way, that might already get us a bit along the way to also supporting deletion vectors is to create a query plan (LogicalPlan or directly ExecutionPlan from datafusion), that will generate single column record batches containing the row indices for all rows that we want to delete. We would have to somehow keep track which vector belongs to which file. One possible solution would be to partition the plan for each file and track partition indices to file names. These could then be used to create a mask for filtering the data when we want to write new files, or (at a later stage) create the deletion vectors from these arrarys. But this approach probably goes beyond "simple delete case" :).

I guess to get us started and establish the higher level APIs, my suggestion would be to use the pruning predicate to get all relevant files, use these to create a ListingTable that can be sued as a data source for a new execution plan (potentially leveraging higher level DF apis to create the plan) and pass that plan to the writer implementation from the operations module. The writer will give you all the add actions, and we then only need to create the delete actions.

Another angle on this might be to start from the same list of files as above, read all the data we want to keep and for each file-partition check if we have as many rows as indicated in the meta-data, if so, there were no hits on the delete predicate and can (have to) disregard the data. if we have less rows, we saw have a hit, and should write the data.

This is so far just thinking out loud, and I have to think a bit more about the best approach I guess for the first iteration we should aim for scanning each relevant file at most once, and leverage the scheduling / execution capabilities from datafusion as best we can.

Then again I may also have missed something obvious and @houqp, @wjones127, or @schuermannator have some better ideas how to proceed 😆.

@roeap
Copy link
Collaborator

roeap commented Feb 7, 2023

I may also be wrong about scanning files only once. Specifically fi we write the data, we need all columns etc, deciding if we need it may be done on just a small subset of columns and thus potentially require much less IO ...

@wjones127
Copy link
Collaborator Author

Then for each file with matched rows (count > 0), rewrite it with (with filters applied), add proper delete actions and commit.
I guess in the next iteration we would need to look into metadata to check min/max values to reduce the number of files we need to scan.

Yeah I'd almost be tempted to start there. I imagine the flow is something like (sort of Rust, sort of pseudo code):

enum FilterAction {
  Ignore, // No matching rows in the file
  Remove, // The entire file matches the predicate
  Rewrite // At least part of the file matches the predicate
}

/// For an add action, decide whether the predicate applies to the full file,
/// none of the file, or part of the file.
fn check_filter(action: AddAction, predicate: Predicate) -> FilterAction {
  todo!()
}

fn filter_operation(table: DeltaTable, predicate: Predicate) -> {
  let new_actions: Vec<Action> = vec![];
  let files_to_rewrite: Vec<&AddAction> = vec![];

  for action in table.add_actions() {
    match check_filter(action, predicate) {
      Ignore => {},
      Remove => {
        new_actions.push(make_remove_action(action));
      },
      Rewrite => {
        new_actions.push(make_remove_action(action));
        files_to_rewrite.push(action);
      }
    }
  }

  // Read and filter parquet files we need to rewrite
  let plan = ctx.read_parquet(files_to_rewrite).filter(!predicate);
  let files_written = write(plan);
  new_actions.extend(create_add_actions(files_written));

  commit_transaction(new_actions);
}

(This is basically what Robert described.)

check_filter would use PruningPredicate like Robert is suggesting. I guess you might have to evaluate with predicate and !predicate to figure out to differentiate whether an entire file can be removed or we actually have to filter rows. Though I'm not 100% sure that's sound.

From there on the brute force way would be to read all files with the filter applied and write out the data immediately, and just assuming each file has a hit.

I think once you know you need to rewrite a file, there isn't a much more efficient way to do the filtering. (Except maybe you could you Parquet row group statistics to determine which row groups can be passed through to the new file as is without any decoding/decompression. But that would require getting deep into the Parquet implementation, so we'll keep that out of scope.) The nice thing is a scan then filter has no pipeline breakers, so it can be done in a totally streaming fashion (don't have to load the entire table into memory at once).

@Blajda
Copy link
Collaborator

Blajda commented Feb 20, 2023

Hi @dudzicp are you still interesting on taking this work on?
I currently have a branch with a functional implementation just requires further refinement.

@dudzicp
Copy link

dudzicp commented Mar 4, 2023

Yes, I will get back to this topic probably in a month as I am swamped at my primary job. Feel free to push this further if you have time.

wjones127 pushed a commit that referenced this issue May 8, 2023
# Description

This is a full implementation of the Delta Delete Command. Users can now
delete records that match a predicate. This implementation is not
limited to only partition columns and allows non-partition columns.

This also implements a `find_files` function which can be used to
implement the Update command.

# Related Issue(s)

- closes #832 

# Documentation
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
binding/rust Issues for the Rust crate enhancement New feature or request good first issue Good for newcomers
Projects
None yet
Development

Successfully merging a pull request may close this issue.

5 participants