sweep: check if book exists, skip book download #6

Open
makhalaf opened this issue Aug 9, 2023 · 1 comment · May be fixed by #7
Labels
sweep Assigns Sweep to an issue or pull request.

Comments

@makhalaf
Owner

makhalaf commented Aug 9, 2023

Check whether the book already exists and, if it does, skip the book download.

makhalaf added the sweep label on Aug 9, 2023
@sweep-ai

sweep-ai bot commented Aug 9, 2023

Here's the PR! #7.

⚡ Sweep Free Trial: I used GPT-3.5 to create this ticket. You have 0 GPT-4 tickets left for the month and 2 for the day. For more GPT-4 tickets, visit our payment portal. To get Sweep to recreate this ticket, leave a comment prefixed with "sweep:" or edit the issue.


Step 1: 🔍 Code Search

I found the following snippets in your repository. I will now analyze these snippets and come up with a plan.

Some code snippets I looked at. If a file is missing from here, you can mention its path in the ticket description.

safaribooks/safaribooks.py

Lines 642 to 910 in 48ae994

        divs = html_root.xpath("//div[contains(lower-case(@id), 'cover') or contains(lower-case(@class), 'cover') or"
                               "contains(lower-case(@name), 'cover') or contains(lower-case(@src), 'cover')]//img")
        if len(divs):
            return divs[0]

        a = html_root.xpath("//a[contains(lower-case(@id), 'cover') or contains(lower-case(@class), 'cover') or"
                            "contains(lower-case(@name), 'cover') or contains(lower-case(@src), 'cover')]//img")
        if len(a):
            return a[0]

        return None

    def parse_html(self, root, first_page=False):
        if random() > 0.8:
            if len(root.xpath("//div[@class='controls']/a/text()")):
                self.display.exit(self.display.api_error(" "))

        book_content = root.xpath("//div[@id='sbo-rt-content']")
        if not len(book_content):
            self.display.exit(
                "Parser: book content's corrupted or not present: %s (%s)" %
                (self.filename, self.chapter_title)
            )

        page_css = ""
        if len(self.chapter_stylesheets):
            for chapter_css_url in self.chapter_stylesheets:
                if chapter_css_url not in self.css:
                    self.css.append(chapter_css_url)
                    self.display.log("Crawler: found a new CSS at %s" % chapter_css_url)

                page_css += "<link href=\"Styles/Style{0:0>2}.css\" " \
                            "rel=\"stylesheet\" type=\"text/css\" />\n".format(self.css.index(chapter_css_url))

        stylesheet_links = root.xpath("//link[@rel='stylesheet']")
        if len(stylesheet_links):
            for s in stylesheet_links:
                css_url = urljoin("https:", s.attrib["href"]) if s.attrib["href"][:2] == "//" \
                    else urljoin(self.base_url, s.attrib["href"])

                if css_url not in self.css:
                    self.css.append(css_url)
                    self.display.log("Crawler: found a new CSS at %s" % css_url)

                page_css += "<link href=\"Styles/Style{0:0>2}.css\" " \
                            "rel=\"stylesheet\" type=\"text/css\" />\n".format(self.css.index(css_url))

        stylesheets = root.xpath("//style")
        if len(stylesheets):
            for css in stylesheets:
                if "data-template" in css.attrib and len(css.attrib["data-template"]):
                    css.text = css.attrib["data-template"]
                    del css.attrib["data-template"]

                try:
                    page_css += html.tostring(css, method="xml", encoding='unicode') + "\n"

                except (html.etree.ParseError, html.etree.ParserError) as parsing_error:
                    self.display.error(parsing_error)
                    self.display.exit(
                        "Parser: error trying to parse one CSS found in this page: %s (%s)" %
                        (self.filename, self.chapter_title)
                    )

        # TODO: add all not covered tag for `link_replace` function
        svg_image_tags = root.xpath("//image")
        if len(svg_image_tags):
            for img in svg_image_tags:
                image_attr_href = [x for x in img.attrib.keys() if "href" in x]
                if len(image_attr_href):
                    svg_url = img.attrib.get(image_attr_href[0])
                    svg_root = img.getparent().getparent()
                    new_img = svg_root.makeelement("img")
                    new_img.attrib.update({"src": svg_url})
                    svg_root.remove(img.getparent())
                    svg_root.append(new_img)

        book_content = book_content[0]
        book_content.rewrite_links(self.link_replace)

        xhtml = None
        try:
            if first_page:
                is_cover = self.get_cover(book_content)
                if is_cover is not None:
                    page_css = "<style>" \
                               "body{display:table;position:absolute;margin:0!important;height:100%;width:100%;}" \
                               "#Cover{display:table-cell;vertical-align:middle;text-align:center;}" \
                               "img{height:90vh;margin-left:auto;margin-right:auto;}" \
                               "</style>"
                    cover_html = html.fromstring("<div id=\"Cover\"></div>")
                    cover_div = cover_html.xpath("//div")[0]
                    cover_img = cover_div.makeelement("img")
                    cover_img.attrib.update({"src": is_cover.attrib["src"]})
                    cover_div.append(cover_img)
                    book_content = cover_html

                    self.cover = is_cover.attrib["src"]

            xhtml = html.tostring(book_content, method="xml", encoding='unicode')

        except (html.etree.ParseError, html.etree.ParserError) as parsing_error:
            self.display.error(parsing_error)
            self.display.exit(
                "Parser: error trying to parse HTML of this page: %s (%s)" %
                (self.filename, self.chapter_title)
            )

        return page_css, xhtml

    @staticmethod
    def escape_dirname(dirname, clean_space=False):
        if ":" in dirname:
            if dirname.index(":") > 15:
                dirname = dirname.split(":")[0]

            elif "win" in sys.platform:
                dirname = dirname.replace(":", ",")

        for ch in ['~', '#', '%', '&', '*', '{', '}', '\\', '<', '>', '?', '/', '`', '\'', '"', '|', '+', ':']:
            if ch in dirname:
                dirname = dirname.replace(ch, "_")

        return dirname if not clean_space else dirname.replace(" ", "")

    def create_dirs(self):
        if os.path.isdir(self.BOOK_PATH):
            self.display.log("Book directory already exists: %s" % self.BOOK_PATH)

        else:
            os.makedirs(self.BOOK_PATH)

        oebps = os.path.join(self.BOOK_PATH, "OEBPS")
        if not os.path.isdir(oebps):
            self.display.book_ad_info = True
            os.makedirs(oebps)

        self.css_path = os.path.join(oebps, "Styles")
        if os.path.isdir(self.css_path):
            self.display.log("CSSs directory already exists: %s" % self.css_path)

        else:
            os.makedirs(self.css_path)
            self.display.css_ad_info.value = 1

        self.images_path = os.path.join(oebps, "Images")
        if os.path.isdir(self.images_path):
            self.display.log("Images directory already exists: %s" % self.images_path)

        else:
            os.makedirs(self.images_path)
            self.display.images_ad_info.value = 1

    def save_page_html(self, contents):
        self.filename = self.filename.replace(".html", ".xhtml")
        open(os.path.join(self.BOOK_PATH, "OEBPS", self.filename), "wb") \
            .write(self.BASE_HTML.format(contents[0], contents[1]).encode("utf-8", 'xmlcharrefreplace'))
        self.display.log("Created: %s" % self.filename)

    def get(self):
        len_books = len(self.book_chapters)

        for _ in range(len_books):
            if not len(self.chapters_queue):
                return

            first_page = len_books == len(self.chapters_queue)

            next_chapter = self.chapters_queue.pop(0)
            self.chapter_title = next_chapter["title"]
            self.filename = next_chapter["filename"]

            asset_base_url = next_chapter['asset_base_url']
            api_v2_detected = False
            if 'v2' in next_chapter['content']:
                asset_base_url = SAFARI_BASE_URL + "/api/v2/epubs/urn:orm:book:{}/files".format(self.book_id)
                api_v2_detected = True

            if "images" in next_chapter and len(next_chapter["images"]):
                for img_url in next_chapter['images']:
                    if api_v2_detected:
                        self.images.append(asset_base_url + '/' + img_url)
                    else:
                        self.images.append(urljoin(next_chapter['asset_base_url'], img_url))

            # Stylesheets
            self.chapter_stylesheets = []
            if "stylesheets" in next_chapter and len(next_chapter["stylesheets"]):
                self.chapter_stylesheets.extend(x["url"] for x in next_chapter["stylesheets"])

            if "site_styles" in next_chapter and len(next_chapter["site_styles"]):
                self.chapter_stylesheets.extend(next_chapter["site_styles"])

            if os.path.isfile(os.path.join(self.BOOK_PATH, "OEBPS", self.filename.replace(".html", ".xhtml"))):
                if not self.display.book_ad_info and \
                        next_chapter not in self.book_chapters[:self.book_chapters.index(next_chapter)]:
                    self.display.info(
                        ("File `%s` already exists.\n"
                         "    If you want to download again all the book,\n"
                         "    please delete the output directory '" + self.BOOK_PATH + "' and restart the program.")
                        % self.filename.replace(".html", ".xhtml")
                    )
                    self.display.book_ad_info = 2

            else:
                self.save_page_html(self.parse_html(self.get_html(next_chapter["content"]), first_page))

            self.display.state(len_books, len_books - len(self.chapters_queue))

    def _thread_download_css(self, url):
        css_file = os.path.join(self.css_path, "Style{0:0>2}.css".format(self.css.index(url)))
        if os.path.isfile(css_file):
            if not self.display.css_ad_info.value and url not in self.css[:self.css.index(url)]:
                self.display.info(("File `%s` already exists.\n"
                                   "    If you want to download again all the CSSs,\n"
                                   "    please delete the output directory '" + self.BOOK_PATH + "'"
                                   " and restart the program.") %
                                  css_file)
                self.display.css_ad_info.value = 1

        else:
            response = self.requests_provider(url)
            if response == 0:
                self.display.error("Error trying to retrieve this CSS: %s\n    From: %s" % (css_file, url))

            with open(css_file, 'wb') as s:
                s.write(response.content)

        self.css_done_queue.put(1)
        self.display.state(len(self.css), self.css_done_queue.qsize())

    def _thread_download_images(self, url):
        image_name = url.split("/")[-1]
        image_path = os.path.join(self.images_path, image_name)
        if os.path.isfile(image_path):
            if not self.display.images_ad_info.value and url not in self.images[:self.images.index(url)]:
                self.display.info(("File `%s` already exists.\n"
                                   "    If you want to download again all the images,\n"
                                   "    please delete the output directory '" + self.BOOK_PATH + "'"
                                   " and restart the program.") %
                                  image_name)
                self.display.images_ad_info.value = 1

        else:
            response = self.requests_provider(urljoin(SAFARI_BASE_URL, url), stream=True)
            if response == 0:
                self.display.error("Error trying to retrieve this image: %s\n    From: %s" % (image_name, url))
                return

            with open(image_path, 'wb') as img:
                for chunk in response.iter_content(1024):
                    img.write(chunk)

        self.images_done_queue.put(1)
        self.display.state(len(self.images), self.images_done_queue.qsize())

    def _start_multiprocessing(self, operation, full_queue):
        if len(full_queue) > 5:
            for i in range(0, len(full_queue), 5):
                self._start_multiprocessing(operation, full_queue[i:i + 5])

        else:
            process_queue = [Process(target=operation, args=(arg,)) for arg in full_queue]
            for proc in process_queue:
                proc.start()

safaribooks/safaribooks.py

Lines 321 to 448 in 48ae994

        self.session.headers.update(self.HEADERS)

        self.jwt = {}

        if not args.cred:
            if not os.path.isfile(COOKIES_FILE):
                self.display.exit("Login: unable to find `cookies.json` file.\n"
                                  "    Please use the `--cred` or `--login` options to perform the login.")

            self.session.cookies.update(json.load(open(COOKIES_FILE)))

        else:
            self.display.info("Logging into Safari Books Online...", state=True)
            self.do_login(*args.cred)
            if not args.no_cookies:
                json.dump(self.session.cookies.get_dict(), open(COOKIES_FILE, 'w'))

        self.check_login()

        self.book_id = args.bookid
        self.api_url = self.API_TEMPLATE.format(self.book_id)

        self.display.info("Retrieving book info...")
        self.book_info = self.get_book_info()
        self.display.book_info(self.book_info)

        self.display.info("Retrieving book chapters...")
        self.book_chapters = self.get_book_chapters()

        self.chapters_queue = self.book_chapters[:]

        if len(self.book_chapters) > sys.getrecursionlimit():
            sys.setrecursionlimit(len(self.book_chapters))

        self.book_title = self.book_info["title"]
        self.base_url = self.book_info["web_url"]

        self.clean_book_title = "".join(self.escape_dirname(self.book_title).split(",")[:2]) \
                                + " ({0})".format(self.book_id)

        books_dir = os.path.join(PATH, "Books")
        if not os.path.isdir(books_dir):
            os.mkdir(books_dir)

        self.BOOK_PATH = os.path.join(books_dir, self.clean_book_title)
        self.display.set_output_dir(self.BOOK_PATH)
        self.css_path = ""
        self.images_path = ""
        self.create_dirs()

        self.chapter_title = ""
        self.filename = ""
        self.chapter_stylesheets = []
        self.css = []
        self.images = []

        self.display.info("Downloading book contents... (%s chapters)" % len(self.book_chapters), state=True)
        self.BASE_HTML = self.BASE_01_HTML + (self.KINDLE_HTML if not args.kindle else "") + self.BASE_02_HTML

        self.cover = False
        self.get()
        if not self.cover:
            self.cover = self.get_default_cover() if "cover" in self.book_info else False
            cover_html = self.parse_html(
                html.fromstring("<div id=\"sbo-rt-content\"><img src=\"Images/{0}\"></div>".format(self.cover)), True
            )

            self.book_chapters = [{
                "filename": "default_cover.xhtml",
                "title": "Cover"
            }] + self.book_chapters

            self.filename = self.book_chapters[0]["filename"]
            self.save_page_html(cover_html)

        self.css_done_queue = Queue(0) if "win" not in sys.platform else WinQueue()
        self.display.info("Downloading book CSSs... (%s files)" % len(self.css), state=True)
        self.collect_css()
        self.images_done_queue = Queue(0) if "win" not in sys.platform else WinQueue()
        self.display.info("Downloading book images... (%s files)" % len(self.images), state=True)
        self.collect_images()

        self.display.info("Creating EPUB file...", state=True)
        self.create_epub()

        if not args.no_cookies:
            json.dump(self.session.cookies.get_dict(), open(COOKIES_FILE, "w"))

        self.display.done(os.path.join(self.BOOK_PATH, self.book_id + ".epub"))
        self.display.unregister()

        if not self.display.in_error and not args.log:
            os.remove(self.display.log_file)

    def handle_cookie_update(self, set_cookie_headers):
        for morsel in set_cookie_headers:
            # Handle Float 'max-age' Cookie
            if self.COOKIE_FLOAT_MAX_AGE_PATTERN.search(morsel):
                cookie_key, cookie_value = morsel.split(";")[0].split("=")
                self.session.cookies.set(cookie_key, cookie_value)

    def requests_provider(self, url, is_post=False, data=None, perform_redirect=True, **kwargs):
        try:
            response = getattr(self.session, "post" if is_post else "get")(
                url,
                data=data,
                allow_redirects=False,
                **kwargs
            )

            self.handle_cookie_update(response.raw.headers.getlist("Set-Cookie"))

            self.display.last_request = (
                url, data, kwargs, response.status_code, "\n".join(
                    ["\t{}: {}".format(*h) for h in response.headers.items()]
                ), response.text
            )

        except (requests.ConnectionError, requests.ConnectTimeout, requests.RequestException) as request_exception:
            self.display.error(str(request_exception))
            return 0

        if response.is_redirect and perform_redirect:
            return self.requests_provider(response.next.url, is_post, None, perform_redirect)
            # TODO How about **kwargs?

        return response


Step 2: 🧐 Snippet Analysis

From looking through the relevant snippets, I decided to make the following modifications:

File Path: safaribooks.py
Proposed Changes: In the _start_multiprocessing method, after line 82, add the following code:
```python
if os.path.isfile(os.path.join(self.BOOK_PATH, self.book_id + ".epub")):
    self.display.log("Book already exists: %s" % self.book_id)
    return
```
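
For illustration, the proposed check can be isolated into a small helper and exercised on its own. This is a minimal sketch, not the code from the PR: `epub_exists` and `demo` are hypothetical names, the book ID is a placeholder, and the `<BOOK_PATH>/<book_id>.epub` layout is assumed from the `self.display.done(...)` call in the snippet above.

```python
import os
import tempfile


def epub_exists(book_path, book_id):
    """Return True if <book_path>/<book_id>.epub is already on disk.

    Hypothetical helper; it mirrors the os.path.isfile check proposed above.
    """
    return os.path.isfile(os.path.join(book_path, book_id + ".epub"))


def demo():
    """Check a throwaway directory before and after a placeholder EPUB exists."""
    with tempfile.TemporaryDirectory() as book_path:
        before = epub_exists(book_path, "0000000000000")
        # An empty file stands in for a finished EPUB (assumption for the demo).
        open(os.path.join(book_path, "0000000000000.epub"), "wb").close()
        after = epub_exists(book_path, "0000000000000")
    return before, after


if __name__ == "__main__":
    print(demo())
```

Keeping the check in one function would make it easy to call from wherever the download is started.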

Step 3: 📝 Planning

I have created a plan for writing the pull request. I am now working through my plan and coding the required changes to address this issue. Here is the planned pull request:

Add check to skip book download if book already exists
sweep/check-book-existence

Description

This PR adds a check to skip the book download process if the book already exists in the specified output directory. This helps to avoid unnecessary downloads and saves resources.

Summary

  • Added a check to see if the EPUB file already exists in the specified output directory.
  • If the file exists, the download process is skipped and a log message is displayed.
  • This check is implemented in the _start_multiprocessing method in safaribooks.py.
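
The behavior summarized above amounts to an early-return guard in front of the download steps. The sketch below illustrates that control flow under stated assumptions: `download_book`, `run_twice`, `steps`, and `log` are hypothetical names rather than safaribooks APIs, and the final write only creates a placeholder file instead of a real EPUB.

```python
import os
import tempfile


def download_book(book_path, book_id, steps, log):
    """Run `steps` and write the EPUB unless it already exists.

    Returns True when the download ran and False when it was skipped.
    """
    epub_path = os.path.join(book_path, book_id + ".epub")
    if os.path.isfile(epub_path):
        log("Book already exists: %s" % book_id)
        return False  # the guard fires before any step runs

    for step in steps:
        step()
    # Placeholder for EPUB creation (an assumption for this sketch).
    with open(epub_path, "wb") as epub:
        epub.write(b"")
    return True


def run_twice():
    """Call download_book twice on one directory and record what happened."""
    calls, messages = [], []
    steps = [lambda: calls.append("chapters"), lambda: calls.append("images")]
    with tempfile.TemporaryDirectory() as book_path:
        first = download_book(book_path, "0000000000000", steps, messages.append)
        second = download_book(book_path, "0000000000000", steps, messages.append)
    return first, second, calls, messages
```

In this sketch the guard sits ahead of every step, so the second call performs no work and only logs the message.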

Step 4: ⌨️ Coding

File: safaribooks.py
Instructions: In the _start_multiprocessing method, after line 82, add the following code:

    if os.path.isfile(os.path.join(self.BOOK_PATH, self.book_id + ".epub")):
        self.display.log("Book already exists: %s" % self.book_id)
        return

Progress: ✅ Commit 3872391

Step 5: 🔁 Code Review

Here are my self-reviews of my changes at sweep/check-book-existence.

Here is the 1st review

No changes required. The code changes in safaribooks.py on lines 1121-1122 are correct and do not contain any errors.

I finished incorporating these changes.


To recreate the pull request, leave a comment prefixed with "sweep:" or edit the issue.

sweep-ai bot linked a pull request on Aug 9, 2023 that will close this issue