io.py

import logging
import tarfile
import tempfile
import zipfile
from pathlib import Path

from farm.data_handler.utils import http_get

logger = logging.getLogger(__name__)

def write_documents_to_db(document_store, document_dir, clean_func=None, only_empty_db=False, split_paragraphs=False):
    """
    Write all text files (.txt) in the sub-directories of the given path to the connected database.

    :param document_store: connected database to write the documents to
    :param document_dir: path of the directory containing the documents to be written to the database
    :param clean_func: a custom cleaning function that gets applied to each doc (input: str, output: str)
    :param only_empty_db: If true, docs will only be written if the db is completely empty.
                          Useful to avoid indexing the same initial docs again and again.
    :param split_paragraphs: If true, split each file at blank lines and index every paragraph as a separate document.
    :return: None
    """
    file_paths = Path(document_dir).glob("**/*.txt")

    # check if db already has docs
    if only_empty_db:
        n_docs = document_store.get_document_count()
        if n_docs > 0:
            logger.info(f"Skip writing documents since DB already contains {n_docs} docs ... "
                        "(Disable `only_empty_db`, if you want to add docs anyway.)")
            return None

    # read and add docs
    docs_to_index = []
    doc_id = 1
    for path in file_paths:
        with open(path) as doc:
            text = doc.read()
            if clean_func:
                text = clean_func(text)

            if split_paragraphs:
                for para in text.split("\n\n"):
                    if not para.strip():  # skip empty paragraphs
                        continue
                    docs_to_index.append(
                        {
                            "name": path.name,
                            "text": para,
                            "document_id": doc_id
                        }
                    )
                    doc_id += 1  # one id per paragraph, so every indexed doc gets a unique id
            else:
                docs_to_index.append(
                    {
                        "name": path.name,
                        "text": text,
                        "document_id": doc_id
                    }
                )
                doc_id += 1  # one id per file
    document_store.write_documents(docs_to_index)
    logger.info(f"Wrote {len(docs_to_index)} docs to DB")
def fetch_archive_from_http(url, output_dir, proxies=None):
    """
    Fetch an archive (zip or tar.gz) from a url via http and extract its content to an output directory.

    :param url: http address
    :type url: str
    :param output_dir: local path
    :type output_dir: str
    :param proxies: proxy details as required by the requests library
    :type proxies: dict
    :return: bool indicating whether anything got fetched
    """
    # verify & prepare local directory
    path = Path(output_dir)
    if not path.exists():
        path.mkdir(parents=True)

    is_not_empty = len(list(path.rglob("*"))) > 0
    if is_not_empty:
        logger.info(
            f"Found data stored in `{output_dir}`. Delete this first if you really want to fetch new data."
        )
        return False
    else:
        logger.info(f"Fetching from {url} to `{output_dir}`")

        # download & extract
        with tempfile.NamedTemporaryFile() as temp_file:
            http_get(url, temp_file, proxies=proxies)
            temp_file.flush()
            temp_file.seek(0)  # making tempfile accessible
            # extract
            if url.endswith(".zip"):
                archive = zipfile.ZipFile(temp_file.name)
                archive.extractall(output_dir)
            elif url.endswith(".tar.gz"):
                archive = tarfile.open(temp_file.name)
                archive.extractall(output_dir)
            else:
                logger.warning(f"Skipped extraction: `{url}` is neither a .zip nor a .tar.gz archive")
            # temp_file gets deleted here
        return True
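

if __name__ == "__main__":
    # Minimal end-to-end sketch, not part of the library API: the archive URL is
    # illustrative (not a real dataset) and SQLDocumentStore is assumed as one
    # possible DocumentStore backend -- swap in whichever store you actually use.
    from haystack.database.sql import SQLDocumentStore

    fetch_archive_from_http(url="https://example.com/datasets/docs.tar.gz",  # hypothetical URL
                            output_dir="data/docs")
    store = SQLDocumentStore(url="sqlite:///qa.db")
    write_documents_to_db(store, document_dir="data/docs", split_paragraphs=True)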