Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fixed bug in cve_lookup setup + improved error logging #569

Merged
merged 3 commits into from
Apr 23, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
fixed bug in cve_lookup setup + improved error logging
  • Loading branch information
jstucke committed Apr 22, 2021
commit cb9a61a88a25dab76a2124c11715bd670afaccd6
8 changes: 5 additions & 3 deletions src/plugins/analysis/cve_lookup/internal/data_parsing.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from pathlib import Path
from typing import List, Optional, Tuple
from xml.etree.ElementTree import ParseError, parse
from zipfile import ZipFile
from zipfile import BadZipFile, ZipFile

import requests
from requests.exceptions import RequestException
Expand All @@ -31,10 +31,12 @@ def get_cve_links(url: str, selected_years: Optional[List[int]] = None) -> List[
def process_url(download_url: str, path: str):
try:
request = _retrieve_url(download_url)
zipped_data = ZipFile(BytesIO(request.content))
except RequestException as exception:
raise CveLookupException(f'URL {download_url} not found. URL might have changed.') from exception
except BadZipFile as exception:
raise CveLookupException(f'Could not retrieve file from URL {download_url} (bad zip file)') from exception

zipped_data = ZipFile(BytesIO(request.content))
zipped_data.extractall(path)


Expand Down Expand Up @@ -65,7 +67,7 @@ def extract_cpe_data_from_cve(nodes: List[dict]) -> List[Tuple[str, str, str, st
for dicts in nodes:
if 'cpe_match' in dicts.keys():
for cpe in dicts['cpe_match']:
if cpe['vulnerable']:
if 'cpe23Uri' in cpe and cpe['vulnerable']:
cpe_entries.append((
cpe['cpe23Uri'], cpe.get('versionStartIncluding', ''), cpe.get('versionStartExcluding', ''),
cpe.get('versionEndIncluding', ''), cpe.get('versionEndExcluding', '')
Expand Down
11 changes: 7 additions & 4 deletions src/plugins/analysis/cve_lookup/internal/database_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ def __init__(self, db_path: str = DB_PATH):
try:
self.connection = connect(db_path)
except SqliteException as exception:
logging.warning('Could not connect to CPE database: {} {}'.format(type(exception).__name__, exception))
raise exception
logging.warning(f'Could not connect to CPE database: {exception}', exc_info=True)
raise

def execute_query(self, query: str):
with self.get_cursor() as cursor:
Expand All @@ -87,7 +87,10 @@ def fetch_one(self, query: str):

def insert_rows(self, query: str, input_data: list):
with self.get_cursor() as cursor:
cursor.executemany(query, input_data)
wrong_entries = {e for e in input_data if len(e) != query.count('?')}
if wrong_entries:
logging.warning(f"Ignoring possibly wrong entries: {[e[2] for e in wrong_entries]}")
cursor.executemany(query, list(set(input_data) - wrong_entries))
self.connection.commit()

@contextmanager
Expand All @@ -97,7 +100,7 @@ def get_cursor(self):
cursor = self.connection.cursor()
yield cursor
except SqliteException as error:
logging.error('[cve_lookup]: encountered error while accessing DB: {} {}'.format(type(error).__name__, error))
logging.error(f'[cve_lookup]: Encountered error while accessing DB: {error}', exc_info=True)
finally:
with suppress(AttributeError, SqliteException):
cursor.close()
Expand Down
12 changes: 6 additions & 6 deletions src/plugins/analysis/cve_lookup/internal/setup_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

CURRENT_YEAR = datetime.now().year
DATABASE = DatabaseInterface()
SPLIT_REGEX = r'(?<!\\)[:]'
CPE_SPLIT_REGEX = r'(?<![\\:]):(?!:)|(?<=\\:):' # don't split on '::' or '\:' but split on '\::'

Years = namedtuple('Years', 'start_year end_year')

Expand Down Expand Up @@ -186,7 +186,7 @@ def setup_cve_feeds_table(cve_list: List[CveEntry]) -> List[Tuple[str, ...]]:
year = entry.cve_id.split('-')[1]
score_v2 = entry.impact.get('cvssV2', 'N/A')
score_v3 = entry.impact.get('cvssV3', 'N/A')
cpe_elements = replace_characters_and_wildcards(re.split(SPLIT_REGEX, cpe_id)[2:])
cpe_elements = replace_characters_and_wildcards(re.split(CPE_SPLIT_REGEX, cpe_id)[2:])
row = (
entry.cve_id, year, cpe_id, score_v2, score_v3, *cpe_elements,
version_start_including, version_start_excluding, version_end_including, version_end_excluding
Expand All @@ -197,15 +197,15 @@ def setup_cve_feeds_table(cve_list: List[CveEntry]) -> List[Tuple[str, ...]]:

def setup_cpe_table(cpe_list: list) -> list:
return [
(cpe, *replace_characters_and_wildcards(re.split(SPLIT_REGEX, cpe)[2:]))
(cpe, *replace_characters_and_wildcards(re.split(CPE_SPLIT_REGEX, cpe)[2:]))
for cpe in cpe_list
]


class Choice(Enum):
cpe = 'cpe'
cve = 'cve'
both = 'both'
cpe = 'cpe' # pylint: disable=invalid-name
cve = 'cve' # pylint: disable=invalid-name
both = 'both' # pylint: disable=invalid-name

def cpe_was_chosen(self):
return self.value in [self.cpe.value, self.both.value]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def test_select_functionality():

def test_insert_functionality():
with DatabaseInterface(TEST_DB_PATH) as db:
db.insert_rows(TEST_QUERIES['test_insert'].format('test_table'), [[34]])
db.insert_rows(TEST_QUERIES['test_insert'].format('test_table'), [(34,)])
test_insert_output = list(db.fetch_multiple(query=QUERIES['select_all'].format('test_table')))
assert test_insert_output == [(23,), (34,)]

Expand Down
28 changes: 24 additions & 4 deletions src/plugins/analysis/cve_lookup/test/test_setup_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,9 +226,11 @@
]

# contain input and expected results of the setup_cpe_format function
CPE_LIST = ['cpe:2.3:a:\\$0.99_kindle_books_project:\\$0.99_kindle_books:6:*:*:*:*:android:*:*',
'cpe:2.3:a:1000guess:1000_guess:-:*:*:*:*:*:*:*', 'cpe:2.3:a:1024cms:1024_cms:0.7:*:*:*:*:*:*:*',
'cpe:2.3:a:1024cms:1024_cms:1.2.5:*:*:*:*:*:*:*', 'cpe:2.3:a:1024cms:1024_cms:1.3.1:*:*:*:*:*:*:*']
CPE_LIST = [
'cpe:2.3:a:\\$0.99_kindle_books_project:\\$0.99_kindle_books:6:*:*:*:*:android:*:*',
'cpe:2.3:a:1000guess:1000_guess:-:*:*:*:*:*:*:*', 'cpe:2.3:a:1024cms:1024_cms:0.7:*:*:*:*:*:*:*',
'cpe:2.3:a:1024cms:1024_cms:1.2.5:*:*:*:*:*:*:*', 'cpe:2.3:a:1024cms:1024_cms:1.3.1:*:*:*:*:*:*:*'
]
CPE_TABLE = [
('cpe:2.3:a:\\$0.99_kindle_books_project:\\$0.99_kindle_books:6:*:*:*:*:android:*:*', 'a',
'\\$0\\.99_kindle_books_project', '\\$0\\.99_kindle_books', '6', 'ANY', 'ANY', 'ANY', 'ANY', 'android', 'ANY', 'ANY'),
Expand Down Expand Up @@ -518,4 +520,22 @@ def test_setup_cve_summary_table():

def test_setup_cpe_table():
result = sr.setup_cpe_table(CPE_LIST)
assert CPE_TABLE == result
for entry in result:
assert len(entry) == 12
for actual, expected in zip(result, CPE_TABLE):
assert actual == expected


def test_setup_cpe_entry_with_colons():
result = sr.setup_cpe_table([
'cpe:2.3:a:net::netmask_project:net::netmask:*:*:*:*:*:perl:*:*',
'cpe:2.3:a:lemonldap-ng:lemonldap\\:\\::1.0.3:*:*:*:*:*:*:*'
])
expected_result = [
('cpe:2.3:a:net::netmask_project:net::netmask:*:*:*:*:*:perl:*:*', 'a', 'net::netmask_project', 'net::netmask', 'ANY', 'ANY', 'ANY', 'ANY', 'ANY', 'perl', 'ANY', 'ANY'),
('cpe:2.3:a:lemonldap-ng:lemonldap\\:\\::1.0.3:*:*:*:*:*:*:*', 'a', 'lemonldap\\-ng', 'lemonldap\\:\\:', '1\\.0\\.3', 'ANY', 'ANY', 'ANY', 'ANY', 'ANY', 'ANY', 'ANY')
]
for entry in result:
assert len(entry) == 12
for actual, expected in zip(result, expected_result):
assert actual == expected