Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

v2 #107

Open
fabriziosalmi opened this issue Oct 16, 2023 · 6 comments
Open

v2 #107

fabriziosalmi opened this issue Oct 16, 2023 · 6 comments
Labels
enhancement New feature or request
Milestone

Comments

@fabriziosalmi
Copy link
Owner

sanitize.py

import re
import tldextract
from tqdm import tqdm

# Pre-compiled regex for a single DNS label: 1-63 alphanumeric/hyphen
# characters that neither start nor end with a hyphen (RFC 1035 label cap).
# Raw string (r'...') so backslash escapes are never interpreted by Python.
fqdn_pattern = re.compile(r'^(?!-)[A-Za-z0-9-]{1,63}(?<!-)$')

def is_valid_fqdn(s):
    """Return True when *s* looks like a fully-qualified domain name.

    Wildcards and empty strings are rejected outright; the candidate must
    have both a registrable domain and a known public suffix (per
    tldextract), and every dot-separated label must satisfy the label
    pattern (1-63 chars, alphanumeric/hyphen, no leading/trailing hyphen).
    """
    if not s or '*' in s:
        return False
    parts = tldextract.extract(s)
    if not (parts.domain and parts.suffix):
        return False
    return all(fqdn_pattern.match(label) for label in s.split('.'))

def remove_prefix(line, prefix):
    """Strip *prefix* from *line* when what remains is a valid FQDN.

    If the line does not start with the prefix, or stripping it would not
    leave a valid FQDN, the line is returned untouched.
    """
    if not line.startswith(prefix):
        return line
    candidate = line[len(prefix):]
    return candidate if is_valid_fqdn(candidate) else line

def sanitize_line(line, rules):
    """Run *line* through each rule in order; a None result drops the line.

    Each rule receives the whitespace-stripped line and returns either a
    transformed line or None, which marks the line as discarded.
    """
    current = line
    for transform in rules:
        current = transform(current.strip())
        if current is None:
            return None
    return current

def get_sanitization_rules():
    """Return the ordered list of per-line sanitization rules.

    Each rule is a callable taking a stripped line and returning either the
    transformed line or None (drop the line). Order matters: comments are
    dropped first, then hosts-file/adblock/URL prefixes are removed, and
    finally the trailing root dot is trimmed and the result lowercased.
    """
    return [
        lambda line: None if line.startswith("#") else line,       # drop comment lines
        lambda line: remove_prefix(line, "127.0.0.1"),             # hosts-file prefix, no space
        lambda line: remove_prefix(line, "127.0.0.1 "),            # hosts-file prefix with space
        lambda line: remove_prefix(line, "0.0.0.0"),               # hosts-file prefix, no space
        lambda line: remove_prefix(line, "0.0.0.0 "),              # hosts-file prefix with space
        lambda line: remove_prefix(line, "||"),                    # adblock-style double pipe
        lambda line: remove_prefix(line, "http://"),               # plain-http URL prefix (was garbled "http:https://")
        lambda line: remove_prefix(line, "https://"),              # https URL prefix
        lambda line: line.rstrip('.'),                             # trailing root-zone dot
        lambda line: line.lower()                                  # canonical lowercase
    ]

def process_large_file(input_file_path, output_file_path):
    """Process large files line by line and track progress.

    Reads ``input_file_path`` twice: a first pass counts lines so the
    tqdm progress bar has a total, then a second pass sanitizes each line
    and collects the valid FQDNs.

    NOTE(review): ``output_file_path`` is accepted but never used here —
    writing is handled separately by ``write_to_output_file``.

    Returns:
        set: unique sanitized FQDNs found in the input file.
    """
    unique_domains = set()
    rules = get_sanitization_rules()

    with open(input_file_path, 'r') as infile:
        total_lines = sum(1 for _ in infile)  # first pass: line count for tqdm
        infile.seek(0)  # Reset file pointer to start
        with tqdm(total=total_lines, desc="Processing") as pbar:
            for line in infile:
                sanitized_line = sanitize_line(line, rules)
                # Keep only lines that survive every rule AND validate as FQDNs
                if sanitized_line and is_valid_fqdn(sanitized_line):
                    unique_domains.add(sanitized_line)
                pbar.update(1)

    return unique_domains

