Skip to content

Commit

Permalink
fix entry insertion from browser extension
Browse files Browse the repository at this point in the history
the browser driver assumed the account table to be unchanged
since start which could lead to data loss if accounts had been
inserted using other tools
  • Loading branch information
snopf committed Feb 7, 2020
1 parent 8fac9d5 commit 5d36c2a
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 31 deletions.
1 change: 1 addition & 0 deletions src/host/.gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
build/
dist/
*.log
65 changes: 34 additions & 31 deletions src/host/pc/native_app/snopf_browser_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,49 +66,50 @@ def error(err_msg):
logging.error(err_msg)
send_msg('error', err_msg)

account_table_path = app_path + '/../snopf_manager/account_table.json'

account_table = {}
try:
with open(account_table_path) as f:
account_table = json.load(f)
except FileNotFoundError:
error('Password file not found')

def save_account_table():
'''
Try to save the current account table to the account table file
'''
def load_account_table(account_table_path):
try:
with open(account_table_path, 'r') as f:
return json.load(f)
except FileNotFoundError:
error('Password file not found')
return {}

def insert_entry(entry, account_table):
'''Insert entry in given account_table dict'''
if not entry['hostname'] in account_table:
account_table[entry['hostname']] = {}

assert not entry['account'] in account_table[entry['hostname']]

account_table[entry['hostname']][entry['account']] = {
'password_length': entry['password_length'],
'password_iteration': entry['password_iteration']
}

def save_new_entry(entry, account_table_path):
''' Save new entry in account table file'''
f = open(account_table_path, 'r+')
try:
fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
except BlockingIOError:
return False
# Get the current account table, other programs may have changed it
account_table = load_account_table(account_table_path)
insert_entry(entry, account_table)
f.seek(0)
json.dump(account_table, f)
f.truncate()
fcntl.flock(f, fcntl.LOCK_UN)
f.close()
return True

def set_new_account(msg):
'''
Add new account to the account table if necessary and return True
if a new account has been added.
'''
if not msg['hostname'] in account_table:
account_table[msg['hostname']] = {}

if not msg['account'] in account_table[msg['hostname']]:
account_table[msg['hostname']][msg['account']] = {
'password_length': msg['password_length'],
'password_iteration': msg['password_iteration']
}
return True
def entry_is_new(entry, account_table):
return (not (entry['hostname'] in account_table)
or not entry['account'] in account_table[entry['hostname']])

return False

def main_loop():
account_table_path = app_path + '/../snopf_manager/account_table.json'
account_table = load_account_table(account_table_path)

try:
while True:
Expand All @@ -124,8 +125,10 @@ def main_loop():
msg['password_iteration'] = int(msg['password_iteration'])
msg['password_length'] = int(msg['password_length'])
if msg['save_new_info']:
if set_new_account(msg):
if save_account_table():
if entry_is_new(msg, account_table):
if save_new_entry(msg, account_table_path):
account_table = load_account_table(
account_table_path)
send_msg('get_account_table', account_table)
else:
error('Cannot lock account table file')
Expand Down
97 changes: 97 additions & 0 deletions src/host/pc/native_app/test_snopf_browser_driver.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
#!/usr/bin/env python3

# Copyright (c) 2019-2020 Hajo Kortmann <[email protected]>
# License: GNU GPL v2 (see License.txt)

from snopf_browser_driver import *

import pytest
import json
import fcntl

def get_test_table():
return {'hostname1':
{'account1': {
'password_length': 40, 'password_iteration': 0}}}

def get_test_entry():
return {'hostname': 'hostname1', 'account': 'account2',
'password_length': 40, 'password_iteration': 0}

def test_entry_is_new_existing():
test_table = get_test_table()
test_entry = {'hostname': 'hostname1', 'account': 'account1'}
assert not entry_is_new(test_entry, test_table)

def test_entry_is_new_account():
test_table = get_test_table()
test_entry = {'hostname': 'hostname1', 'account': 'account2'}
assert entry_is_new(test_entry, test_table)

def test_entry_is_new_hostname():
test_table = get_test_table()
test_entry = {'hostname': 'hostname2', 'account': 'account1'}
assert entry_is_new(test_entry, test_table)

def test_insert_entry_existing_hostname():
test_table = get_test_table()
test_entry = {'hostname': 'hostname1', 'account': 'account2',
'password_length': 40, 'password_iteration': 0}
insert_entry(test_entry, test_table)
assert test_table == {'hostname1': {'account1': {
'password_length': 40,
'password_iteration': 0},
'account2': {
'password_length': 40,
'password_iteration': 0}
}
}

def test_insert_entry_new_hostname():
test_table = get_test_table()
test_entry = {'hostname': 'hostname2', 'account': 'account1',
'password_length': 40, 'password_iteration': 0}
insert_entry(test_entry, test_table)
assert test_table == {'hostname1': {'account1': {
'password_length': 40,
'password_iteration': 0}},
'hostname2': {'account1': {
'password_length': 40,
'password_iteration': 0}
}
}

def test_insert_entry_exception():
test_table = get_test_table()
test_entry = {'hostname': 'hostname1', 'account': 'account1',
'password_length': 40, 'password_iteration': 0}
with pytest.raises(AssertionError):
insert_entry(test_entry, test_table)

def create_account_table(tmp_path):
account_table_path = tmp_path / "account_table_file"
with open(account_table_path, 'w') as f:
json.dump({}, f)
return account_table_path

def test_save_new_entry_blocked(tmp_path):
account_table_path = create_account_table(tmp_path)
f = open(account_table_path, 'r')
fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
assert not save_new_entry(get_test_entry(), account_table_path)
fcntl.flock(f, fcntl.LOCK_UN)
f.close()

def test_save_new_entry_append(tmp_path):
account_table_path = create_account_table(tmp_path)
with open(account_table_path, 'w') as f:
json.dump(get_test_table(), f)
save_new_entry(get_test_entry(), account_table_path)
with open(account_table_path, 'r') as f:
file_table = json.load(f)
test_table = get_test_table()
test_table['hostname1']['account2'] = {
'password_iteration': 0, 'password_length': 40}
assert test_table == file_table


0 comments on commit 5d36c2a

Please sign in to comment.