Skip to content

Commit

Permalink
Finish fixing deduplicator to use smaller files
Browse files Browse the repository at this point in the history
  • Loading branch information
carlini committed Mar 3, 2022
1 parent 6a6eb6b commit cf6b793
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 42 deletions.
2 changes: 1 addition & 1 deletion scripts/finish_dedup_lm1b.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ def _generate_examples(self, split):
#print(byte_start, byte_end, remove[ptr])
while ptr < len(remove) and byte_start <= remove[ptr][0] < byte_end:
#print(remove[ptr])
assert remove[ptr][1] < byte_end+4
assert remove[ptr][1] < byte_end+6
# The magic value 6 here corresponds to the 4-byte index prefix followed by \xff\xff.
remove_ex[i].append((max(int(remove[ptr][0] - byte_start - 6), 0),
min(int(remove[ptr][1] - byte_start), byte_end-byte_start)))
Expand Down
92 changes: 51 additions & 41 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,6 @@ use clap::{Parser, Subcommand};

mod table;

const EIGHT:usize = 8;

#[derive(Parser, Debug)]
#[clap(author, version, about, long_about = None)]
struct Args {
Expand Down Expand Up @@ -188,7 +186,6 @@ fn table_load_disk(table:&mut BufReader<File>,
table.seek(std::io::SeekFrom::Start ((index*size_width) as u64)).expect ("Seek failed!");
let mut tmp = [0u8; 8];
table.read_exact(&mut tmp[..size_width]).unwrap();
//println!("{:?}", &tmp[8-size_width..]);
return u64::from_le_bytes(tmp) as usize;
}

Expand Down Expand Up @@ -319,8 +316,10 @@ fn count_occurances(text: &mut File,
*
* The result of calling this method is a new file with ".table.bin" appended
* to the name which is the suffix array of sorted suffix pointers. This file
* should be exactly 8x larger than the original file (one u64 pointer per
* byte of the original).
* should be at most 8x larger than the original file (one u64 pointer per
* byte of the original). In order to save space, if it turns out we only need
* 32 bits to uniquely point into the data file then we serialize using fewer
* bits (or 24, or 40, or ...), but in memory we always use a u64.
*
* If the file does not fit into memory, then instead you should use the
* alternate save_part and then merge_parallel in two steps. See the comments
Expand Down Expand Up @@ -613,12 +612,10 @@ fn cmd_across_similar(data_file_1: &String, data_file_2: &String, cache_dir: &St
let metadata2 = fs::metadata(format!("{}.table.bin", data_file_2)).expect("suffix array exists for arg 1");

assert!(metadata1.len() % (text1.len() as u64) == 0);
let ratio = metadata1.len()/(text1.len() as u64);
assert!(ratio == 8);
let ratio1 = metadata1.len()/(text1.len() as u64);

assert!(metadata2.len() % (text2.len() as u64) == 0);
let ratio = metadata2.len()/(text2.len() as u64);
assert!(ratio == 8);
let ratio2 = metadata2.len()/(text2.len() as u64);

if !Path::new(&cache_dir).exists() {
fs::create_dir(cache_dir)?;
Expand All @@ -628,41 +625,43 @@ fn cmd_across_similar(data_file_1: &String, data_file_2: &String, cache_dir: &St
start1:usize, end1:usize,
start2:usize, end2:usize,
data_file_1: String, data_file_2: String,
cache_dir: String, length_threshold: usize) -> usize {
let mut table1 = make_table(format!("{}.table.bin", data_file_1), start1, EIGHT);
cache_dir: String, length_threshold: usize,
size_width_1: usize, size_width_2: usize) -> usize {
let mut table1 = make_table(format!("{}.table.bin", data_file_1), start1, size_width_1);
let mut location1 = get_next_pointer_from_table(&mut table1);

let mut table2 = make_table(format!("{}.table.bin", data_file_2), start2, EIGHT);
let mut table2 = make_table(format!("{}.table.bin", data_file_2), start2, size_width_2);
let mut location2 = get_next_pointer_from_table(&mut table2);

// What do you mean this looks ugly. I see no problem here!
let mut outfile2 = std::io::BufWriter::new(fs::File::create(
let mut outfile1 = std::io::BufWriter::new(fs::File::create(
format!("{}/dups_{}_{}-{}_{}_{}-{}",
cache_dir,
data_file_1.split("/").last().unwrap(), start1, end1,
data_file_2.split("/").last().unwrap(), start2, end2,
)).unwrap());
let mut outfile2_sizes = std::io::BufWriter::new(fs::File::create(
let mut outfile1_sizes = std::io::BufWriter::new(fs::File::create(
format!("{}/sizes_{}_{}-{}_{}_{}-{}",
cache_dir,
data_file_1.split("/").last().unwrap(), start1, end1,
data_file_2.split("/").last().unwrap(), start2, end2,
)).unwrap());

let mut outfile1 = std::io::BufWriter::new(fs::File::create(
let mut outfile2 = std::io::BufWriter::new(fs::File::create(
format!("{}/dups_{}_{}-{}_{}_{}-{}",
cache_dir,
data_file_2.split("/").last().unwrap(), start2, end2,
data_file_1.split("/").last().unwrap(), start1, end1,
)).unwrap());
let mut outfile1_sizes = std::io::BufWriter::new(fs::File::create(
let mut outfile2_sizes = std::io::BufWriter::new(fs::File::create(
format!("{}/sizes_{}_{}-{}_{}_{}-{}",
cache_dir,
data_file_2.split("/").last().unwrap(), start2, end2,
data_file_1.split("/").last().unwrap(), start1, end1,
)).unwrap());


let mut duplicate_count = 0;
let mut i = start1;
let mut j = start2;
while i < end1 && j < end2 {
Expand All @@ -686,32 +685,40 @@ fn cmd_across_similar(data_file_1: &String, data_file_2: &String, cache_dir: &St
// the first string that are equal to target_suf
let start = i;
while suf1.len() >= length_threshold && &suf1[..length_threshold] == target_suf {
outfile2.write_all(&to_bytes(&[location1 as u64][..], EIGHT)[..]).expect("Ok");
outfile1.write_all(&to_bytes(&[location1 as u64][..], size_width_1)[..]).expect("Ok");

location1 = get_next_pointer_from_table(&mut table1);
suf1 = &text1[location1 as usize..];
location1 = get_next_pointer_from_table_canfail(&mut table1);
i += 1;
if location1 == std::u64::MAX {
break;
}
suf1 = &text1[location1 as usize..];
}
outfile1_sizes.write_all(&to_bytes(&[(i-start) as u64][..], EIGHT)[..]).expect("Ok");
duplicate_count += i-start;
outfile1_sizes.write_all(&to_bytes(&[(i-start) as u64][..], size_width_1)[..]).expect("Ok");

// And now find all matches from the second string that are equal to target_suf
let start = j;
while suf2.len() >= length_threshold && &suf2[..length_threshold] == target_suf {
outfile1.write_all(&to_bytes(&[location2 as u64][..], EIGHT)[..]).expect("Ok");
outfile2.write_all(&to_bytes(&[location2 as u64][..], size_width_2)[..]).expect("Ok");

location2 = get_next_pointer_from_table(&mut table2);
suf2 = &text2[location2 as usize..];
j += 1;
if location2 == std::u64::MAX {
break;
}
suf2 = &text2[location2 as usize..];
}
outfile2_sizes.write_all(&to_bytes(&[(j-start) as u64][..], EIGHT)[..]).expect("Ok");
duplicate_count += j-start;
outfile2_sizes.write_all(&to_bytes(&[(j-start) as u64][..], size_width_2)[..]).expect("Ok");
} else if suf1 < suf2 {
// No match, and the first suffix is smaller. Increment the smaller one
i += 1;
location1 = get_next_pointer_from_table(&mut table1);
location1 = get_next_pointer_from_table_canfail(&mut table1);
} else if suf2 < suf1 {
// No match, and the second suffix is smaller. Increment the smaller one
j += 1;
location2 = get_next_pointer_from_table(&mut table2);
location2 = get_next_pointer_from_table_canfail(&mut table2);
} else {
// This happens only when
// 1. The two suffixes are identical
Expand All @@ -724,7 +731,7 @@ fn cmd_across_similar(data_file_1: &String, data_file_2: &String, cache_dir: &St
}
}

return 0;
return duplicate_count;
}


Expand All @@ -743,25 +750,26 @@ fn cmd_across_similar(data_file_1: &String, data_file_2: &String, cache_dir: &St
let mut table2 = std::io::BufReader::new(fs::File::open(format!("{}.table.bin", data_file_2)).unwrap());
let this_start = last_end;

let end_seq = &text1[table_load_disk(&mut table1, b, EIGHT)..];
let this_end = off_disk_position(text2, &mut table2, end_seq, EIGHT);
let end_seq = &text1[table_load_disk(&mut table1, b, ratio1 as usize)..];
let this_end = off_disk_position(text2, &mut table2, end_seq, ratio2 as usize);

last_end = this_end;
println!("start {} {}", this_start, this_end);
let one_result = scope.spawn(move || {

return worker(text1, text2,
a, b,
this_start, this_end,
data_file_1.clone(), data_file_2.clone(),
cache_dir.clone(),
length_threshold);
a, b,
this_start, this_end,
data_file_1.clone(), data_file_2.clone(),
cache_dir.clone(),
length_threshold,
ratio1 as usize, ratio2 as usize);
});
result.push(one_result);
}

let thread_sum:usize = result.into_iter().map(|t| t.join()).sum();
println!("Final answer {:?}", thread_sum);
println!("Duplicates found: {:?}", thread_sum);

});
Ok(())
Expand Down Expand Up @@ -847,8 +855,8 @@ fn cmd_merge(data_files: &Vec<String>, output_file: &String, num_threads: i64)
return meta.len();
}).collect();

let big_ratio = ((texts_len.sum() as f64).log2()/8.0).ceil() as usize;
println!("Ratio: {}", ratio);
let big_ratio = ((texts_len.iter().sum::<usize>() as f64).log2()/8.0).ceil() as usize;
println!("Ratio: {}", big_ratio);

let ratio = metadatas[0] / (texts[0].len() as u64);

Expand Down Expand Up @@ -1063,12 +1071,12 @@ fn cmd_collect(data_file: &String, cache_dir: &String, length_threshold: u64) -
for path in path_list.into_iter() {
let path = path.clone();
let out = scope.spawn(move || {
let all_items = from_bytes(fs::read(path.clone()).unwrap(), size_width as usize);
println!("Got {} {:?}", size_width, &all_items[..10]);
let mut all_items = from_bytes(fs::read(path.clone()).unwrap(), size_width as usize);
//println!("Got {} {:?}", size_width, &all_items[..10]);

let mut all_items:Vec<u64> = all_items.into_iter().filter(|&x| x%2 == 0).collect();
//let mut all_items:Vec<u64> = all_items.into_iter().filter(|&x| x%2 == 0).collect();
all_items.sort_unstable();
println!("Done {}", all_items.len());
//println!("Done {}", all_items.len());
return all_items;
});
result.push(out);
Expand All @@ -1085,7 +1093,9 @@ fn cmd_collect(data_file: &String, cache_dir: &String, length_threshold: u64) -

// Seed the heap with the first element of each
for (i, output) in outputs.iter().enumerate() {
heap.push(Reverse((output[0], 0, i)));
if output.len() > 0 {
heap.push(Reverse((output[0], 0, i)));
}
}

let mut ranges:Vec<(u64,u64)> = Vec::with_capacity(1000);
Expand Down

0 comments on commit cf6b793

Please sign in to comment.