def write_to_output_file(unique_domains, output_file_path):
    """Write the given domains to *output_file_path*, sorted, one per line.

    A tqdm progress bar tracks write progress.
    """
    ordered = sorted(unique_domains)  # alphabetical, deterministic output

    with open(output_file_path, 'w') as outfile:
        with tqdm(total=len(ordered), desc="Writing") as progress:
            for name in ordered:
                outfile.write(f"{name}\n")
                progress.update(1)

if __name__ == "__main__":
    # Script entry point: sanitize input.txt and emit the unique,
    # sorted FQDNs to output.txt.
    source_path = 'input.txt'
    destination_path = 'output.txt'

    domains = process_large_file(source_path, destination_path)
    write_to_output_file(domains, destination_path)
@fabriziosalmi fabriziosalmi added the enhancement New feature or request label Oct 16, 2023
@fabriziosalmi fabriziosalmi added this to the Improvements milestone Oct 16, 2023
@fabriziosalmi
Copy link
Owner Author

whitelist.py

import os
from pathlib import Path
import argparse
from tqdm import tqdm

def read_fqdn_from_file(file_path: Path, description: str) -> set:
    """Read *file_path* and return the set of stripped lines (FQDNs).

    The file is scanned twice: once to count lines so the tqdm progress
    bar has a total, then again to collect the entries.
    """
    with file_path.open('r') as handle:
        entries = set()
        line_count = sum(1 for _ in handle)
        handle.seek(0)  # rewind for the real pass
        with tqdm(total=line_count, desc=description, unit="lines", leave=False) as progress:
            for raw in handle:
                entries.add(raw.strip())
                progress.update(1)
        return entries

def write_fqdn_to_file(file_path: Path, content: set, description: str) -> None:
    """Write a set of FQDNs to *file_path*, one per line, with a progress bar.

    Entries are written in sorted order so the output file is deterministic
    across runs (set iteration order is not), matching the sorted output
    produced by the sanitize step.
    """
    ordered = sorted(content)
    with file_path.open('w') as file:
        with tqdm(total=len(ordered), desc=description, unit="lines", leave=False) as pbar:
            for fqdn in ordered:
                file.write(fqdn + '\n')
                pbar.update(1)

def ensure_file_exists(file_path: Path) -> None:
    """Exit the program (status 1) when *file_path* is not an existing file.

    Raises SystemExit directly rather than calling the site-injected
    ``exit()`` builtin, which is not guaranteed to exist when Python runs
    without the ``site`` module (e.g. ``python -S``).
    """
    if not file_path.is_file():
        print(f"ERROR: File '{file_path}' not found.")
        raise SystemExit(1)

def main(blacklist_path: Path, whitelist_path: Path, output_path: Path) -> None:
    """Filter whitelisted FQDNs out of the blacklist and write the result."""

    # Both inputs must exist before any work is done.
    for required in (blacklist_path, whitelist_path):
        ensure_file_exists(required)

    blacklisted = read_fqdn_from_file(blacklist_path, f"Reading {blacklist_path}")
    whitelisted = read_fqdn_from_file(whitelist_path, f"Reading {whitelist_path}")

    # Set difference: everything blacklisted that is not also whitelisted.
    remaining = blacklisted - whitelisted

    write_fqdn_to_file(output_path, remaining, f"Writing {output_path}")

    print(f"Blacklist: {len(blacklisted)} FQDNs.")
    print(f"Whitelist: {len(whitelisted)} FQDNs.")
    print(f"After Filtering: {len(remaining)} FQDNs.")

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description="Process blacklist and whitelist files.")
    parser.add_argument('--blacklist', default='blacklist.txt', type=Path, help='Path to blacklist file')
    parser.add_argument('--whitelist', default='whitelist.txt', type=Path, help='Path to whitelist file')
    parser.add_argument('--output', default='filtered_blacklist.txt', type=Path, help='Path to output file')

    args = parser.parse_args()

    try:
        main(args.blacklist, args.whitelist, args.output)
    except Exception as e:
        # Top-level boundary: report any failure and exit non-zero.
        # SystemExit is raised directly instead of relying on the
        # site-injected exit() builtin.
        print(f"ERROR: {e}")
        raise SystemExit(1)

@fabriziosalmi
Copy link
Owner Author

generate_fqdn.sh

