This repository has been archived by the owner on Dec 17, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 147
/
a11y.py
176 lines (132 loc) · 4.38 KB
/
a11y.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
import json
import logging
import os
import requests
import yaml
from utils import utils
# Scanner concurrency hint — presumably the number of parallel workers the
# orchestrator may run for this scanner; TODO confirm against the runner.
workers = 3
# Path to the pa11y CLI binary; overridable via the PA11Y_PATH env var.
pa11y = os.environ.get("PA11Y_PATH", "pa11y")
# Populated by init() from --a11y_redirects: domain -> redirect/blacklist info.
redirects = {}
# Populated by init() from --a11y_config: path to a pa11y JSON config file.
config = ""
def init(environment, options):
    """One-time scanner setup: load redirect/blacklist data and pa11y config.

    Reads two optional CLI options:
      * a11y_redirects -- YAML file (local path or http(s) URL) mapping
        domains to redirect/blacklist entries; stored in module global
        `redirects`.
      * a11y_config -- pa11y JSON config (local path or http(s) URL);
        its local path is stored in module global `config`.

    Returns True on success, False on any fatal configuration problem
    (wrong extension, failed download, missing local file).
    """
    global redirects
    global config

    cache_dir = options.get("_", {}).get("cache_dir", "./cache")
    redirects_file = options.get("a11y_redirects")
    config_file = options.get("a11y_config")

    # Parse redirects
    if redirects_file:
        if not redirects_file.endswith(".yml"):
            logging.error("--a11y_redirects should be a YML file")
            return False

        # If remote, try to download into the cache dir.
        if redirects_file.startswith("http:") or redirects_file.startswith("https:"):
            redirects_path = os.path.join(cache_dir, "a11y_redirects.yml")
            try:
                response = requests.get(redirects_file)
                utils.write(response.text, redirects_path)
            # Narrowed from a bare `except:` so Ctrl-C/SystemExit still work.
            except Exception:
                logging.error("--a11y_redirects URL not downloaded successfully.")
                return False

        # Otherwise, read it off the disk.
        else:
            redirects_path = redirects_file

            if not os.path.exists(redirects_path):
                logging.error("--a11y_redirects file not found.")
                return False

        with open(redirects_path, 'r') as f:
            # safe_load: this data may have come from a URL; never
            # deserialize arbitrary Python objects from YAML.
            redirects = yaml.safe_load(f)

    # Get config
    if config_file:
        if not config_file.endswith(".json"):
            logging.error("--a11y_config should be a json file")
            return False

        # If remote, try to download into the cache dir.
        if config_file.startswith("http:") or config_file.startswith("https:"):
            config_path = os.path.join(cache_dir, "a11y_config.json")
            try:
                response = requests.get(config_file)
                utils.write(response.text, config_path)
            except Exception:
                logging.error("--a11y_config URL not downloaded successfully.")
                return False

        # Bug fix: a local config path previously fell through with
        # config_path unbound (NameError). Mirror the redirects branch.
        else:
            config_path = config_file

            if not os.path.exists(config_path):
                logging.error("--a11y_config file not found.")
                return False

        config = config_path

    return True
# If we have pshtt data, use it to skip some domains. If redirect
# data says so, adjust scan URL for some domains.
def init_domain(domain, environment, options):
    """Per-domain setup: decide whether to scan, and at which URL.

    Returns False to skip the domain, or {'url': ...} with the
    (possibly redirected) URL to hand to scan().
    """
    cache_dir = options.get("_", {}).get("cache_dir", "./cache")

    # pshtt already determined this domain is a redirect or not live: skip.
    skip = (utils.domain_is_redirect(domain, cache_dir=cache_dir)
            or utils.domain_not_live(domain, cache_dir=cache_dir))
    if skip:
        logging.debug("\tSkipping a11y scan based on pshtt data.")
        return False

    # Redirect/blacklist data may rewrite the URL, or veto the scan entirely.
    scan_url = get_url_to_scan(domain)
    if scan_url:
        # Hand the adjusted URL to the scan function.
        return {'url': scan_url}

    logging.debug("\tSkipping a11y scan based on redirect/blacklist data.")
    return False
# Shell out to a11y and run the scan.
def scan(domain, environment, options):
    """Run the a11y scan against this domain's resolved URL.

    Uses the redirect-adjusted URL from init_domain() when present,
    falling back to the bare domain. Returns the URL that was scanned
    plus the list of accessibility errors found.
    """
    target = environment.get("url", domain)
    return {
        'url': target,
        'errors': run_a11y_scan(target),
    }
def to_rows(data):
    """Flatten one scan result into CSV rows, one row per a11y error.

    Column order must stay in sync with the module-level `headers` list.
    """
    scanned_url = data['url']
    return [
        [scanned_url,
         err['typeCode'],
         err['code'],
         err['message'],
         err['context'],
         err['selector']]
        for err in data['errors']
    ]
# CSV column headers for this scanner's output; the order must match the
# per-error fields emitted by to_rows().
headers = [
    "redirectedTo",
    "typeCode",
    "code",
    "message",
    "context",
    "selector"
]
def run_a11y_scan(domain):
    """Invoke the pa11y CLI on one URL and return its parsed error list.

    Runs pa11y with the JSON reporter (and the configured pa11y config
    file, if any). When pa11y produces no output, or an empty JSON array,
    a single all-blank placeholder record is returned so downstream CSV
    output still gets a row for the domain.
    """
    command = [pa11y, domain, "--reporter", "json", "--level", "none", "--timeout", "300000"]
    if config:
        command += ["--config", config]

    raw = utils.scan(command)

    if raw and raw != '[]\n':
        return json.loads(raw)

    # No errors reported: emit one blank record.
    placeholder = {field: '' for field in
                   ('typeCode', 'code', 'message', 'context', 'selector', 'type')}
    return [placeholder]
def get_url_to_scan(domain):
    """Resolve the URL to scan for a domain using the redirects data.

    Entries in the module-level `redirects` mapping either blacklist a
    domain (scan is skipped) or redirect it to a different URL. Domains
    with no entry are scanned as-is.

    Returns the URL/domain string to scan, or None to skip the domain.
    """
    entry = redirects.get(domain)

    # No redirect/blacklist entry: scan the domain unchanged.
    if entry is None:
        return domain

    # Robustness fix: use .get so an entry missing the 'blacklist' key
    # no longer raises KeyError; absent means "not blacklisted".
    if entry.get('blacklist'):
        return None

    # Scan is redirected to the entry's new location.
    return entry['redirect']