Skip to content

Commit

Permalink
Better collection
Browse files Browse the repository at this point in the history
  • Loading branch information
carlini committed Feb 24, 2022
1 parent 6171a87 commit 262cea5
Showing 1 changed file with 98 additions and 43 deletions.
141 changes: 98 additions & 43 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ use std::io::BufReader;
use std::fs::File;
use std::io::prelude::*;
use std::convert::TryInto;
use std::cmp::Reverse;


//use std::ffi::OsString;
Expand Down Expand Up @@ -147,6 +148,8 @@ enum Commands {
data_name: String,
#[clap(short, long)]
cache_dir: String,
#[clap(short, long)]
length_threshold: u64,
}

}
Expand Down Expand Up @@ -180,7 +183,7 @@ pub fn from_bytes(input: Vec<u8>) -> Vec<u64> {
}

/* Get the next word from the suffix table. */
fn get_next_82(mut tablestream:&mut TableStream) -> u64 {
fn get_next_pointer_from_table(mut tablestream:&mut TableStream) -> u64 {
if tablestream.ptr >= tablestream.cache.len() {
let _ = tablestream.file.read_exact(&mut tablestream.cache);
tablestream.ptr = 0;
Expand Down Expand Up @@ -218,35 +221,36 @@ struct TableStream {


#[derive(Copy, Clone, Eq, PartialEq)]
struct State<'a> {
struct MergeState<'a> {
suffix: &'a [u8],
position: u64,
table_index: usize
}

fn make_table(path: std::string::String, offset: usize) -> TableStream {
let mut table = TableStream {
file: std::io::BufReader::new(fs::File::open(path).unwrap()),
cache: vec![0u8; 1024*1024],
ptr: 1024*1024
};
table.file.seek (std::io::SeekFrom::Start ((offset*8) as u64)).expect ("Seek failed!");
return table;
}

impl<'a> Ord for State<'a> {
impl<'a> Ord for MergeState<'a> {
fn cmp(&self, other: &Self) -> Ordering {
other.suffix.cmp(&self.suffix)
}
}

impl<'a> PartialOrd for State<'a> {
impl<'a> PartialOrd for MergeState<'a> {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}


fn make_table(path: std::string::String, offset: usize) -> TableStream {
let mut table = TableStream {
file: std::io::BufReader::new(fs::File::open(path).unwrap()),
cache: vec![0u8; 1024*1024],
ptr: 1024*1024
};
table.file.seek (std::io::SeekFrom::Start ((offset*8) as u64)).expect ("Seek failed!");
return table;
}


const HACKSIZE:usize=100000;

fn count_occurances(text: &mut File, mut table: &mut BufReader<File>, size: u64, str: &[u8]) -> u64{
Expand Down Expand Up @@ -281,7 +285,6 @@ fn count_occurances(text: &mut File, mut table: &mut BufReader<File>, size: u64,
let pos = table_load_disk(&mut table, mid as usize);
text.seek(std::io::SeekFrom::Start(pos as u64)).expect ("Seek failed!");
text.read_exact(&mut buf).unwrap();
//println!("{:?}", buf);
if str != &buf {
high = mid;
} else {
Expand Down Expand Up @@ -434,7 +437,7 @@ fn cmd_self_similar(data_file: &String, length_threshold: &usize, frequency_thre
length_threshold: usize, frequency_threshold: usize, only_save_one: bool,
data_file: String, cache_dir: String) -> usize {
let mut table = make_table(format!("{}.table.bin", data_file), start);
let mut prev_location = get_next_82(&mut table);
let mut prev_location = get_next_pointer_from_table(&mut table);

let mut outfile = std::io::BufWriter::new(fs::File::create(
format!("{}/dups_{}_{}-{}", cache_dir,
Expand All @@ -455,7 +458,7 @@ fn cmd_self_similar(data_file: &String, length_threshold: &usize, frequency_thre
let mut first = true;

loop {
cur_location = get_next_82(&mut table);
cur_location = get_next_pointer_from_table(&mut table);
i += 1;

let suf2 = &text[cur_location as usize..];
Expand Down Expand Up @@ -541,10 +544,10 @@ fn cmd_across_similar(data_file_1: &String, data_file_2: &String, cache_dir: &St
data_file_1: String, data_file_2: String,
cache_dir: String, length_threshold: usize) -> usize {
let mut table1 = make_table(format!("{}.table.bin", data_file_1), start1);
let mut location1 = get_next_82(&mut table1);
let mut location1 = get_next_pointer_from_table(&mut table1);

let mut table2 = make_table(format!("{}.table.bin", data_file_2), start2);
let mut location2 = get_next_82(&mut table2);
let mut location2 = get_next_pointer_from_table(&mut table2);

// What do you mean this looks ugly. I see no problem here!
let mut outfile2 = std::io::BufWriter::new(fs::File::create(
Expand Down Expand Up @@ -592,7 +595,7 @@ fn cmd_across_similar(data_file_1: &String, data_file_2: &String, cache_dir: &St
while suf1.len() >= length_threshold && &suf1[..length_threshold] == target_suf {
outfile2.write_all(&to_bytes(&[location1 as u64][..])[..]).expect("Ok");

location1 = get_next_82(&mut table1);
location1 = get_next_pointer_from_table(&mut table1);
suf1 = &text1[location1 as usize..];
i += 1;
}
Expand All @@ -602,17 +605,17 @@ fn cmd_across_similar(data_file_1: &String, data_file_2: &String, cache_dir: &St
while suf2.len() >= length_threshold && &suf2[..length_threshold] == target_suf {
outfile1.write_all(&to_bytes(&[location2 as u64][..])[..]).expect("Ok");

location2 = get_next_82(&mut table2);
location2 = get_next_pointer_from_table(&mut table2);
suf2 = &text2[location2 as usize..];
j += 1;
}
outfile2_sizes.write_all(&to_bytes(&[(j-start) as u64][..])[..]).expect("Ok");
} else if suf1 < suf2 {
i += 1;
location1 = get_next_82(&mut table1);
location1 = get_next_pointer_from_table(&mut table1);
} else if suf2 < suf1 {
j += 1;
location2 = get_next_82(&mut table2);
location2 = get_next_pointer_from_table(&mut table2);
} else {
// This happens only when
// 1. The two suffixes are identical
Expand All @@ -621,7 +624,7 @@ fn cmd_across_similar(data_file_1: &String, data_file_2: &String, cache_dir: &St
assert!(&suf1 == &suf2);
assert!(suf1.len() < 100 || suf2.len() < 100);
i += 1;
location1 = get_next_82(&mut table1);
location1 = get_next_pointer_from_table(&mut table1);
}
}

Expand Down Expand Up @@ -733,10 +736,10 @@ fn cmd_merge(data_files: &Vec<String>, output_file: &String, num_threads: i64)

fn get_next_maybe_skip(mut tablestream:&mut TableStream,
index:&mut u64, thresh:usize) -> u64 {
let mut location = get_next_82(&mut tablestream);
let mut location = get_next_pointer_from_table(&mut tablestream);
*index += 1;
while location >= thresh as u64 {
location = get_next_82(&mut tablestream);
location = get_next_pointer_from_table(&mut tablestream);
*index += 1;
}
return location;
Expand All @@ -747,7 +750,7 @@ fn cmd_merge(data_files: &Vec<String>, output_file: &String, num_threads: i64)
for x in 0..nn {
let position = get_next_maybe_skip(&mut tables[x],
&mut idxs[x], texts_len[x]);
heap.push(State {
heap.push(MergeState {
suffix: &texts[x][position as usize..],
position: position,
table_index: x
Expand All @@ -757,7 +760,7 @@ fn cmd_merge(data_files: &Vec<String>, output_file: &String, num_threads: i64)

let mut prev_position = 0;
let mut prev = &texts[0][0..];
while let Some(State {suffix: _suffix, position, table_index}) = heap.pop() {
while let Some(MergeState {suffix: _suffix, position, table_index}) = heap.pop() {
next_table.write_all(&(position + delta[table_index] as u64).to_le_bytes()).expect("Write OK");

let position = get_next_maybe_skip(&mut tables[table_index],
Expand All @@ -776,7 +779,7 @@ fn cmd_merge(data_files: &Vec<String>, output_file: &String, num_threads: i64)
println!("{} match len: xx\n", part);
}

heap.push(State {
heap.push(MergeState {
suffix: &texts[table_index][position as usize..],
position: position,
table_index: table_index
Expand Down Expand Up @@ -849,7 +852,29 @@ fn cmd_merge(data_files: &Vec<String>, output_file: &String, num_threads: i64)
Ok(())
}

fn cmd_collect(ds_name: &String, cache_dir: &String) -> std::io::Result<()> {
/*
* Given the output of either self-similar or across-similar,
* compute byte ranges that are duplicates.
*
* The similar outputs are just byte values
* [A_0, A_1, ..., A_N]
* meaning that the bytes from (A_i, A_i + length_threshold) are duplicated somewhere.
*
* This script converts this to ranges [a, b) for complete ranges that should be removed.
* For example if we have a long duplicate sequence
* abcdefg
* then we might have a match for `abcde` and `bcdef` and `cdefg`
* So instead of just saying tokens 0, 1, and 2 match, here we say that [0, 7) match.
*
* To do this we
* (a) sort the output lists, and then
* (b) collapse overlapping buckets.
*
* Note that as a result of doing this, we might have a sequence `qwerty` where the
* entire sequence is never repeated in the dataset multiple times, but each byte
* in the sequence is part of some length_threshold duplicate.
*/
fn cmd_collect(ds_name: &String, cache_dir: &String, length_threshold: u64) -> std::io::Result<()> {
let paths = fs::read_dir(cache_dir).unwrap();

let mut path_list = Vec::with_capacity(1000);
Expand All @@ -861,6 +886,8 @@ fn cmd_collect(ds_name: &String, cache_dir: &String) -> std::io::Result<()> {
path_list.push(path);
}

// 1. Perform an initial sort of each of the found duplicates

let mut result = Vec::with_capacity(100);
crossbeam::scope(|scope| {
for path in path_list.into_iter() {
Expand All @@ -876,20 +903,48 @@ fn cmd_collect(ds_name: &String, cache_dir: &String) -> std::io::Result<()> {
}
});
let outputs:Vec<Vec<u64>> = result.into_iter().map(|t| t.join()).collect();
let mut all_items:Vec<u64> = outputs.into_iter().flatten().collect();
println!("Sorting.");
all_items.sort_unstable();
println!("Sorted.");

let mut all_items:Vec<u64> = Vec::with_capacity(1000);
println!("Merging.");

// 2. Perform a merge of the now-sorted lists

let mut heap = BinaryHeap::new();

// Seed the heap with the first element of each
for (i, output) in outputs.iter().enumerate() {
heap.push(Reverse((output[0], 0, i)));
}

let mut ranges:Vec<(u64,u64)> = Vec::with_capacity(1000);
let mut prev_start = all_items[0];
let mut prev_end = all_items[0]+100;
for x in all_items[1..].iter() {
if *x <= prev_end {
prev_end = *x+100;
let mut prev_start;
let mut prev_end;

// Unroll first iteration of the loop for performance
if let Some(Reverse((data_pointer, index, which_array))) = heap.pop() {
prev_start = data_pointer;
prev_end = data_pointer + length_threshold;
heap.push(Reverse((outputs[which_array][index+1], index+1, which_array)));
} else {
println!("No duplicates found! Either the dataset is duplicate-free or something went wrong.");
return Ok(());
}

// Now walk the the rest of the merging
while let Some(Reverse((data_pointer, index, which_array))) = heap.pop() {
all_items.push(data_pointer);

if data_pointer <= prev_end {
prev_end = data_pointer+length_threshold;
} else {
ranges.push((prev_start, prev_end));
prev_start = *x;
prev_end = *x+100;
prev_start = data_pointer;
prev_end = data_pointer+length_threshold;
}

// If this array has more data, consume it
if index+1 < outputs[which_array].len() {
heap.push(Reverse((outputs[which_array][index+1], index+1, which_array)));
}
}
ranges.push((prev_start, prev_end));
Expand Down Expand Up @@ -934,8 +989,8 @@ fn main() -> std::io::Result<()> {
cmd_merge(suffix_path, output_file, *num_threads)?;
}

Commands::Collect { data_name, cache_dir } => {
cmd_collect(data_name, cache_dir)?;
Commands::Collect { data_name, cache_dir, length_threshold } => {
cmd_collect(data_name, cache_dir, *length_threshold)?;
}
}

Expand Down

0 comments on commit 262cea5

Please sign in to comment.