#!/bin/bash

# Description: Setup script for maintaining a domain blacklist.
# Downloads the blacklist URLs, aggregates them, then sanitizes and
# whitelist-filters the result via the companion Python scripts.

# Print an error message to stderr and abort.
die() {
  echo "$1" >&2
  exit 1
}

# Must run as root: apt-get installs below require it.
[ "$EUID" -eq 0 ] || die "Please run this script with sudo."

# Update and install prerequisites (already root, so no sudo needed).
echo "Updating package list..."
apt-get update || die "Failed to update package list."
echo "Installing required packages..."
apt-get install -y python3 python3-pip pv ncftp || die "Failed to install packages."

# Upgrade pip and install the Python dependencies used by sanitize.py.
echo "Upgrading Python and pip..."
python3 -m ensurepip --upgrade || die "Failed to upgrade pip."
pip3 install --no-cache-dir --upgrade pip setuptools tldextract tqdm || die "Failed to upgrade pip packages."

# Download a single URL to a random *.fqdn.list file. Best effort: a
# failed download is reported but does not abort the whole run.
download_url() {
  local url="$1"
  local random_filename
  random_filename=$(uuidgen | tr -dc '[:alnum:]')

  echo "Downloading blacklist: $url"

  # -q and --progress are mutually exclusive; keep the quiet download.
  if wget -q -O "$random_filename.fqdn.list" "$url"; then
    echo "Downloaded: $url"
  else
    echo "Failed to download: $url"
  fi
}

# Download every URL listed in the blacklist-sources file.
LISTS="blacklists.fqdn.urls"
[ -f "$LISTS" ] || die "URL list '$LISTS' not found."
echo "Download blacklists"
while read -r url; do
  download_url "$url"
done < "$LISTS"

# Aggregate blacklists (sort -u also deduplicates).
echo "Aggregate blacklists"
cat *.fqdn.list | sort -u > all.fqdn.blacklist || die "No downloaded lists to aggregate."
rm -f *.fqdn.list

# Sanitize blacklists (sanitize.py reads input.txt, writes output.txt).
# Use python3 explicitly: only python3 was installed above.
mv all.fqdn.blacklist input.txt
python3 sanitize.py || die "sanitize.py failed."
mv output.txt all.fqdn.blacklist

# Remove whitelisted domains (whitelist.py reads blacklist.txt by default).
# NOTE(review): whitelist.py also expects whitelist.txt to exist — confirm
# it is provided alongside this script.
mv all.fqdn.blacklist blacklist.txt
python3 whitelist.py || die "whitelist.py failed."
mv filtered_blacklist.txt all.fqdn.blacklist
rm blacklist.txt input.txt

total_lines_new=$(wc -l < all.fqdn.blacklist)
echo "Total domains: $total_lines_new."

@fabriziosalmi
Copy link
Owner Author

scripts/update_rpz_blacklist.sh

#!/bin/bash

# ==========================================
# RPZ BLACKLIST UPDATER SCRIPT
# ==========================================
# Downloads the RPZ blacklist archive, extracts it, wires it into BIND's
# configuration once, and validates the resulting configuration.

# List of required commands
REQUIRED_COMMANDS=("wget" "tar" "systemctl" "grep" "mkdir" "cat" "date" "named-checkconf")

# Check if required commands are installed
for cmd in "${REQUIRED_COMMANDS[@]}"; do
  if ! command -v "$cmd" >/dev/null 2>&1; then
    echo "Error: $cmd is required but not installed. Exiting."
    exit 1
  fi
done

# Directory to store the RPZ blacklist
RPZ_DIRECTORY="/path/to/store/rpz_blacklist"
# URL of the RPZ blacklist
RPZ_URL="https://github.com/fabriziosalmi/blacklists/raw/main/rpz_blacklist.tar.gz"
# BIND configuration file
BIND_CONFIG="/etc/bind/named.conf.local"

# Ensure the directory for the RPZ blacklist exists
if ! mkdir -p "$RPZ_DIRECTORY"; then
    echo "Error: cannot create directory $RPZ_DIRECTORY"
    exit 1
fi

# Download the latest RPZ blacklist; abort on failure so a stale or
# partial archive is never extracted.
if ! wget -O "$RPZ_DIRECTORY/rpz_blacklist.tar.gz" "$RPZ_URL"; then
    echo "Error: failed to download $RPZ_URL"
    exit 1
