forked from fkie-cad/FACT_core
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request fkie-cad#542 from fkie-cad/statistics_endpoint
Added statistics endpoints and tests
- Loading branch information
Showing
4 changed files
with
107 additions
and
1 deletion.
There are no files selected for viewing
52 changes: 52 additions & 0 deletions
52
src/test/integration/web_interface/rest/test_rest_statistics.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
import json | ||
|
||
from storage.db_interface_statistic import StatisticDbUpdater | ||
from test.integration.web_interface.rest.base import RestTestBase | ||
|
||
|
||
class TestRestStatistics(RestTestBase): | ||
|
||
def setUp(self): | ||
super().setUp() | ||
self.stats_updater = StatisticDbUpdater(config=self.config) | ||
self.stats_updater.update_statistic('file_type', {'file_types': [['application/gzip', 3454]], | ||
'firmware_container': [['application/zip', 3], ['firmware/foo', 1]]}) | ||
self.stats_updater.update_statistic('known_vulnerabilities', {'known_vulnerabilities': [['BackDoor_String', 1]]}) | ||
|
||
def tearDown(self): | ||
self.stats_updater.shutdown() | ||
super().tearDown() | ||
|
||
def test_rest_request_all_statistics(self): | ||
st = self.test_client.get('/rest/statistics', follow_redirects=True) | ||
st_dict = json.loads(st.data) | ||
|
||
assert b'file_type' in st.data | ||
assert bool(st_dict['file_type']) | ||
assert 'file_types' in st_dict['file_type'] | ||
assert 'firmware_container' in st_dict['file_type'] | ||
assert b'known_vulnerabilities' in st.data | ||
assert bool(st_dict['known_vulnerabilities']) | ||
assert 'known_vulnerabilities' in st_dict['known_vulnerabilities'] | ||
assert b'malware' in st.data | ||
assert not st_dict['malware'] | ||
assert b'exploit_mitigations' in st.data | ||
assert not st_dict['exploit_mitigations'] | ||
|
||
def test_rest_request_single_statistic(self): | ||
st = self.test_client.get('/rest/statistics/file_type', follow_redirects=True) | ||
st_dict = json.loads(st.data) | ||
|
||
assert b'file_type' in st.data | ||
assert 'file_types' in st_dict['file_type'] | ||
assert 'firmware_container' in st_dict['file_type'] | ||
assert b'known_vulnerabilities' not in st.data | ||
|
||
def test_rest_request_non_existent_statistic(self): | ||
st = self.test_client.get('/rest/statistics/non_existent_stat', follow_redirects=True) | ||
|
||
assert b'A statistic with the ID non_existent_stat does not exist' in st.data | ||
|
||
def test_rest_request_invalid_data(self): | ||
st = self.test_client.get('/rest/statistics/', follow_redirects=True) | ||
assert b'404 Not Found' in st.data |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,51 @@ | ||
from flask_restful import Resource | ||
|
||
from helperFunctions.database import ConnectTo | ||
from storage.db_interface_statistic import StatisticDbViewer | ||
|
||
from web_interface.security.decorator import roles_accepted | ||
from web_interface.security.privileges import PRIVILEGES | ||
from web_interface.rest.helper import error_message | ||
|
||
|
||
class RestStatistics(Resource): | ||
URL = '/rest/statistics' | ||
STATISTICS = ['general', 'firmware_meta', 'file_type', 'malware', 'crypto_material', 'unpacking', 'ips_and_uris', | ||
'architecture', 'release_date', 'exploit_mitigations', 'known_vulnerabilities', 'software_components', | ||
'elf_executable'] | ||
|
||
def __init__(self, **kwargs): | ||
self.config = kwargs.get('config', None) | ||
|
||
@roles_accepted(*PRIVILEGES['status']) | ||
def get(self, stat_name=None): | ||
if not stat_name: | ||
return self._get_all_stats_from_db() | ||
return self._get_certain_stats_from_db(stat_name) | ||
|
||
def _get_all_stats_from_db(self): | ||
with ConnectTo(StatisticDbViewer, self.config) as stats_db: | ||
statistics_dict = {} | ||
for stat in self.STATISTICS: | ||
statistics_dict[stat] = stats_db.get_statistic(stat) | ||
|
||
self._delete_id_and_check_empty_stat(statistics_dict) | ||
|
||
return statistics_dict | ||
|
||
def _get_certain_stats_from_db(self, statistic_name): | ||
with ConnectTo(StatisticDbViewer, self.config) as stats_db: | ||
statistic_dict = {statistic_name: stats_db.get_statistic(statistic_name)} | ||
self._delete_id_and_check_empty_stat(statistic_dict) | ||
if statistic_name not in self.STATISTICS: | ||
return error_message('A statistic with the ID {} does not exist'.format(statistic_name), self.URL, dict(stat_name=statistic_name)) | ||
|
||
return statistic_dict | ||
|
||
@staticmethod | ||
def _delete_id_and_check_empty_stat(stats_dict): | ||
for stat in stats_dict.copy(): | ||
if stats_dict[stat] is not None: | ||
del stats_dict[stat]['_id'] | ||
if stats_dict[stat] is None: | ||
stats_dict[stat] = {} |
File renamed without changes.