diff --git a/filters/highly_repetitive.py b/filters/highly_repetitive.py
new file mode 100644
index 0000000..f2c974c
--- /dev/null
+++ b/filters/highly_repetitive.py
@@ -0,0 +1,92 @@
+def break_and_compare(ls: list, k: int) -> list:
+    """
+    Return the chunk of `ls` that is repeated k times (optionally followed by a
+    partial repetition at the end) to make up the list. If no such chunk
+    exists, return an empty list.
+
+    Parameters:
+    - ls (list): The input list.
+    - k (int): The number of repetitions to check for.
+    """
+    n = len(ls)
+    while n % k != 0:
+        n -= 1
+    to_break = ls[:n]
+    residual = ls[n:]
+    chunk_size = n // k
+    while len(residual) < chunk_size:
+        # split into chunks of the current candidate size
+        chunks = [to_break[i:i + chunk_size] for i in range(0, len(to_break), chunk_size)]
+        chunksMatch = True
+        # compare all chunks to the first chunk
+        for chunk in chunks[1:]:
+            if chunk != chunks[0]:
+                chunksMatch = False
+                break
+        if chunksMatch:
+            # compare the residual to a prefix of the first chunk
+            if residual == chunks[0][:len(residual)]:
+                return chunks[0]
+        chunk_size -= 1
+        new_residual = to_break[chunk_size * k:]
+        to_break = to_break[:chunk_size * k]
+        residual = new_residual + residual
+    return []
+
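+# Illustrative usage (a sketch mirroring the unit tests in
+# filters/test_highly_repetitive.py, not additional filter logic):
+#   break_and_compare([1, 2, 3, 1, 2, 3, 1, 2, 3], 3)  ->  [1, 2, 3]
+#   break_and_compare([1, 2, 3, 4, 5, 6, 7], 3)        ->  []
+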
+def break_and_compare_wrapper(ls: list, start_k: int, end_k: int) -> list:
+    """
+    Wrapper around `break_and_compare` that tries every value of `k` in the
+    inclusive range [`start_k`, `end_k`], sliding the start of the list and
+    trimming a few tokens from the end before each call.
+
+    Parameters:
+    - `ls` (list): The input list.
+    - `start_k` (int): The starting value of `k` for the range (inclusive).
+    - `end_k` (int): The ending value of `k` for the range (inclusive).
+
+    Returns a tuple whose first element is the repeated chunk ([] if none was
+    found) and whose last element is the matching `k` (-1 if none was found).
+    """
+    # end_k is inclusive
+    ls = list(ls)
+    length = len(ls)
+    half = length // 2
+    for k in range(start_k, end_k + 1):
+        for i in range(0, half):
+            # remove some tokens from the end as well
+            rem = 2
+            # results (P, R, F1) for different values of rem:
+            # when rem = 0 -> 0.91 0.73 0.81
+            # when rem = 1 -> 0.91 0.78 0.84
+            # when rem = 2 -> 0.90 0.80 0.84
+            # when rem = 3 -> 0.89 0.80 0.84
+            # when rem = 4 -> 0.89 0.80 0.84
+            # when rem = 5 -> 0.89 0.80 0.84
+            # when rem = 6 -> 0.89 0.80 0.84
+            for j in range(0, rem + 1):
+                result = break_and_compare(ls[i:length - j], k)
+                if result:
+                    return result, k
+            result = break_and_compare(ls[i:], k)
+            if result:
+                return result, i, k
+        result = break_and_compare(ls, k)
+        if result:
+            return result, k
+    return [], -1
+
+if __name__ == "__main__":
+# from transformers import AutoTokenizer
+# inp = """0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
+# 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff"""
+# tokenizer = AutoTokenizer.from_pretrained(
+#     "EleutherAI/pythia-70m-deduped",
+# )
+# inp = tokenizer(inp)['input_ids']
+# print(inp)
+# # for token in inp:
+# #     print(token, tokenizer.decode(token))
+# print(break_and_compare_wrapper(inp, 2, 30))
+    ls = [1]
+    start_k = 1
+    end_k = 3
+    expected = ([1], 1)
+    output = break_and_compare_wrapper(ls, start_k, end_k)
+    print(output)
\ No newline at end of file
diff --git a/filters/pattern_incrementing.py b/filters/pattern_incrementing.py
index 53711e9..452638c 100644
--- a/filters/pattern_incrementing.py
+++ b/filters/pattern_incrementing.py
@@ -1,2 +1,301 @@
-def incrementing_sequences_filter(text):
-    return True
\ No newline at end of file
+import re
+
+def replace_non_numeric_with_whitespace(text: str) -> str:
+    # Replace non-numeric characters with whitespace, keeping decimal points
+    # that sit between two digits so that floats like "12.8" survive.
+    # cleaned_text = re.sub(r'[^0-9]', ' ', text)
+    new_text = ""
+    for i in range(len(text)):
+        if text[i].isdigit():
+            new_text += text[i]
+        elif text[i] == "." and i > 0 and i < len(text) - 1 and text[i-1].isdigit() and text[i+1].isdigit():
+            new_text += text[i]
+        else:
+            new_text += " "
+    cleaned_text = new_text
+
+    # If any token is still not a valid float (e.g. two decimal points in one
+    # token), fall back to stripping every non-digit character.
+    decimal_seen = False
+    notValidFloat = False
+    for i in range(len(cleaned_text)):
+        if cleaned_text[i] == " ":
+            decimal_seen = False
+        elif cleaned_text[i] == ".":
+            if decimal_seen:
+                notValidFloat = True
+                break
+            else:
+                decimal_seen = True
+        elif cleaned_text[i].isdigit():
+            continue
+        else:
+            notValidFloat = True
+            break
+
+    if notValidFloat:
+        # Replace non-numeric characters with whitespace
+        cleaned_text = re.sub(r'[^0-9]', ' ', text)
+
+    # Replace multiple consecutive whitespace characters with a single space
+    cleaned_text = re.sub(r'\s+', ' ', cleaned_text)
+
+    return cleaned_text
+
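+# Rough illustration of the cleanup above (a sketch only; leading/trailing
+# whitespace omitted and exact spacing may differ):
+#   replace_non_numeric_with_whitespace("12.8. 12.9. 13.0.")  ->  "12.8 12.9 13.0"
+#   replace_non_numeric_with_whitespace("A.1 , A.2 , B.1")    ->  "1 2 1"
+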
+def incrementing_sequences_filter(text: str) -> bool:
+    # count the number of numeric and non-numeric characters
+    num_numeric = 0
+    num_non_numeric = 0
+
+    for char in text:
+        if char.isdigit():
+            num_numeric += 1
+        else:
+            num_non_numeric += 1
+
+    # guard against empty input to avoid a division by zero
+    if num_numeric + num_non_numeric == 0:
+        return False
+
+    ratio_numeric = num_numeric / (num_numeric + num_non_numeric)
+
+    # print("ratio_numeric", ratio_numeric)
+
+    # if less than 5% of the characters are numeric, return False
+    if ratio_numeric < 0.05:
+        return False
+
+    # replace all non-numeric characters with whitespace
+    text = replace_non_numeric_with_whitespace(text)
+    if text.count(" ") != 0:
+        # convert the tokens to a list of floats
+        ls = list(map(float, text.split()))
+    else:
+        ls = list(text)
+
+    # print("After removing all non-numeric characters with whitespace", ls)
+
+    # a single number cannot form an incrementing sequence
+    if len(ls) < 2:
+        return False
+
+    # Check for incrementing in chunks
+    # Added to handle cases like "A.1 , A.2 , A.3 , A.4, B.1 , B.2, B.3, C.1"
+    ptr = 0
+    min_max = {}
+    chunk_num = 0
+    min_max[chunk_num] = (ls[ptr], ls[ptr+1], 2)
+    ptr += 1
+    while ptr < len(ls) - 1:
+        if ls[ptr] < ls[ptr+1]:
+            min_max[chunk_num] = (
+                min(min_max[chunk_num][0], ls[ptr]),
+                max(min_max[chunk_num][1], ls[ptr+1]),
+                min_max[chunk_num][2] + 1
+            )
+        else:
+            chunk_num += 1
+            if ptr + 2 < len(ls):
+                min_max[chunk_num] = (ls[ptr+1], ls[ptr+2], 1)
+            else:
+                min_max[chunk_num] = (ls[ptr+1], ls[ptr+1], 1)
+
+        ptr += 1
+
+    # remove chunks with size 1
+    min_max = {k: v for k, v in min_max.items() if v[2] > 1}
+
+    # check whether the remaining chunk ids are consecutive
+    chunksAreConsecutive = True
+    for i in range(len(min_max) - 1):
+        if i + 1 not in min_max:
+            chunksAreConsecutive = False
+            break
+
+    # print("min_max", min_max)
+
+    if chunksAreConsecutive:
+        # if all chunks have the same min value and the last chunk's max value
+        # is at most the first chunk's max value, return True
+        for i in range(len(min_max) - 1):
+            if min_max[i][0] != min_max[i+1][0]:
+                break
+            if i == len(min_max) - 2 and min_max[i][1] <= min_max[0][1]:
+                return True
+
+    # When the list is too small, it is not an incrementing sequence.
+    # Some results used to decide on the threshold (P, R, F1):
+    # Without Condition   - 0.48 0.69 0.57
+    # when threshold is 3 - 0.58 0.69 0.63
+    # when threshold is 4 - 0.60 0.68 0.64
+    # when threshold is 5 - 0.62 0.65 0.64
+    # when threshold is 6 - 0.64 0.64 0.64
+    # when threshold is 7 - 0.67 0.63 0.65
+    # These values are subject to change with the dataset and any code modifications made after they were collected.
+    if len(ls) < 6:
+        return False
+
+    index_to_remove = []
+    # remove all elements repeating at fixed intervals
+    for i in range(len(ls) - 1):
+        k = 1
+        while k < len(ls):
+            indices = []
+            mismatchFound = False
+            for j in range(i, len(ls), k):
+                indices.append(j)
+                if ls[i] != ls[j]:
+                    k += 1
+                    mismatchFound = True
+                    break
+            if not mismatchFound and len(indices) > 1:
+                index_to_remove.extend(indices)
+                k += 1
+            elif not mismatchFound:
+                k += 1
+
+    # unravel the list
+    index_to_remove = list(set(index_to_remove))
+
+    new_list = []
+    for i in range(len(ls)):
+        if i not in index_to_remove:
+            new_list.append(ls[i])
+    ls = new_list
+
+    # print("After removing repeating at fixed intervals", ls)
+
+    # When the list is too small after the cleanup, it is not an incrementing sequence.
+    # This threshold leads to P, R, F1 of 0.71, 0.63, 0.67.
+    # These values are subject to change with the dataset and any code modifications made after they were collected.
+    if len(ls) < 4:
+        return False
+
+    # Basic case where the numbers are only increasing or only decreasing
+    isIncreasing = True
+    isDecreasing = True
+    for i in range(len(ls) - 1):
+        if ls[i] > ls[i+1]:
+            isIncreasing = False
+            if not isDecreasing:
+                break
+        if ls[i] < ls[i+1]:
+            isDecreasing = False
+            if not isIncreasing:
+                break
+        else:
+            isIncreasing = False
+            isDecreasing = False
+            break
+
+    if (isIncreasing or isDecreasing) and len(ls) > 1:
+        return True
+
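+    # Illustration of the chunking below (comment only, not extra logic):
+    #   ls = [1, 2, 3, 1, 2, 3]  ->  increasing_chunks = [[1, 2, 3], [1, 2, 3]]
+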
+    # break the list into chunks where each chunk is increasing
+    increasing_chunks = []
+    chunk = []
+    for i in range(len(ls) - 1):
+        if ls[i] <= ls[i+1]:
+            chunk.append(ls[i])
+        else:
+            chunk.append(ls[i])
+            increasing_chunks.append(chunk)
+            chunk = []
+    if len(ls) > 1:
+        chunk.append(ls[-1])
+        increasing_chunks.append(chunk)
+
+    # break the list into chunks where each chunk is decreasing
+    decreasing_chunks = []
+    chunk = []
+    for i in range(len(ls) - 1):
+        if ls[i] >= ls[i+1]:
+            chunk.append(ls[i])
+        else:
+            chunk.append(ls[i])
+            decreasing_chunks.append(chunk)
+            chunk = []
+    if len(ls) > 1:
+        chunk.append(ls[-1])
+        decreasing_chunks.append(chunk)
+
+    # print lengths of chunks
+    # print("increasing_chunks", increasing_chunks)
+    # print("decreasing_chunks", decreasing_chunks)
+
+    # if the first chunk is of unequal size, remove it
+    if len(increasing_chunks) >= 2 and len(increasing_chunks[0]) != len(increasing_chunks[1]):
+        increasing_chunks.pop(0)
+    if len(decreasing_chunks) >= 2 and len(decreasing_chunks[0]) != len(decreasing_chunks[1]):
+        decreasing_chunks.pop(0)
+
+    # if the last chunk is of unequal size, remove it
+    if len(increasing_chunks) >= 2 and len(increasing_chunks[-1]) != len(increasing_chunks[-2]):
+        increasing_chunks.pop(-1)
+    if len(decreasing_chunks) >= 2 and len(decreasing_chunks[-1]) != len(decreasing_chunks[-2]):
+        decreasing_chunks.pop(-1)
+
+    # if any chunk is of unequal size, return False
+    for chunk in increasing_chunks:
+        if len(chunk) != len(increasing_chunks[0]):
+            return False
+    for chunk in decreasing_chunks:
+        if len(chunk) != len(decreasing_chunks[0]):
+            return False
+
+    # print lengths of chunks
+    # print("increasing_chunks", increasing_chunks)
+    # print("decreasing_chunks", decreasing_chunks)
+
+    if len(increasing_chunks) > 1:
+        isIncreasing_increasing_chunks = [True]*len(increasing_chunks[0])
+        isDecreasing_increasing_chunks = [True]*len(increasing_chunks[0])
+        for i in range(len(increasing_chunks) - 1):
+            for j in range(len(increasing_chunks[i])):
+                if increasing_chunks[i][j] < increasing_chunks[i+1][j]:
+                    isDecreasing_increasing_chunks[j] = False
+                    if not isIncreasing_increasing_chunks[j]:
+                        break
+                if increasing_chunks[i][j] > increasing_chunks[i+1][j]:
+                    isIncreasing_increasing_chunks[j] = False
+                    if not isDecreasing_increasing_chunks[j]:
+                        break
+    else:
+        isIncreasing_increasing_chunks = []
+        isDecreasing_increasing_chunks = []
+
+    if len(decreasing_chunks) > 1:
+        isIncreasing_decreasing_chunks = [True]*len(decreasing_chunks[0])
+        isDecreasing_decreasing_chunks = [True]*len(decreasing_chunks[0])
+        for i in range(len(decreasing_chunks) - 1):
+            for j in range(len(decreasing_chunks[i])):
+                if decreasing_chunks[i][j] < decreasing_chunks[i+1][j]:
+                    isDecreasing_decreasing_chunks[j] = False
+                    if not isIncreasing_decreasing_chunks[j]:
+                        break
+                if decreasing_chunks[i][j] > decreasing_chunks[i+1][j]:
+                    isIncreasing_decreasing_chunks[j] = False
+                    if not isDecreasing_decreasing_chunks[j]:
+                        break
+    else:
+        isIncreasing_decreasing_chunks = []
+        isDecreasing_decreasing_chunks = []
+
+    largest_chunk_size = max(len(isIncreasing_increasing_chunks), len(isDecreasing_increasing_chunks), len(isIncreasing_decreasing_chunks), len(isDecreasing_decreasing_chunks))
+    if len(isIncreasing_increasing_chunks) < largest_chunk_size:
+        isIncreasing_increasing_chunks.extend([False]*(largest_chunk_size - len(isIncreasing_increasing_chunks)))
+    if len(isDecreasing_increasing_chunks) < largest_chunk_size:
+        isDecreasing_increasing_chunks.extend([False]*(largest_chunk_size - len(isDecreasing_increasing_chunks)))
+    if len(isIncreasing_decreasing_chunks) < largest_chunk_size:
+        isIncreasing_decreasing_chunks.extend([False]*(largest_chunk_size - len(isIncreasing_decreasing_chunks)))
+    if len(isDecreasing_decreasing_chunks) < largest_chunk_size:
+        isDecreasing_decreasing_chunks.extend([False]*(largest_chunk_size - len(isDecreasing_decreasing_chunks)))
+
+    # print("isIncreasing_increasing_chunks", isIncreasing_increasing_chunks)
+    # print("isDecreasing_increasing_chunks", isDecreasing_increasing_chunks)
+    # print("isIncreasing_decreasing_chunks", isIncreasing_decreasing_chunks)
+    # print("isDecreasing_decreasing_chunks", isDecreasing_decreasing_chunks)
+
+    if len(isIncreasing_increasing_chunks) >= 1:
+        resp = isIncreasing_decreasing_chunks[0] or isDecreasing_decreasing_chunks[0] or isIncreasing_increasing_chunks[0] or isDecreasing_increasing_chunks[0]
+        for i, j, k, l in zip(isIncreasing_increasing_chunks, isDecreasing_increasing_chunks, isIncreasing_decreasing_chunks, isDecreasing_decreasing_chunks):
+            resp = resp and (i or j or k or l)
+        if resp:
+            return True
+
+    return False
+
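+# For reference, the unit tests in filters/test_pattern_incrementing.py expect:
+#   incrementing_sequences_filter("123456789")                                   ->  True
+#   incrementing_sequences_filter("1A23456789")                                  ->  False
+#   incrementing_sequences_filter("A.1 , A.2 , A.3 , A.4, B.1 , B.2, B.3, C.1")  ->  True
+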
+if __name__ == "__main__":
+
+    samp = r"""
+    "A.1 , A.2 , A.3 , A.4, B.1 , B.2, B.3, C.1"
+    """
+    print(incrementing_sequences_filter(samp))
\ No newline at end of file
diff --git a/filters/test_highly_repetitive.py b/filters/test_highly_repetitive.py
new file mode 100644
index 0000000..a9c7697
--- /dev/null
+++ b/filters/test_highly_repetitive.py
@@ -0,0 +1,73 @@
+from .highly_repetitive import break_and_compare, break_and_compare_wrapper
+# Test cases for break_and_compare
+
+# Test case 1: Matching chunks exist
+def test_break_and_compare_matching_chunks_exist():
+    ls = [1, 2, 3, 1, 2, 3, 1, 2, 3]
+    k = 3
+    expected = [1, 2, 3]
+    output = break_and_compare(ls, k)
+    assert output == expected, f"Test case 1 failed. Output: {output}, Expected: {expected}"
+
+# Test case 2: No matching chunks
+def test_break_and_compare_no_matching_chunks():
+    ls = [1, 2, 3, 4, 5, 6, 7]
+    k = 3
+    expected = []
+    output = break_and_compare(ls, k)
+    assert output == expected, f"Test case 2 failed. Output: {output}, Expected: {expected}"
+
+# Test case 3: Empty list
+def test_break_and_compare_empty_list():
+    ls = []
+    k = 4
+    expected = []
+    output = break_and_compare(ls, k)
+    assert output == expected, f"Test case 3 failed. Output: {output}, Expected: {expected}"
+
+# Test case 4: Chunk size larger than list length
+def test_break_and_compare_chunk_size_larger_than_list_length():
+    ls = [1, 2, 3]
+    k = 4
+    expected = []
+    output = break_and_compare(ls, k)
+    assert output == expected, f"Test case 4 failed. Output: {output}, Expected: {expected}"
+
+# Test cases for break_and_compare_wrapper
+
+# Test case 1: Matching chunks within the range
+def test_break_and_compare_wrapper_matching_chunks_within_range():
+    ls = [1, 2, 3, 1, 2, 3, 1, 2, 3]
+    start_k = 2
+    end_k = 4
+    expected = ([1, 2, 3], 2)
+    output = break_and_compare_wrapper(ls, start_k, end_k)
+    assert output == expected, f"Test case 1 failed. Output: {output}, Expected: {expected}"
+
+# Test case 2: No matching chunks within the range
+def test_break_and_compare_wrapper_no_matching_chunks_within_range():
+    ls = [1, 2, 3, 4, 5, 6, 7]
+    start_k = 2
+    end_k = 5
+    expected = ([], -1)
+    output = break_and_compare_wrapper(ls, start_k, end_k)
+    assert output == expected, f"Test case 2 failed. Output: {output}, Expected: {expected}"
+
+# Test case 3: Empty list with range
+def test_break_and_compare_wrapper_empty_list_with_range():
+    ls = []
+    start_k = 1
+    end_k = 3
+    expected = ([], -1)
+    output = break_and_compare_wrapper(ls, start_k, end_k)
+    assert output == expected, f"Test case 3 failed. Output: {output}, Expected: {expected}"
+
+# Test case 4: Single-element list with range
+def test_break_and_compare_wrapper_single_element_list_with_range():
+    ls = [1]
+    start_k = 1
+    end_k = 3
+    expected = ([1], 1)
+    output = break_and_compare_wrapper(ls, start_k, end_k)
+    assert output == expected, f"Test case 4 failed. Output: {output}, Expected: {expected}"
+
diff --git a/filters/test_pattern_incrementing.py b/filters/test_pattern_incrementing.py
new file mode 100644
index 0000000..57940de
--- /dev/null
+++ b/filters/test_pattern_incrementing.py
@@ -0,0 +1,41 @@
+from .pattern_incrementing import incrementing_sequences_filter
+
+
+def test_pattern_incrementing_no_space():
+    text = "123456789"
+    assert incrementing_sequences_filter(text) == True
+
+
+def test_pattern_incrementing_no_space_with_char():
+    text = "1A23456789"
+    assert incrementing_sequences_filter(text) == False
+
+
+def test_pattern_incrementing():
+    text = "12.8. 12.9. 13.0. 13.1. 13.2. 13.3."
+    assert incrementing_sequences_filter(text) == True
+
+
+def test_pattern_new_lines_incrementing():
+    text = "128.\n129.\n130.\n131.\n132.\n133."
+    assert incrementing_sequences_filter(text) == True
+
+
+def test_pattern_list_incrementing():
+    text = "- 128.\n- 129.\n- 130.\n- 131.\n- 132.\n- 133."
+    assert incrementing_sequences_filter(text) == True
+
+
+def test_incrementing_nonnumerical_pattern():
+    text = """
+![](edinbmedj75052-0047-b){#f5.123}
+![](edinbmedj75052-0049-a){#f6.125}
+![](edinbmedj75052-0049-b){#f7.125}
+![](edin
+"""
+    assert incrementing_sequences_filter(text) == True
+
+
+def test_incrementing_seminumerical_pattern():
+    text = "A.1 , A.2 , A.3 , A.4, B.1 , B.2, B.3, C.1"
+    assert incrementing_sequences_filter(text) == True
\ No newline at end of file
diff --git a/filters/testing_over_annotations/test.py b/filters/testing_over_annotations/test.py
new file mode 100644
index 0000000..3f517c9
--- /dev/null
+++ b/filters/testing_over_annotations/test.py
@@ -0,0 +1,60 @@
+import pandas as pd
+import os
+import sys
+from transformers import AutoTokenizer
+sys.path.append(os.path.dirname(os.path.dirname(os.path.realpath(__file__))))
+from pattern_incrementing import incrementing_sequences_filter
+from highly_repetitive import break_and_compare_wrapper
+
+df = pd.read_csv('filters/testing_over_annotations/test_data_full.csv')
+df = df[['shortened_text', 'Category']]
+
+all_categories = ['code', 'nl', 'pattern-incrementing', 'pattern-repeating', 'duplicated',
+                  'template', 'code+nl', 'empty/blank', 'other', 'random', 'structured']
+
+target_category = "pattern-repeating"
+use_tokenizer = True
+
+df.dropna(inplace=True)
+
+text = df['shortened_text'].to_list()
+category = df['Category'].to_list()
+
+tokenizer = AutoTokenizer.from_pretrained(
+    "EleutherAI/pythia-70m-deduped",
+)
+
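+# The loop below marks a row as `target_category` whenever the repetition
+# wrapper reports some period k (its last return value is not -1), and leaves
+# the prediction blank otherwise; sklearn then scores these predictions
+# against the human annotations.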
+ls = []
+ls_true = [0]*len(category)
+for t in text:
+    # replace newlines, carriage returns and tabs with spaces
+    t = t.replace('\n', ' ').replace('\r', ' ').replace('\t', ' ')
+    # collapse multiple spaces into a single space
+    t = ' '.join(t.split())
+    if use_tokenizer:
+        t = tokenizer(t)['input_ids']
+    resp = break_and_compare_wrapper(t, 2, 10)
+    if resp[-1] != -1:
+        ls.append(target_category)
+    else:
+        ls.append("")
+print(ls)
+print(category)
+
+from sklearn.metrics import classification_report
+from sklearn.preprocessing import LabelEncoder
+encoder = LabelEncoder()
+encoder.fit(ls + category)
+
+encoded_list1 = encoder.transform(ls)
+encoded_list2 = encoder.transform(category)
+
+print(encoded_list1)
+print(encoded_list2)
+
+# Print a classification report comparing the annotated categories (y_true) with the filter predictions (y_pred)
+print(classification_report(encoded_list2, encoded_list1))
+
+df['predicted'] = ls
+df.to_csv('filters/testing_over_annotations/test_data_full_pred.csv', index=False)
\ No newline at end of file