fi

# Extract the blacklist; abort on a corrupt archive.
if ! tar -xzf "$RPZ_DIRECTORY/rpz_blacklist.tar.gz" -C "$RPZ_DIRECTORY"; then
    echo "Error: failed to extract rpz_blacklist.tar.gz"
    exit 1
fi

# Check if the configuration is already added to avoid duplicate entries
if ! grep -q "rpz.blacklist" "$BIND_CONFIG"; then
    # Append the RPZ zone definition to BIND's config file
    echo "zone \"rpz.blacklist\" {
        type master;
        file \"$RPZ_DIRECTORY/rpz_blacklist.txt\";
    };" >> "$BIND_CONFIG"

    # NOTE(review): a top-level options{} block normally belongs in
    # named.conf.options and may be rejected in named.conf.local;
    # named-checkconf below will catch this — confirm placement manually.
    echo "options {
        response-policy { zone \"rpz.blacklist\"; };
    };" >> "$BIND_CONFIG"
fi

# Check BIND configuration
if ! named-checkconf "$BIND_CONFIG"; then
    echo "Error in BIND configuration. Please check manually!"
    exit 1
fi

echo "Script executed successfully!"

# To manually reload BIND and apply the new blacklist:
# sudo systemctl reload bind9
# You can also schedule this script using cron for automation.
# For example, to run it daily at 2 AM:
# crontab -e
# Add:
# 0 2 * * * /path/to/this_script/update_rpz_blacklist.sh

@fabriziosalmi
Copy link
Owner Author

scripts/nft_blacklist_fqdn.sh

#!/bin/bash

# Build and apply an nftables ruleset from a downloaded FQDN blacklist.

print_error() {
  echo "Error: $1" >&2
  exit 1
}

print_success() {
  echo "Success: $1"
}

# Validate a domain name with POSIX ERE. Bash's =~ operator uses ERE,
# which does NOT support PCRE look-around ((?!-) / (?<!-)); the original
# pattern could therefore never match and aborted on the first line.
# This equivalent ERE enforces the same rule: labels of 1-63
# alphanumeric/hyphen characters that neither start nor end with a
# hyphen, followed by an alphabetic TLD.
validate_domain() {
  local domain="$1"
  local domain_regex='^([A-Za-z0-9]([A-Za-z0-9-]{0,61}[A-Za-z0-9])?\.)+[A-Za-z]{2,63}$'
  [[ ! "$domain" =~ $domain_regex ]] && print_error "Invalid domain name: $domain"
}

readonly BLACKLIST_URL="https://github.com/fabriziosalmi/blacklists/releases/download/latest/blacklist.txt"
readonly INPUT_FILE="/tmp/all.fqdn.blacklist"
readonly RULES_FILE="nftables_rules.nft"
readonly TABLE_NAME="filter"
readonly CHAIN_NAME="input_drop"

if ! wget -q -O "$INPUT_FILE" "$BLACKLIST_URL"; then
  print_error "Failed to download the blacklist from $BLACKLIST_URL"
fi

[[ ! -r "$INPUT_FILE" ]] && print_error "Input file not found or not readable: $INPUT_FILE"

{
  echo "#!/usr/sbin/nft -f"
  echo "flush ruleset"
  echo "table $TABLE_NAME {"
  echo "    chain $CHAIN_NAME {"

  while IFS= read -r domain || [[ -n "$domain" ]]; do
    [[ -z "$domain" ]] && continue   # skip blank lines instead of aborting
    validate_domain "$domain"
    # NOTE(review): nft's "ip daddr/saddr" expects addresses; a hostname
    # here is resolved once at rule-load time — confirm this is intended.
    echo "        drop ip daddr $domain"
    echo "        drop ip saddr $domain"
  done < "$INPUT_FILE"

  echo "    }"
  echo "}"
} > "$RULES_FILE"

nft -f "$RULES_FILE" || print_error "Error applying nftables rules. Ensure you have the necessary privileges."

rm -f "$INPUT_FILE" "$RULES_FILE"

@fabriziosalmi
Copy link
Owner Author

docker/pihole-squid/squid/update_blacklist.sh

#!/bin/bash

