
More code commenting and cleanup
carlini committed Feb 24, 2022
1 parent 9cb2706 commit 6f533f4
Showing 1 changed file with 62 additions and 39 deletions.
src/main.rs
@@ -48,7 +48,6 @@

use std::path::Path;
use std::time::Instant;
use std::env;
use std::fs;
use std::io::Read;
use std::io::BufReader;
@@ -57,10 +56,6 @@ use std::io::prelude::*;
use std::convert::TryInto;
use std::cmp::Reverse;


//use std::ffi::OsString;
use std::path::PathBuf;

extern crate filebuffer;
extern crate zstd;
extern crate crossbeam;
@@ -182,17 +177,6 @@ pub fn from_bytes(input: Vec<u8>) -> Vec<u64> {
bytes
}
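
The from_bytes/to_bytes pair packs each u64 as 8 little-endian bytes. As a self-contained sketch of the same conversion (hypothetical names, not the code in this commit):

use std::convert::TryInto;

// Sketch: pack u64 words as little-endian bytes, and unpack them again.
fn to_bytes_sketch(input: &[u64]) -> Vec<u8> {
    let mut out = Vec::with_capacity(input.len() * 8);
    for x in input {
        out.extend_from_slice(&x.to_le_bytes());
    }
    out
}

fn from_bytes_sketch(input: &[u8]) -> Vec<u64> {
    assert!(input.len() % 8 == 0, "input must be a whole number of u64 words");
    input
        .chunks_exact(8)
        .map(|chunk| u64::from_le_bytes(chunk.try_into().unwrap()))
        .collect()
}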

/* Get the next word from the suffix table. */
fn get_next_pointer_from_table(mut tablestream:&mut TableStream) -> u64 {
if tablestream.ptr >= tablestream.cache.len() {
let _ = tablestream.file.read_exact(&mut tablestream.cache);
tablestream.ptr = 0;
}
let out = u64::from_le_bytes(tablestream.cache[tablestream.ptr..tablestream.ptr+8].try_into().expect("sdf")) as u64;
tablestream.ptr += 8;
return out;
}

/* For a suffix array, just compute A[i], but load off disk because A is biiiiiiigggggg. */
fn table_load_disk(table:&mut BufReader<File>, index:usize) -> usize{
table.seek(std::io::SeekFrom::Start ((index*8) as u64)).expect ("Seek failed!");
@@ -215,13 +199,19 @@ fn off_disk_position(text: &[u8], table: &mut BufReader<File>, query: &[u8]) ->
left
}
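
table_load_disk seeks to entry i of the on-disk table and reads one 8-byte word, and off_disk_position binary-searches the table with it. A minimal sketch of that access pattern, assuming the same 8-byte little-endian layout used throughout this file (illustrative names, not the commit's exact code):

use std::fs::File;
use std::io::{BufReader, Read, Seek, SeekFrom};

// Sketch: read suffix-array entry A[index] from disk.
fn load_entry(table: &mut BufReader<File>, index: usize) -> usize {
    table.seek(SeekFrom::Start((index * 8) as u64)).expect("Seek failed!");
    let mut buf = [0u8; 8];
    table.read_exact(&mut buf).expect("Read failed!");
    u64::from_le_bytes(buf) as usize
}

// Sketch: binary search for the first suffix of `text` that is >= `query`,
// touching only O(log n) entries of the on-disk table.
fn lower_bound(text: &[u8], table: &mut BufReader<File>, query: &[u8]) -> usize {
    let (mut left, mut right) = (0usize, text.len());
    while left < right {
        let mid = left + (right - left) / 2;
        let suffix = &text[load_entry(table, mid)..];
        if suffix < query { left = mid + 1; } else { right = mid; }
    }
    left
}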

/*
* We're going to work with suffix arrays that are on disk, and we often want
* to stream them top-to-bottom. This is a data structure that helps us do that:
* we read 1MB chunks of data at a time into the cache, and then fetch new data
* when we reach the end.
*/
struct TableStream {
file: BufReader<File>,
cache: Vec<u8>,
ptr: usize
}


/* Make a table from a file path and a given offset into the table */
fn make_table(path: std::string::String, offset: usize) -> TableStream {
let mut table = TableStream {
file: std::io::BufReader::new(fs::File::open(path).unwrap()),
@@ -232,8 +222,17 @@ fn make_table(path: std::string::String, offset: usize) -> TableStream {
return table;
}

/* Get the next word from the suffix table. */
fn get_next_pointer_from_table(mut tablestream:&mut TableStream) -> u64 {
if tablestream.ptr >= tablestream.cache.len() {
let _ = tablestream.file.read_exact(&mut tablestream.cache);
tablestream.ptr = 0;
}
let out = u64::from_le_bytes(tablestream.cache[tablestream.ptr..tablestream.ptr+8].try_into().expect("This should never happen")) as u64;
tablestream.ptr += 8;
return out;
}
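
Together, make_table and get_next_pointer_from_table let a caller stream a suffix table from front to back. A small usage sketch (the file name is illustrative):

// Sketch: stream the first `count` suffix-array pointers starting at byte
// offset 0. "data.table.bin" is an illustrative path, not one this repo uses.
fn stream_pointers(count: usize) {
    let mut stream = make_table("data.table.bin".to_string(), 0);
    for _ in 0..count {
        let pointer = get_next_pointer_from_table(&mut stream);
        println!("suffix starts at byte {}", pointer);
    }
}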

const HACKSIZE:usize=100000;

/*
* Helper function that actually counts the number of times something is repeated.
@@ -452,7 +451,7 @@ fn cmd_self_similar(data_file: &String, length_threshold: &usize, frequency_thre
fs::create_dir(cache_dir)?;
}

fn sdf(text:&[u8], start:usize, end:usize,
fn worker(text:&[u8], start:usize, end:usize,
length_threshold: usize, frequency_threshold: usize, only_save_one: bool,
data_file: String, cache_dir: String) -> usize {
let mut table = make_table(format!("{}.table.bin", data_file), start);
@@ -521,7 +520,7 @@ fn cmd_self_similar(data_file: &String, length_threshold: &usize, frequency_thre
let text = &text;
for i in 0..num_threads {
let one_result = scope.spawn(move || {
return sdf(text,
return worker(text,
std::cmp::max(0i64,i*increment-1) as usize,
std::cmp::min(((i+1)*increment) as usize, text.len()),
*length_threshold, *frequency_threshold, *only_save_one,
@@ -580,11 +579,11 @@ fn cmd_across_similar(data_file_1: &String, data_file_2: &String, cache_dir: &St
fs::create_dir(cache_dir)?;
}

fn sdf(text1:&[u8], text2:&[u8],
start1:usize, end1:usize,
start2:usize, end2:usize,
data_file_1: String, data_file_2: String,
cache_dir: String, length_threshold: usize) -> usize {
fn worker(text1:&[u8], text2:&[u8],
start1:usize, end1:usize,
start2:usize, end2:usize,
data_file_1: String, data_file_2: String,
cache_dir: String, length_threshold: usize) -> usize {
let mut table1 = make_table(format!("{}.table.bin", data_file_1), start1);
let mut location1 = get_next_pointer_from_table(&mut table1);

@@ -627,12 +626,19 @@ fn cmd_across_similar(data_file_1: &String, data_file_2: &String, cache_dir: &St
let mut suf1 = &text1[location1 as usize..];
let mut suf2 = &text2[location2 as usize..];


// Do we have a match between the suffix that begins at location1 in text1
// and the suffix that begins at location2 in text2?
// To check this we need (a) both suffixes to be long enough, and
// (b) the match to be of length at least length_threshold.

let does_match = suf1.len() >= length_threshold && suf2.len() >= length_threshold && suf1[..length_threshold] == suf2[..length_threshold];

if does_match {
// We have a match between a subsequence in text1 and text2
let target_suf = &suf1[..length_threshold]; // wlog. equals suf2[..length_threshold]

// We want the matches to be clustered, so let's find all matches from
// the first string that are equal to target_suf
let start = i;
while suf1.len() >= length_threshold && &suf1[..length_threshold] == target_suf {
outfile2.write_all(&to_bytes(&[location1 as u64][..])[..]).expect("Ok");
@@ -643,6 +649,7 @@ fn cmd_across_similar(data_file_1: &String, data_file_2: &String, cache_dir: &St
}
outfile1_sizes.write_all(&to_bytes(&[(i-start) as u64][..])[..]).expect("Ok");

// And now find all matches from the second string that are equal to target_suf
let start = j;
while suf2.len() >= length_threshold && &suf2[..length_threshold] == target_suf {
outfile1.write_all(&to_bytes(&[location2 as u64][..])[..]).expect("Ok");
@@ -653,9 +660,11 @@ fn cmd_across_similar(data_file_1: &String, data_file_2: &String, cache_dir: &St
}
outfile2_sizes.write_all(&to_bytes(&[(j-start) as u64][..])[..]).expect("Ok");
} else if suf1 < suf2 {
// No match, and the first suffix is smaller. Increment the smaller one
i += 1;
location1 = get_next_pointer_from_table(&mut table1);
} else if suf2 < suf1 {
// No match, and the second suffix is smaller. Increment the smaller one
j += 1;
location2 = get_next_pointer_from_table(&mut table2);
} else {
@@ -674,6 +683,7 @@ fn cmd_across_similar(data_file_1: &String, data_file_2: &String, cache_dir: &St
}


// Start a bunch of jobs that each work on non-overlapping regions of the suffix array.
let increment:i64 = (text1.len() as i64-num_threads)/num_threads;
let _answer = crossbeam::scope(|scope| {
let mut result = Vec::with_capacity(num_threads as usize);
@@ -695,7 +705,7 @@ fn cmd_across_similar(data_file_1: &String, data_file_2: &String, cache_dir: &St
println!("start {} {}", this_start, this_end);
let one_result = scope.spawn(move || {

return sdf(text1, text2,
return worker(text1, text2,
a, b,
this_start, this_end,
data_file_1.clone(), data_file_2.clone(),
@@ -739,6 +749,7 @@ impl<'a> PartialOrd for MergeState<'a> {
}
}


/*
* Merge together M different suffix arrays (probably created with make-part).
* That is, given strings S_i and suffix arrays A_i compute the suffix array
@@ -765,6 +776,10 @@ impl<'a> PartialOrd for MergeState<'a> {
* In practice this works. It may not for your use case if there are long duplicates.
*/
fn cmd_merge(data_files: &Vec<String>, output_file: &String, num_threads: i64) -> std::io::Result<()> {
// This value is declared here, but also in scripts/make_suffix_array.py
// If you want to change it, it needs to be changed in both places.
const HACKSIZE:usize=100000;

let nn:usize = data_files.len();

fn load_text2<'s,'t>(fpath:String) -> Vec<u8> {
@@ -776,6 +791,7 @@ fn cmd_merge(data_files: &Vec<String>, output_file: &String, num_threads: i64)
return text_;
}

// Start out by loading the data files and suffix arrays.
let texts:Vec<Vec<u8>> = (0..nn).map(|x| load_text2(data_files[x].clone())).collect();

let texts_len:Vec<usize> = texts.iter().enumerate().map(|(i,x)| x.len() - (if i+1 == texts.len() {0} else {HACKSIZE})).collect();
@@ -789,7 +805,7 @@ fn cmd_merge(data_files: &Vec<String>, output_file: &String, num_threads: i64)
let ratio = metadatas[0] / (texts[0].len() as u64);
assert!(ratio == 8);

fn sdf(texts:&Vec<Vec<u8>>, starts:Vec<usize>, ends:Vec<usize>, texts_len:Vec<usize>, part:usize,
fn worker(texts:&Vec<Vec<u8>>, starts:Vec<usize>, ends:Vec<usize>, texts_len:Vec<usize>, part:usize,
output_file: String, data_files: Vec<String>) {

let nn = texts.len();
@@ -828,9 +844,11 @@ fn cmd_merge(data_files: &Vec<String>, output_file: &String, num_threads: i64)
table_index: x
});
}


let mut prev_position = 0;
// Our algorithm is not linear time if there are really long duplicates
// found in the merge process. If this happens we'll warn once.
let mut did_warn_long_sequences = false;

let mut prev = &texts[0][0..];
while let Some(MergeState {suffix: _suffix, position, table_index}) = heap.pop() {
next_table.write_all(&(position + delta[table_index] as u64).to_le_bytes()).expect("Write OK");
@@ -841,14 +859,18 @@ fn cmd_merge(data_files: &Vec<String>, output_file: &String, num_threads: i64)
let next = &texts[table_index][position as usize..];

let match_len = (0..50000000).find(|&j| !(j < next.len() && j < prev.len() && next[j] == prev[j]));
if let Some(match_len_) = match_len {
if match_len_ > 5000000 {
println!("{} match len: {}\n", part, match_len_);
println!("Index {} {}", position, prev_position);
println!("ugly {:?}", &next[..300]);
if !did_warn_long_sequences {
if let Some(match_len_) = match_len {
if match_len_ > 5000000 {
println!("There is a match longer than 50,000,000 bytes.");
println!("You probably don't want to be using this code on this dataset---it's (possibly) quadratic runtime now.");
did_warn_long_sequences = true;
}
} else {
println!("There is a match longer than 50,000,000 bytes.");
println!("You probably don't want to be using this code on this dataset---it's quadratic runtime now.");
did_warn_long_sequences = true;
}
} else {
println!("{} match len: xx\n", part);
}

heap.push(MergeState {
@@ -857,12 +879,13 @@ fn cmd_merge(data_files: &Vec<String>, output_file: &String, num_threads: i64)
table_index: table_index
});
prev = next;
prev_position = position;
}
}
}
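
The loop above is the classic heap-based K-way merge described in the comment before cmd_merge: pop the smallest suffix, emit its position, advance that table, and re-push. The same pattern on plain sorted vectors, as a self-contained sketch (hypothetical names, ordinary u64s standing in for suffixes):

use std::cmp::Reverse;
use std::collections::BinaryHeap;

// Sketch: merge K sorted inputs by always popping the globally smallest
// remaining item, mirroring the pop/advance/re-push loop used above.
fn kway_merge(inputs: &[Vec<u64>]) -> Vec<u64> {
    let mut heap = BinaryHeap::new();
    let mut cursors = vec![0usize; inputs.len()];
    // Seed the heap with the first element of every input.
    for (which, input) in inputs.iter().enumerate() {
        if let Some(&first) = input.first() {
            heap.push(Reverse((first, which)));
        }
    }
    let mut merged = Vec::with_capacity(inputs.iter().map(|v| v.len()).sum::<usize>());
    while let Some(Reverse((item, which))) = heap.pop() {
        merged.push(item);
        cursors[which] += 1;
        if let Some(&next) = inputs[which].get(cursors[which]) {
            heap.push(Reverse((next, which)));
        }
    }
    merged
}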


// Start a bunch of jobs that each work on non-overlapping regions of the final resulting suffix array
// Each job is going to look at all of the partial suffix arrays to take the relevant slice.
let _answer = crossbeam::scope(|scope| {

let mut tables:Vec<BufReader<File>> = (0..nn).map(|x| {
@@ -899,7 +922,7 @@ fn cmd_merge(data_files: &Vec<String>, output_file: &String, num_threads: i64)
let ends2 = ends.clone();
let texts_len2 = texts_len.clone();
let _one_result = scope.spawn(move || {
sdf(texts,
worker(texts,
starts2,
ends2,
texts_len2,
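
All three commands fan work out the same way: compute a per-thread increment, then spawn one worker per non-overlapping region inside a crossbeam scope. A minimal sketch of that pattern, assuming the pre-0.7 crossbeam scoped-thread API this file already uses (the worker body is a placeholder):

extern crate crossbeam;

// Sketch: split [0, n) into `num_threads` non-overlapping ranges and run
// one scoped worker on each, then combine the per-thread results.
fn fan_out(n: usize, num_threads: usize) -> usize {
    let increment = n / num_threads;
    crossbeam::scope(|scope| {
        let handles: Vec<_> = (0..num_threads).map(|i| {
            let start = i * increment;
            // The last thread absorbs any remainder from the division.
            let end = if i + 1 == num_threads { n } else { (i + 1) * increment };
            scope.spawn(move || {
                // A real worker would process text[start..end] here.
                end - start
            })
        }).collect();
        handles.into_iter().map(|h| h.join()).sum::<usize>()
    })
}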
