🥚🐍 Initial commit
victoriadrake committed Feb 6, 2020
1 parent ad24032 commit aa0744f
Showing 6 changed files with 324 additions and 0 deletions.
21 changes: 21 additions & 0 deletions .github/workflows/run_tests.yml
@@ -0,0 +1,21 @@
name: test

on:
  push:
    branches:
      - master

jobs:
  build:

    runs-on: ubuntu-latest
    strategy:
      matrix:
        python-version: [3.6, 3.7, 3.8]

    steps:
      - uses: actions/checkout@v2
      - name: Set up Python ${{ matrix.python-version }}
        uses: actions/setup-python@v1
        with:
          python-version: ${{ matrix.python-version }}
      - name: Run tests for Python ${{ matrix.python-version }}
        run: python -m unittest tests/test.py
1 change: 1 addition & 0 deletions .gitignore
@@ -0,0 +1 @@
__pycache__
41 changes: 41 additions & 0 deletions README.md
@@ -0,0 +1,41 @@
# Hydra: multithreaded site-crawling link checker in Python

![Tests status badge](https://github.com/victoriadrake/hydra-link-checker/workflows/test/badge.svg)

A Python program that ~~crawls~~ slithers 🐍 a website for links and prints a YAML report of broken links.

## Requires

Python 3.6 or higher.

There are no external dependencies, Neo.

## Usage

Run in a terminal:

```sh
python hydra.py [URL]
```

Ensure `URL` is an absolute URL including the scheme, e.g. `https://example.com`.

The report will be [YAML](https://yaml.org/) formatted. To save the output to a file, run:

```sh
python hydra.py [URL] > [PATH/TO/FILE.yaml]
```
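
For reference, the report looks roughly like the following sketch (based on the format produced by `report()` in `hydra.py`; the counts, URLs, and error text are illustrative):

```yaml
---
title: Broken Link Report
checked: 53
broken: 2
---

- code: 0
  url: http://baddomain.com/i-donut-exist
  parent: https://example.com/
  error: <urlopen error [Errno -2] Name or service not known>

- code: 404
  url: https://example.com/missing-page
  parent: https://example.com/
  error: Not Found
```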

To see how long Hydra takes to check your site, add `time`:

```sh
time python hydra.py [URL]
```

## Test

Run:

```sh
python -m unittest tests/test.py
```
172 changes: 172 additions & 0 deletions hydra.py
@@ -0,0 +1,172 @@
from concurrent import futures
from html.parser import HTMLParser
from queue import Queue, Empty
from urllib import error, parse, request
import gzip
import sys


class Parser(HTMLParser):
    # Tags to check
    TAGS = ["a", "link", "img", "script"]
    # Valid attributes to check
    ATTRS = ["href", "src"]

    def __init__(self):
        super(Parser, self).__init__()
        self.links = []

    def handle_starttag(self, tag, attrs):
        if tag not in self.TAGS:
            return
        for a in attrs:
            if a[0] in self.ATTRS:
                self.links.append(a[1])

    def feed_me(self, data):
        self.links = []
        self.feed(data)
        return self.links

    def error(self, msg):
        return msg


class Checker:
    TO_PROCESS = Queue()
    # Maximum workers to run
    THREADS = 20
    # Maximum seconds to wait for HTTP response
    TIMEOUT = 60

    def __init__(self, url):
        self.broken = []
        self.domain = self.extract_domain(url)
        self.visited = set()
        self.pool = futures.ThreadPoolExecutor(max_workers=self.THREADS)

    def extract_domain(self, l):
        domain = parse.urlsplit(l).netloc
        return domain

    # Try to retrieve contents of a page and record result
    def load_url(self, page, timeout):
        # Store the link to be checked and its parent in the result
        result = {
            "url": page["url"],
            "parent": page["parent"],
            "data": "",
            "content_type": "",
        }

        # Use GET as HEAD is frequently not allowed
        r = request.Request(
            page["url"],
            headers={
                "User-Agent": "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:72.0) Gecko/20100101 Firefox/72.0"
            },
        )

        try:
            http_response = request.urlopen(r, timeout=self.TIMEOUT)
            encoding = http_response.headers.get("Content-Encoding")
            content_type = http_response.headers.get("Content-Type")
            if encoding and "gzip" in encoding:
                data = gzip.decompress(http_response.read()).decode(
                    encoding="utf-8", errors="ignore"
                )
            elif encoding is None:
                data = http_response.read().decode(encoding="utf-8", errors="ignore")
            else:
                # Support for other less common directives not handled
                raise NotImplementedError

            result["data"] = data
            result["content_type"] = content_type

        except error.HTTPError as e:
            code = e.getcode()
            reason = e.reason
            entry = {
                "code": code,
                "link": page["url"],
                "parent": page["parent"],
                "err": reason,
            }
            self.broken.append(entry)
        except (
            error.URLError,
            UnicodeEncodeError,
            UnicodeDecodeError,
            NotImplementedError,
        ) as e:
            code = 0
            reason = e

            entry = {
                "code": code,
                "link": page["url"],
                "parent": page["parent"],
                "err": reason,
            }
            self.broken.append(entry)

        return result

    def handle_future(self, result):
        if result.result():
            page = result.result()
            self.parse_page(page)

    # Get more links from successfully retrieved pages in the same domain
    def parse_page(self, page):
        if (
            self.domain == self.extract_domain(page["url"])
            and "text/html" in page["content_type"]
            or "text/plain" in page["content_type"]
        ):
            parent = page["url"]
            parser = Parser()
            links = parser.feed_me(page["data"])
            new_links = [x for x in links if x not in self.visited]
            full_links = [parse.urljoin(parent, l) for l in new_links]
            for l in full_links:
                if l not in self.visited:
                    li = {"parent": parent, "url": l}
                    self.TO_PROCESS.put(li)

    # Parse broken links list into YAML report
    def report(self):
        self.report = "---\ntitle: Broken Link Report"
        self.report += "\nchecked: " + str(len(self.visited))
        self.report += "\nbroken: " + str(len(self.broken))
        self.report += "\n---\n"
        sorted_list = sorted(self.broken, key=lambda k: k["code"])
        for link in sorted_list:
            self.report += f"\n- code: {link['code']}\n  url: {link['link']}\n  parent: {link['parent']}\n  error: {link['err']}\n"
        return self.report

    # Run crawler until TO_PROCESS queue is empty
    def run(self):
        while True:
            try:
                target_url = self.TO_PROCESS.get(block=True, timeout=self.TIMEOUT + 5)
                if target_url["url"] not in self.visited:
                    self.visited.add(target_url["url"])
                    job = self.pool.submit(self.load_url, target_url, self.TIMEOUT)
                    job.add_done_callback(self.handle_future)
            except Empty:
                return
            except Exception as e:
                print(e)
                continue


if __name__ == "__main__":
    url = sys.argv[1]
    first_url = {"parent": url, "url": url}

    check = Checker(url)
    check.TO_PROCESS.put(first_url)
    check.run()
    print(check.report())
30 changes: 30 additions & 0 deletions tests/data/test-page.html
@@ -0,0 +1,30 @@
<!DOCTYPE html>
<html>

  <head>
    <title>Test Data Page</title>

    <meta charset="utf-8">
    <meta http-equiv="Content-type" content="text/html; charset=UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1">
    <link rel="stylesheet" href="style.css" type="text/css">
    <script type="text/javascript" src="scripts.js"></script>
  </head>

  <body>
    <div>
      <h1>Test Data Page</h1>
      <p>This page does not exist: <a href="http://baddomain.com/i-donut-exist">Whale</a></p>
      <p>This is not a link: <a>No Spoon</a></p>
      <img src="image.png" />
      <p>This page does not exist: <a href="www.anotherbaddomain.com/multithreading-is-fun">Petunias</a></p>
      <p>This page contains more links: <a href="https://example.com/i-have-links">Crawl Me</a></p>
      <p>This domain is for use in illustrative examples in documents. You may use this
        domain in literature without prior coordination or asking for permission: <a
          href="https://example.com">Example</a></p>
    </div>


  </body>

</html>
59 changes: 59 additions & 0 deletions tests/test.py
@@ -0,0 +1,59 @@
from hydra import Parser, Checker
import os
import unittest


HTMLDATA = os.path.join(os.path.dirname(__file__), "data/test-page.html")


class TestCases(unittest.TestCase):
    # Open and close file from data/
    def setUp(self):
        self.testfile = open(HTMLDATA)
        self.data = self.testfile.read()
        self.url = "https://example.com"
        self.check = Checker(self.url)
        self.parser = Parser()

    def tearDown(self):
        self.testfile.close()

    # Parser gives expected values
    def test_parser_expected_output(self):
        links = self.parser.feed_me(self.data)
        expected_output = [
            "style.css",
            "scripts.js",
            "http://baddomain.com/i-donut-exist",
            "image.png",
            "www.anotherbaddomain.com/multithreading-is-fun",
            "https://example.com/i-have-links",
            "https://example.com",
        ]
        self.assertEqual(links, expected_output)

    # Checker uses correct domain for comparison
    def test_domain_extraction(self):
        self.assertEqual(self.check.extract_domain(self.url), "example.com")

    # Checker doesn't add visited links to queue
    def test_process_queue_length(self):
        self.pagedata = {
            "url": "https://example.com/test-page.html",
            "parent": "https://example.com/test-page.html",
            "data": '<!DOCTYPE html>\n<html>\n\n <head>\n <title>Test Data Page</title>\n\n <meta charset="utf-8">\n <meta http-equiv="Content-type" content="text/html; charset=UTF-8">\n <meta name="viewport" content="width=device-width, initial-scale=1">\n <link rel="stylesheet" href="style.css" type="text/css">\n <script type="text/javascript" src="scripts.js"></script>\n </head>\n\n <body>\n <div>\n <h1>Test Data Page</h1>\n <p>This page does not exist: <a href="/i-donut-exist">Whale</a></p>\n <p>This is not a link: <a>No Spoon</a></p>\n <img src="image.png" />\n <p>This page does not exist: <a href="/multithreading-is-fun">Petunias</a></p>\n <p>This page contains more links: <a href="/i-have-links">Crawl Me</a></p>\n <p>This domain is for use in illustrative examples in documents. You may use this\n domain in literature without prior coordination or asking for permission: <a\n href="https://example.com">Example</a></p>\n </div>\n\n\n </body>\n\n</html>',
            "content_type": "text/html; charset=utf-8",
        }
        # There are 7 links in pagedata["data"]
        first_parse = 7
        self.check.parse_page(self.pagedata)
        self.assertEqual(len(self.check.TO_PROCESS.queue), first_parse)
        self.check.visited.add("https://example.com/style.css")
        # Checker should add to queue all but the one visited link
        second_parse = 13
        self.check.parse_page(self.pagedata)
        self.assertEqual(len(self.check.TO_PROCESS.queue), second_parse)


if __name__ == "__main__":
    unittest.main()
