# browser_utils.py (from a fork of microsoft/autogen)

import io
import mimetypes
import os
import re
import uuid
from typing import Any, Dict, List, Optional, Tuple, Union
from urllib.parse import urljoin, urlparse

import markdownify
import requests
from bs4 import BeautifulSoup

# Optional PDF support
IS_PDF_CAPABLE = False
try:
    import pdfminer
    import pdfminer.high_level

    IS_PDF_CAPABLE = True
except ModuleNotFoundError:
    pass

# Other optional dependencies
try:
    import pathvalidate
except ModuleNotFoundError:
    pass


class SimpleTextBrowser:
    """(In preview) An extremely simple text-based web browser comparable to Lynx. Suitable for agentic use."""

    def __init__(
        self,
        start_page: Optional[str] = None,
        viewport_size: Optional[int] = 1024 * 8,
        downloads_folder: Optional[str] = None,
        bing_api_key: Optional[str] = None,
        request_kwargs: Optional[Dict[str, Any]] = None,
    ):
        self.start_page: str = start_page if start_page else "about:blank"
        self.viewport_size = viewport_size  # Applies only to the standard uri types
        self.downloads_folder = downloads_folder
        self.history: List[str] = list()
        self.page_title: Optional[str] = None
        self.viewport_current_page = 0
        self.viewport_pages: List[Tuple[int, int]] = list()
        self.bing_api_key = bing_api_key
        self.request_kwargs = request_kwargs
        self._page_content = ""

        # Visit the start page last: fetching it relies on the attributes
        # above (e.g. request_kwargs inside _fetch_page).
        self.set_address(self.start_page)

    @property
    def address(self) -> str:
        """Return the address of the current page."""
        return self.history[-1]

    def set_address(self, uri_or_path: str) -> None:
        """Visit a new address: 'about:blank', a 'bing:' search query, or a standard URI."""
        self.history.append(uri_or_path)

        # Handle special URIs
        if uri_or_path == "about:blank":
            self._set_page_content("")
        elif uri_or_path.startswith("bing:"):
            self._bing_search(uri_or_path[len("bing:") :].strip())
        else:
            if not uri_or_path.startswith("http:") and not uri_or_path.startswith("https:"):
                # Resolve relative paths against the page we were on *before*
                # this call (the entry just appended is the relative path itself)
                if len(self.history) > 1:
                    uri_or_path = urljoin(self.history[-2], uri_or_path)
                    self.history[-1] = uri_or_path  # Update the address with the fully-qualified path
            self._fetch_page(uri_or_path)

        self.viewport_current_page = 0

    @property
    def viewport(self) -> str:
        """Return the content of the current viewport."""
        bounds = self.viewport_pages[self.viewport_current_page]
        return self.page_content[bounds[0] : bounds[1]]

    @property
    def page_content(self) -> str:
        """Return the full contents of the current page."""
        return self._page_content

    def _set_page_content(self, content: str) -> None:
        """Sets the text content of the current page."""
        self._page_content = content
        self._split_pages()
        if self.viewport_current_page >= len(self.viewport_pages):
            self.viewport_current_page = len(self.viewport_pages) - 1

    def page_down(self) -> None:
        """Move the viewport one page forward, clamping at the last page."""
        self.viewport_current_page = min(self.viewport_current_page + 1, len(self.viewport_pages) - 1)

    def page_up(self) -> None:
        """Move the viewport one page back, clamping at the first page."""
        self.viewport_current_page = max(self.viewport_current_page - 1, 0)

    def visit_page(self, path_or_uri: str) -> str:
        """Update the address, visit the page, and return the content of the viewport."""
        self.set_address(path_or_uri)
        return self.viewport

    def _split_pages(self) -> None:
        # Split only regular pages
        if not self.address.startswith("http:") and not self.address.startswith("https:"):
            self.viewport_pages = [(0, len(self._page_content))]
            return

        # Handle empty pages
        if len(self._page_content) == 0:
            self.viewport_pages = [(0, 0)]
            return

        # Break the viewport into pages
        self.viewport_pages = []
        start_idx = 0
        while start_idx < len(self._page_content):
            end_idx = min(start_idx + self.viewport_size, len(self._page_content))  # type: ignore[operator]
            # Adjust to end on a space
            while end_idx < len(self._page_content) and self._page_content[end_idx - 1] not in [" ", "\t", "\r", "\n"]:
                end_idx += 1
            self.viewport_pages.append((start_idx, end_idx))
            start_idx = end_idx
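        # Worked example of the loop above (illustrative): with viewport_size=8,
        # "lorem ipsum dolor" paginates to [(0, 12), (12, 17)] -- each boundary
        # is pushed forward to the next whitespace so words are never split.
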
    def _bing_api_call(self, query: str) -> Dict[str, Dict[str, List[Dict[str, Union[str, Dict[str, str]]]]]]:
        """Issue a query to the Bing Web Search API and return the parsed JSON response."""
        # Make sure the key was set
        if self.bing_api_key is None:
            raise ValueError("Missing Bing API key.")

        # Prepare the request parameters
        request_kwargs = self.request_kwargs.copy() if self.request_kwargs is not None else {}

        if "headers" not in request_kwargs:
            request_kwargs["headers"] = {}
        request_kwargs["headers"]["Ocp-Apim-Subscription-Key"] = self.bing_api_key

        if "params" not in request_kwargs:
            request_kwargs["params"] = {}
        request_kwargs["params"]["q"] = query
        request_kwargs["params"]["textDecorations"] = False
        request_kwargs["params"]["textFormat"] = "raw"

        request_kwargs["stream"] = False

        # Make the request
        response = requests.get("https://api.bing.microsoft.com/v7.0/search", **request_kwargs)
        response.raise_for_status()
        results = response.json()

        return results  # type: ignore[no-any-return]

    def _bing_search(self, query: str) -> None:
        """Run a Bing search and render the results as the current page, in Markdown."""
        results = self._bing_api_call(query)

        web_snippets: List[str] = list()
        idx = 0
        for page in results["webPages"]["value"]:
            idx += 1
            web_snippets.append(f"{idx}. [{page['name']}]({page['url']})\n{page['snippet']}")
            if "deepLinks" in page:
                for dl in page["deepLinks"]:
                    idx += 1
                    web_snippets.append(
                        f"{idx}. [{dl['name']}]({dl['url']})\n{dl['snippet'] if 'snippet' in dl else ''}"  # type: ignore[index]
                    )

        news_snippets = list()
        if "news" in results:
            for page in results["news"]["value"]:
                idx += 1
                news_snippets.append(f"{idx}. [{page['name']}]({page['url']})\n{page['description']}")

        self.page_title = f"{query} - Search"

        content = (
            f"A Bing search for '{query}' found {len(web_snippets) + len(news_snippets)} results:\n\n## Web Results\n"
            + "\n\n".join(web_snippets)
        )
        if len(news_snippets) > 0:
            content += "\n\n## News Results\n" + "\n\n".join(news_snippets)
        self._set_page_content(content)

    def _fetch_page(self, url: str) -> None:
        """Fetch a URL and render it as the current page, downloading unsupported content types if possible."""
        try:
            # Prepare the request parameters
            request_kwargs = self.request_kwargs.copy() if self.request_kwargs is not None else {}
            request_kwargs["stream"] = True

            # Send a HTTP request to the URL
            response = requests.get(url, **request_kwargs)
            response.raise_for_status()

            # If the HTTP request returns a status code 200, proceed
            if response.status_code == 200:
                content_type = response.headers.get("content-type", "")
                for ct in ["text/html", "text/plain", "application/pdf"]:
                    if ct in content_type.lower():
                        content_type = ct
                        break

                if content_type == "text/html":
                    # Get the content of the response
                    html = ""
                    for chunk in response.iter_content(chunk_size=512, decode_unicode=True):
                        html += chunk

                    soup = BeautifulSoup(html, "html.parser")

                    # Remove javascript and style blocks
                    for script in soup(["script", "style"]):
                        script.extract()

                    # Convert to markdown -- Wikipedia gets special attention to get a clean version of the page
                    if url.startswith("https://en.wikipedia.org/"):
                        body_elm = soup.find("div", {"id": "mw-content-text"})
                        title_elm = soup.find("span", {"class": "mw-page-title-main"})

                        if body_elm:
                            # What's the title? Fall back to the <title> tag, then to the URL.
                            main_title = soup.title.string if soup.title and soup.title.string else url
                            if title_elm and len(title_elm) > 0:
                                main_title = title_elm.string
                            webpage_text = (
                                "# " + main_title + "\n\n" + markdownify.MarkdownConverter().convert_soup(body_elm)
                            )
                        else:
                            webpage_text = markdownify.MarkdownConverter().convert_soup(soup)
                    else:
                        webpage_text = markdownify.MarkdownConverter().convert_soup(soup)

                    # Convert newlines
                    webpage_text = re.sub(r"\r\n", "\n", webpage_text)

                    # Remove excessive blank lines
                    self.page_title = soup.title.string if soup.title else None
                    self._set_page_content(re.sub(r"\n{2,}", "\n\n", webpage_text).strip())
                elif content_type == "text/plain":
                    # Get the content of the response
                    plain_text = ""
                    for chunk in response.iter_content(chunk_size=512, decode_unicode=True):
                        plain_text += chunk

                    self.page_title = None
                    self._set_page_content(plain_text)
                elif IS_PDF_CAPABLE and content_type == "application/pdf":
                    pdf_data = io.BytesIO(response.raw.read())
                    self.page_title = None
                    self._set_page_content(pdfminer.high_level.extract_text(pdf_data))
                elif self.downloads_folder is not None:
                    # Try producing a safe filename
                    fname = None
                    try:
                        fname = pathvalidate.sanitize_filename(os.path.basename(urlparse(url).path)).strip()
                    except NameError:
                        pass  # pathvalidate is an optional dependency

                    # No suitable name (missing or sanitized to empty), so make one
                    if not fname:
                        extension = mimetypes.guess_extension(content_type)
                        if extension is None:
                            extension = ".download"
                        fname = str(uuid.uuid4()) + extension

                    # Open a file for writing
                    download_path = os.path.abspath(os.path.join(self.downloads_folder, fname))
                    with open(download_path, "wb") as fh:
                        for chunk in response.iter_content(chunk_size=512):
                            fh.write(chunk)

                    # Return a page describing what just happened
                    self.page_title = "Download complete."
                    self._set_page_content(f"Downloaded '{url}' to '{download_path}'.")
                else:
                    self.page_title = f"Error - Unsupported Content-Type '{content_type}'"
                    self._set_page_content(self.page_title)
            else:
                self.page_title = "Error"
                self._set_page_content("Failed to retrieve " + url)
        except requests.exceptions.RequestException as e:
            self.page_title = "Error"
            self._set_page_content(str(e))
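

# Minimal usage sketch (illustrative): drive the browser by visiting a page,
# then stepping through viewports. The example URL and the BING_API_KEY
# environment variable are assumptions for demonstration; a Bing key is only
# needed for "bing:" search URIs.
if __name__ == "__main__":
    browser = SimpleTextBrowser(
        viewport_size=1024 * 4,
        bing_api_key=os.environ.get("BING_API_KEY"),
    )

    # Visit a page and print the first viewport of converted text
    print(browser.visit_page("https://en.wikipedia.org/wiki/Lynx_(web_browser)"))

    # Step through any remaining viewports
    while browser.viewport_current_page < len(browser.viewport_pages) - 1:
        browser.page_down()
        print(browser.viewport)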