# Refresh the Squid domain blacklist from the upstream source and restart
# Squid so the new list takes effect.

blacklist_url="https://get.domainsblacklists.com/blacklist.txt"
blacklist_file="/etc/squid/conf.d/blacklist.txt"

# wget is the only hard prerequisite.
if ! command -v wget &> /dev/null; then
    echo "Error: 'wget' is not installed. Please install it."
    exit 1
fi

# Download the latest blacklist; bail out on any failure.
if ! wget -O "$blacklist_file" "$blacklist_url"; then
    echo "Error: Failed to update the blacklist. Please check the URL or your internet connection."
    exit 1
fi

echo "Blacklist updated successfully."

# Restart Squid when present so the change is picked up.
if command -v squid &> /dev/null; then
    service squid restart
    echo "Squid restarted to apply the changes."
else
    echo "Warning: Squid is not installed. Please install and configure it separately."
fi

@fabriziosalmi
Copy link
Owner Author

sanitize.py

import re
import tldextract
from tqdm import tqdm

# Pre-compiled (raw-string) pattern for one DNS label: 1-63 alphanumeric or
# hyphen characters, with look-arounds rejecting leading/trailing hyphens.
fqdn_pattern = re.compile(r'^(?!-)[A-Za-z0-9-]{1,63}(?<!-)$')

def is_valid_fqdn(s):
    """Return True if *s* is a plausible fully-qualified domain name."""
    # Reject empties and wildcard entries up front.
    if not s or '*' in s:
        return False
    # Require both a registrable domain and a recognized public suffix.
    ext = tldextract.extract(s)
    if not ext.domain or not ext.suffix:
        return False
    # Every dot-separated label must match the label pattern.
    for label in s.split('.'):
        if not fqdn_pattern.match(label):
            return False
    return True

def remove_prefix(line, prefixes):
    """Strip the first matching prefix from *line* when the remainder is a
    valid FQDN; otherwise return the line unchanged."""
    for candidate_prefix in prefixes:
        if not line.startswith(candidate_prefix):
            continue
        remainder = line[len(candidate_prefix):]
        if is_valid_fqdn(remainder):
            return remainder
    return line

def sanitize_line(line, rules):
    """Apply each sanitization rule in turn to the stripped line.

    A rule returning None drops the line (the function returns None).
    """
    result = line
    for apply_rule in rules:
        result = apply_rule(result.strip())
        if result is None:
            return None
    return result

def get_sanitization_rules():
    """Return the ordered sanitization rules.

    Rules are callables taking a stripped line and returning the
    transformed line, or None to drop it. The prefix list puts the
    space-suffixed hosts-file prefixes first so "127.0.0.1 example.com"
    strips cleanly in a single pass.
    """
    # "http://" was garbled as "http:https://" (link-rewriting artifact); fixed.
    prefixes = ["127.0.0.1 ", "127.0.0.1", "0.0.0.0 ", "0.0.0.0", "||", "http://", "https://"]
    return [
        lambda line: None if line.startswith("#") else line,  # drop comment lines
        lambda line: remove_prefix(line, prefixes),           # strip known prefixes
        lambda line: line.rstrip('.'),                        # trim trailing root dot
        lambda line: line.lower()                             # canonical lowercase
    ]

def process_large_file(input_file_path, output_file_path):
    """Sanitize *input_file_path* and write the sorted unique FQDNs to
    *output_file_path*, showing tqdm progress for both phases."""
    domains = set()
    rules = get_sanitization_rules()

    with open(input_file_path, 'r') as source, open(output_file_path, 'w') as sink:
        line_total = sum(1 for _ in source)  # first pass: count lines for tqdm
        source.seek(0)  # Reset file pointer to start
        for raw in tqdm(source, total=line_total, desc="Processing"):
            cleaned = sanitize_line(raw, rules)
            if cleaned and is_valid_fqdn(cleaned):
                domains.add(cleaned)

        # Sort and write the unique domain names to the output file
        for fqdn in tqdm(sorted(domains), desc="Writing"):
            sink.write(fqdn + '\n')

# Run only when executed as a script — the bare call ran on import,
# which made this module unusable as a library.
if __name__ == "__main__":
    process_large_file('input.txt', 'output.txt')

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

No branches or pull requests

1 participant