diff --git a/Tribler/Core/CacheDB/SqliteCacheDBHandler.py b/Tribler/Core/CacheDB/SqliteCacheDBHandler.py index 979c25b21d5..d9866b9e294 100644 --- a/Tribler/Core/CacheDB/SqliteCacheDBHandler.py +++ b/Tribler/Core/CacheDB/SqliteCacheDBHandler.py @@ -321,8 +321,6 @@ def updatePeer(self, permid, commit=True, **argv): def deletePeer(self, permid=None, peer_id=None, force=False, commit=True): # don't delete friend of superpeers, except that force is True - # to do: add transaction - # self._db._begin() # begin a transaction if peer_id is None: peer_id = self._db.getPeerID(permid) if peer_id is None: diff --git a/Tribler/Core/CacheDB/sqlitecachedb.py b/Tribler/Core/CacheDB/sqlitecachedb.py index fa88d246bf8..882d22b3d9c 100644 --- a/Tribler/Core/CacheDB/sqlitecachedb.py +++ b/Tribler/Core/CacheDB/sqlitecachedb.py @@ -52,20 +52,15 @@ # written to the DB in the schema_sdb_v*.sql file!!! CURRENT_MAIN_DB_VERSION = 18 +config_dir = None CREATE_SQL_FILE = None + CREATE_SQL_FILE_POSTFIX = os.path.join(LIBRARYNAME, 'schema_sdb_v' + str(CURRENT_MAIN_DB_VERSION) + '.sql') DB_FILE_NAME = 'tribler.sdb' DB_DIR_NAME = 'sqlite' # db file path = DB_DIR_NAME/DB_FILE_NAME DEFAULT_BUSY_TIMEOUT = 10000 -MAX_SQL_BATCHED_TO_TRANSACTION = 1000 # don't change it unless carefully tested. A transaction with 1000 batched updates took 1.5 seconds NULL = None -icon_dir = None SHOW_ALL_EXECUTE = False -costs = [] -cost_reads = [] -torrent_dir = None -config_dir = None -install_dir = None TEST_OVERRIDE = False INITIAL_UPGRADE_PAUSE = 10 @@ -88,7 +83,6 @@ while exists(DB_DEBUG_FILE): DB_DEBUG_FILE = "tribler_database_queries_%d.txt" % randint(1, 9999999) - class Warning(Exception): pass @@ -105,37 +99,17 @@ def __setitem__(self, *args, **kargs): def init(config, db_exception_handler=None): """ create sqlite database """ global CREATE_SQL_FILE - global icon_dir - global torrent_dir global config_dir - global install_dir - torrent_dir = os.path.abspath(config['torrent_collecting_dir']) config_dir = config['state_dir'] - install_dir = config['install_dir'] - CREATE_SQL_FILE = os.path.join(install_dir, CREATE_SQL_FILE_POSTFIX) - sqlitedb = SQLiteCacheDB.getInstance(db_exception_handler) + CREATE_SQL_FILE = os.path.join(config['install_dir'], CREATE_SQL_FILE_POSTFIX) sqlite_db_path = os.path.join(config_dir, DB_DIR_NAME, DB_FILE_NAME) print >> sys.stderr, "cachedb: init: SQL FILE", sqlite_db_path - icon_dir = os.path.abspath(config['peer_icon_path']) - + sqlitedb = SQLiteCacheDB.getInstance(db_exception_handler) sqlitedb.initDB(sqlite_db_path, CREATE_SQL_FILE) # the first place to create db in Tribler return sqlitedb -def done(): - # Arno, 2012-07-04: Obsolete, each thread must close the DBHandler it uses - # in its own shutdown procedure. There is no global close of all per-thread - # cursors/connections. - # - SQLiteCacheDB.getInstance().close() - -def make_filename(config_dir, filename): - if config_dir is None: - return filename - else: - return os.path.join(config_dir, filename) - def bin2str(bin): # Full BASE64-encoded return encodestring(bin).replace("\n", "") @@ -143,56 +117,6 @@ def bin2str(bin): def str2bin(str): return decodestring(str) -def print_exc_plus(): - """ - Print the usual traceback information, followed by a listing of all the - local variables in each frame. - http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/52215 - http://initd.org/pub/software/pysqlite/apsw/3.3.13-r1/apsw.html#augmentedstacktraces - """ - - tb = sys.exc_info()[2] - stack = [] - - while tb: - stack.append(tb.tb_frame) - tb = tb.tb_next - - print_exc() - print >> sys.stderr, "Locals by frame, innermost last" - - for frame in stack: - print >> sys.stderr - print >> sys.stderr, "Frame %s in %s at line %s" % (frame.f_code.co_name, - frame.f_code.co_filename, - frame.f_lineno) - for key, value in frame.f_locals.items(): - print >> sys.stderr, "\t%20s = " % key, - # We have to be careful not to cause a new error in our error - # printer! Calling str() on an unknown object could cause an - # error we don't want. - try: - print >> sys.stderr, value - except: - print >> sys.stderr, "" - -def debugTime(func): - def invoke_func(*args, **kwargs): - if DEBUG_TIME: - t1 = time() - - result = func(*args, **kwargs) - - if DEBUG_TIME: - diff = time() - t1 - if diff > 0.5: - print >> sys.stderr, "TOOK", diff, args - - return result - - invoke_func.__name__ = func.__name__ - return invoke_func - class safe_dict(dict): def __init__(self, *args, **kw): self.lock = threading.RLock() @@ -239,14 +163,13 @@ class SQLiteCacheDBBase: def __init__(self, db_exception_handler=None): self.exception_handler = db_exception_handler self.cursor_table = safe_dict() # {thread_name:cur} - self.cache_transaction_table = safe_dict() # {thread_name:[sql] self.class_variables = safe_dict({'db_path':None, 'busytimeout':None}) # busytimeout is in milliseconds # Arno, 2012-08-02: As there is just Dispersy thread here, removing # safe_dict() here # 24/09/12 Boudewijn: changed into LimitedOrderedDict to limit memory consumption - self.permid_id = LimitedOrderedDict(1024 * 5) # {} # safe_dict() - self.infohash_id = LimitedOrderedDict(1024 * 5) # {} # safe_dict() + self.permid_id = LimitedOrderedDict(1024 * 5) + self.infohash_id = LimitedOrderedDict(1024 * 5) self.show_execute = False # TODO: All global variables must be protected to be thread safe? @@ -274,7 +197,6 @@ def close(self, clean=False): self.exception_handler = None self.class_variables = safe_dict({'db_path':None, 'busytimeout':None}) self.cursor_table = safe_dict() - self.cache_transaction_table = safe_dict() def close_all(self): for thread_name, cur in self.cursor_table.items(): @@ -286,12 +208,6 @@ def _close_cur(self, thread_name, cur): con.close() del self.cursor_table[thread_name] - # Arno, 2010-01-25: Remove entry in cache_transaction_table for this thread - try: - if thread_name in self.cache_transaction_table.keys(): - del self.cache_transaction_table[thread_name] - except: - print_exc() # --------- static functions -------- def getCursor(self, create=True): @@ -533,7 +449,7 @@ def show_sql(self, switch): # --------- generic functions ------------- def commit(self): - self.transaction() + pass def _execute(self, sql, args=None): cur = self.getCursor() @@ -542,23 +458,6 @@ def _execute(self, sql, args=None): thread_name = threading.currentThread().getName() print >> sys.stderr, '===', thread_name, '===\n', sql, '\n-----\n', args, '\n======\n' - # we should not perform database actions on the GUI (MainThread) thread because that might - # block the GUI - if DEBUG_THREAD: - if threading.currentThread().getName() == "MainThread": - for sql_line in sql.split(";"): - try: - # key, rest = sql_line.strip().split(" ", 1) - key = sql_line[:50] - print >> sys.stderr, "sqlitecachedb.py: should not perform sql", key, "on GUI thread" - # print_stack() - except: - # key = sql.strip() - key = sql_line - if key: - print >> sys.stderr, "sqlitecachedb.py: should not perform sql", key, "on GUI thread" - # print_stack() - try: if args is None: return cur.execute(sql) @@ -566,136 +465,52 @@ def _execute(self, sql, args=None): return cur.execute(sql, args) except Exception, msg: - if True: - if str(msg).startswith("BusyError"): - print >> sys.stderr, "cachedb: busylock error" + if str(msg).startswith("BusyError"): + print >> sys.stderr, "cachedb: busylock error" - else: - print_exc() - print_stack() - print >> sys.stderr, "cachedb: execute error:", Exception, msg - thread_name = threading.currentThread().getName() - print >> sys.stderr, '===', thread_name, '===\nSQL Type:', type(sql), '\n-----\n', sql, '\n-----\n', args, '\n======\n' - - # return None - # ARNODB: this is incorrect, it should reraise the exception - # such that _transaction can rollback or recommit. - # This bug already reported by Johan - raise msg - -# @debugTime - def execute_read(self, sql, args=None): - # this is only called for reading. If you want to write the db, always use execute_write or executemany - return self._execute(sql, args) - - def execute_write(self, sql, args=None, commit=True): - self.cache_transaction(sql, args) - if commit: - self.commit() - - def executemany(self, sql, args, commit=True): - - thread_name = threading.currentThread().getName() - if thread_name not in self.cache_transaction_table: - self.cache_transaction_table[thread_name] = [] - all = [(sql, arg) for arg in args] - self.cache_transaction_table[thread_name].extend(all) - - if commit: - self.commit() + else: + print_exc() + print_stack() + print >> sys.stderr, "cachedb: execute error:", Exception, msg + thread_name = threading.currentThread().getName() + print >> sys.stderr, '===', thread_name, '===\nSQL Type:', type(sql), '\n-----\n', sql, '\n-----\n', args, '\n======\n' - def cache_transaction(self, sql, args=None): - thread_name = threading.currentThread().getName() - if thread_name not in self.cache_transaction_table: - self.cache_transaction_table[thread_name] = [] - self.cache_transaction_table[thread_name].append((sql, args)) + raise msg - def transaction(self, sql=None, args=None): - if sql: - self.cache_transaction(sql, args) + def _executemany(self, sql, args=None): + cur = self.getCursor() - thread_name = threading.currentThread().getName() + if SHOW_ALL_EXECUTE or self.show_execute: + thread_name = threading.currentThread().getName() + print >> sys.stderr, '===', thread_name, '===\n', sql, '\n-----\n', args, '\n======\n' - n = 0 - sql_full = '' - arg_list = [] - sql_queue = self.cache_transaction_table.get(thread_name, None) - if sql_queue: - while True: - try: - _sql, _args = sql_queue.pop(0) - except IndexError: - break - - _sql = _sql.strip() - if not _sql: - continue - if not _sql.endswith(';'): - _sql += ';' - sql_full += _sql + '\n' - if _args != None: - arg_list += list(_args) - n += 1 - - # if too many sql in cache, split them into batches to prevent processing and locking DB for a long time - # TODO: optimize the value of MAX_SQL_BATCHED_TO_TRANSACTION - if n % MAX_SQL_BATCHED_TO_TRANSACTION == 0: - self._transaction(sql_full, arg_list) - sql_full = '' - arg_list = [] - - self._transaction(sql_full, arg_list) - - def _transaction(self, sql, args=None): - if sql: - sql = 'BEGIN TRANSACTION; \n' + sql + 'COMMIT TRANSACTION;' - try: - self._execute(sql, args) - except Exception, e: - self.commit_retry_if_busy_or_rollback(e, 0, sql=sql) + try: + if args is None: + return cur.executemany(sql) + else: + return cur.executemany(sql, args) - def commit_retry_if_busy_or_rollback(self, e, tries, sql=None): - """ - Arno: - SQL_BUSY errors happen at the beginning of the experiment, - very quickly after startup (e.g. 0.001 s), so the busy timeout - is not honoured for some reason. After the initial errors, - they no longer occur. - """ - print >> sys.stderr, "sqlcachedb: commit_retry: after", str(e), repr(sql) + except Exception, msg: + if str(msg).startswith("BusyError"): + print >> sys.stderr, "cachedb: busylock error" + else: + print_exc() + print_stack() + print >> sys.stderr, "cachedb: execute error:", Exception, msg + thread_name = threading.currentThread().getName() + print >> sys.stderr, '===', thread_name, '===\nSQL Type:', type(sql), '\n-----\n', sql, '\n-----\n', args, '\n======\n' - if str(e).startswith("BusyError"): - try: - self._execute("COMMIT") - except Exception, e2: - if tries < 5: # self.max_commit_retries - # Spec is unclear whether next commit will also has - # 'busytimeout' seconds to try to get a write lock. - sleep(pow(2.0, tries + 2) / 100.0) - self.commit_retry_if_busy_or_rollback(e2, tries + 1) - else: - self.rollback(tries) - raise Exception, e2 - else: - self.rollback(tries) - m = "cachedb: TRANSACTION ERROR " + threading.currentThread().getName() + ' ' + str(e) - raise Exception, m + raise msg + def execute_read(self, sql, args=None): + return self._execute(sql, args) - def rollback(self, tries): - print_exc() - try: - self._execute("ROLLBACK") - except Exception, e: - # May be harmless, see above. Unfortunately they don't specify - # what the error is when an attempt is made to roll back - # an automatically rolled back transaction. - m = "cachedb: ROLLBACK ERROR " + threading.currentThread().getName() + ' ' + str(e) - # print >> sys.stderr, 'SQLite Database', m - raise Exception, m + def execute_write(self, sql, args=None, commit=True): + self._execute(sql, args) + def executemany(self, sql, args, commit=True): + self.__executemany(sql, args) - # -------- Write Operations -------- def insert_or_replace(self, table_name, commit=True, **argv): if len(argv) == 1: sql = 'INSERT OR REPLACE INTO %s (%s) VALUES (?);' % (table_name, argv.keys()[0]) @@ -2553,28 +2368,8 @@ def executemany(self, sql, args, commit=True): return self._executemany(sql, args) - def cache_transaction(self, sql, args=None): - if not onDBThread(): - SQLiteCacheDBV5.cache_transaction(self, sql, args) - elif DEPRECATION_DEBUG: - raise DeprecationWarning('Please do not use cache_transaction') - - def transaction(self, sql=None, args=None): - if not onDBThread(): - SQLiteCacheDBV5.transaction(self, sql, args) - elif DEPRECATION_DEBUG: - raise DeprecationWarning('Please do not use transaction') - - def _transaction(self, sql, args=None): - if not onDBThread(): - SQLiteCacheDBV5._transaction(self, sql, args) - elif DEPRECATION_DEBUG: - raise DeprecationWarning('Please do not use _transaction') - def commit(self): - if not onDBThread(): - SQLiteCacheDBV5.commit(self) - elif DEPRECATION_DEBUG: + if DEPRECATION_DEBUG: raise DeprecationWarning('Please do not use commit') def clean_db(self, vacuum=False, exiting=False): diff --git a/Tribler/Core/Swift/SwiftDownloadImpl.py b/Tribler/Core/Swift/SwiftDownloadImpl.py index babfc20ebbd..19393f37e37 100644 --- a/Tribler/Core/Swift/SwiftDownloadImpl.py +++ b/Tribler/Core/Swift/SwiftDownloadImpl.py @@ -1,37 +1,37 @@ # Written by Arno Bakker # see LICENSE.txt for license information # -# TODO: +# TODO: # - set rate limits # * Check if current policy of limiting hint_out_size is sane. -# - test case: start unlimited, wait 10 s, then set to 512 K. In one +# - test case: start unlimited, wait 10 s, then set to 512 K. In one # test speed dropped to few bytes/s then rose again to 512 K. # * upload rate limit -# * test if you get 512K for each swarm when you download two in parallel +# * test if you get 512K for each swarm when you download two in parallel # in one swift proc. # -# - HASHCHECKING +# - HASHCHECKING # * get progress from swift -# * Current cmdgw impl will open and thus hashcheck on main thread, halting -# all network traffic, etc. in all other swarms. BitTornado interleaves +# * Current cmdgw impl will open and thus hashcheck on main thread, halting +# all network traffic, etc. in all other swarms. BitTornado interleaves # on netw thread. # - Run cmdgw on separate thread(s)? # # - STATS -# * store 2 consecutive more info dicts and calc speeds, and convert +# * store 2 consecutive more info dicts and calc speeds, and convert # those to DownloadState.get_peerlist() format. -# +# # - BUGS -# * Try to recv ICMP port unreach on Mac such that we can clean up Channel +# * Try to recv ICMP port unreach on Mac such that we can clean up Channel # (Linux done) -# +# import sys import copy -from traceback import print_exc,print_stack -from threading import RLock,currentThread +from traceback import print_exc, print_stack +from threading import RLock, currentThread from Tribler.Core import NoDispersyRLock from Tribler.Core.simpledefs import * @@ -42,18 +42,18 @@ # ARNOSMPTODO: MODIFY WITH cmdgw.cpp::CMDGW_PREBUFFER_BYTES_AS_LAYER # Send PLAY after receiving 2^layer * 1024 bytes -CMDGW_PREBUFFER_BYTES = (2 ** 8) * 1024 +CMDGW_PREBUFFER_BYTES = (2 ** 8) * 1024 SWIFT_ALIVE_CHECK_INTERVAL = 60.0 DEBUG = False -class SwiftDownloadImpl(SwiftDownloadRuntimeConfig): +class SwiftDownloadImpl(SwiftDownloadRuntimeConfig): """ Download subclass that represents a swift download. The actual swift download takes places in a SwiftProcess. """ - - def __init__(self,session,sdef): + + def __init__(self, session, sdef): self.dllock = NoDispersyRLock() self.session = session self.sdef = sdef @@ -70,17 +70,17 @@ def __init__(self,session,sdef): self.dlstatus = DLSTATUS_WAITING4HASHCHECK self.dynasize = 0L self.progress = 0.0 - self.curspeeds = {DOWNLOAD:0.0,UPLOAD:0.0} # bytes/s + self.curspeeds = {DOWNLOAD:0.0, UPLOAD:0.0} # bytes/s self.numleech = 0 self.numseeds = 0 - self.contentbytes = {DOWNLOAD:0,UPLOAD:0} # bytes + self.contentbytes = {DOWNLOAD:0, UPLOAD:0} # bytes self.done = False # when set it means this download is being removed self.midict = {} self.time_seeding = [0, None] self.total_up = 0 self.total_down = 0 - + self.lm_network_vod_event_callback = None self.askmoreinfo = False @@ -97,7 +97,7 @@ def get_def(self): # # Creating a Download # - def setup(self,dcfg=None,pstate=None,initialdlstatus=None,lm_network_engine_wrapper_created_callback=None,lm_network_vod_event_callback=None): + def setup(self, dcfg=None, pstate=None, initialdlstatus=None, lm_network_engine_wrapper_created_callback=None, lm_network_vod_event_callback=None): """ Create a Download object. Used internally by Session. @param dcfg DownloadStartupConfig or None (in which case @@ -106,7 +106,7 @@ def setup(self,dcfg=None,pstate=None,initialdlstatus=None,lm_network_engine_wrap """ # Called by any thread, assume sessionlock is held try: - self.dllock.acquire() # not really needed, no other threads know of this object + self.dllock.acquire() # not really needed, no other threads know of this object # Copy dlconfig, from default if not specified if dcfg is None: @@ -114,10 +114,10 @@ def setup(self,dcfg=None,pstate=None,initialdlstatus=None,lm_network_engine_wrap else: cdcfg = dcfg self.dlconfig = copy.copy(cdcfg.dlconfig) - + # Things that only exist at runtime - self.dlruntimeconfig= {} + self.dlruntimeconfig = {} self.dlruntimeconfig['max_desired_upload_rate'] = 0 self.dlruntimeconfig['max_desired_download_rate'] = 0 @@ -128,33 +128,33 @@ def setup(self,dcfg=None,pstate=None,initialdlstatus=None,lm_network_engine_wrap if dlstate.has_key('total_up'): self.total_up = dlstate['total_up'] if dlstate.has_key('total_down'): - self.total_down = dlstate['total_down'] - + self.total_down = dlstate['total_down'] + if DEBUG: - print >>sys.stderr,"SwiftDownloadImpl: setup: initialdlstatus",`self.sdef.get_roothash_as_hex()`,initialdlstatus + print >> sys.stderr, "SwiftDownloadImpl: setup: initialdlstatus", `self.sdef.get_roothash_as_hex()`, initialdlstatus # Note: initialdlstatus now only works for STOPPED if initialdlstatus != DLSTATUS_STOPPED: - self.create_engine_wrapper(lm_network_engine_wrapper_created_callback,pstate,lm_network_vod_event_callback) - + self.create_engine_wrapper(lm_network_engine_wrapper_created_callback, pstate, lm_network_vod_event_callback) + self.dllock.release() - except Exception,e: + except Exception, e: print_exc() self.set_error(e) self.dllock.release() - def create_engine_wrapper(self,lm_network_engine_wrapper_created_callback,pstate,lm_network_vod_event_callback,initialdlstatus=None): - network_create_engine_wrapper_lambda = lambda:self.network_create_engine_wrapper(lm_network_engine_wrapper_created_callback,pstate,lm_network_vod_event_callback,initialdlstatus) - self.session.lm.rawserver.add_task(network_create_engine_wrapper_lambda) - - def network_create_engine_wrapper(self,lm_network_engine_wrapper_created_callback,pstate,lm_network_vod_event_callback,initialdlstatus=None): + def create_engine_wrapper(self, lm_network_engine_wrapper_created_callback, pstate, lm_network_vod_event_callback, initialdlstatus=None): + network_create_engine_wrapper_lambda = lambda:self.network_create_engine_wrapper(lm_network_engine_wrapper_created_callback, pstate, lm_network_vod_event_callback, initialdlstatus) + self.session.lm.rawserver.add_task(network_create_engine_wrapper_lambda) + + def network_create_engine_wrapper(self, lm_network_engine_wrapper_created_callback, pstate, lm_network_vod_event_callback, initialdlstatus=None): """ Called by any thread, assume dllock already acquired """ if DEBUG: - print >>sys.stderr,"SwiftDownloadImpl: create_engine_wrapper()" + print >> sys.stderr, "SwiftDownloadImpl: create_engine_wrapper()" - if self.get_mode() == DLMODE_VOD: + if self.get_mode() == DLMODE_VOD: self.lm_network_vod_event_callback = lm_network_vod_event_callback - + if not self.dlconfig.has_key('swiftmetadir') and not os.path.isdir(self.get_dest_dir()): # We must be dealing with a checkpoint from a previous release (<6.1.0). Move the swift metadata to the right directory. metadir = os.path.join(get_default_dest_dir(), STATEDIR_SWIFTRESEED_DIR) @@ -173,15 +173,16 @@ def network_create_engine_wrapper(self,lm_network_engine_wrapper_created_callbac shutil.move(path_old + '.mbinmap', path_new + '.mbinmap') except: print_exc() - + # Synchronous: starts process if needed - self.sp = self.session.lm.spm.get_or_create_sp(self.session.get_swift_working_dir(),self.session.get_torrent_collecting_dir(),self.get_swift_listen_port(), self.get_swift_httpgw_listen_port(), self.get_swift_cmdgw_listen_port() ) - self.sp.start_download(self) - - self.session.lm.rawserver.add_task(self.network_check_swift_alive,SWIFT_ALIVE_CHECK_INTERVAL) - + self.sp = self.session.lm.spm.get_or_create_sp(self.session.get_swift_working_dir(), self.session.get_torrent_collecting_dir(), self.get_swift_listen_port(), self.get_swift_httpgw_listen_port(), self.get_swift_cmdgw_listen_port()) + if self.sp: + self.sp.start_download(self) + + self.session.lm.rawserver.add_task(self.network_check_swift_alive, SWIFT_ALIVE_CHECK_INTERVAL) + # Arno: if used, make sure to switch to network thread first! - #if lm_network_engine_wrapper_created_callback is not None: + # if lm_network_engine_wrapper_created_callback is not None: # sp = self.sp # exc = self.error # lm_network_engine_wrapper_created_callback(self,sp,exc,pstate) @@ -189,7 +190,7 @@ def network_create_engine_wrapper(self,lm_network_engine_wrapper_created_callbac # # SwiftProcess callbacks # - def i2ithread_info_callback(self,dlstatus,progress,dynasize,dlspeed,ulspeed,numleech,numseeds,contentdl,contentul): + def i2ithread_info_callback(self, dlstatus, progress, dynasize, dlspeed, ulspeed, numleech, numseeds, contentdl, contentul): self.dllock.acquire() try: if dlstatus == DLSTATUS_SEEDING and self.dlstatus != dlstatus: @@ -200,7 +201,7 @@ def i2ithread_info_callback(self,dlstatus,progress,dynasize,dlspeed,ulspeed,numl # stopped seeding self.time_seeding[0] = self.get_seeding_time() self.time_seeding[1] = None - + self.dlstatus = dlstatus self.dynasize = dynasize self.progress = progress @@ -208,57 +209,57 @@ def i2ithread_info_callback(self,dlstatus,progress,dynasize,dlspeed,ulspeed,numl self.curspeeds[UPLOAD] = ulspeed self.numleech = numleech self.numseeds = numseeds - self.contentbytes = {DOWNLOAD:contentdl,UPLOAD:contentul} + self.contentbytes = {DOWNLOAD:contentdl, UPLOAD:contentul} finally: self.dllock.release() - - def i2ithread_vod_event_callback(self,event,httpurl): + + def i2ithread_vod_event_callback(self, event, httpurl): if DEBUG: - print >>sys.stderr,"SwiftDownloadImpl: i2ithread_vod_event_callback: ENTER",event,httpurl,"mode",self.get_mode() - + print >> sys.stderr, "SwiftDownloadImpl: i2ithread_vod_event_callback: ENTER", event, httpurl, "mode", self.get_mode() + self.dllock.acquire() try: if event == VODEVENT_START: - + if self.get_mode() != DLMODE_VOD: return - + # Fix firefox idiosyncrasies - duration = self.sdef.get_duration() + duration = self.sdef.get_duration() if duration is not None: - httpurl += '@'+duration - - vod_usercallback_wrapper = lambda event,params:self.session.uch.perform_vod_usercallback(self,self.dlconfig['vod_usercallback'],event,params) + httpurl += '@' + duration + + vod_usercallback_wrapper = lambda event, params:self.session.uch.perform_vod_usercallback(self, self.dlconfig['vod_usercallback'], event, params) videoinfo = {} videoinfo['usercallback'] = vod_usercallback_wrapper - + # ARNOSMPTODO: if complete, return file directly - + # Allow direct connection of video renderer with swift HTTP server # via new "url" param. - # - - if DEBUG: - print >>sys.stderr,"SwiftDownloadImpl: i2ithread_vod_event_callback",event,httpurl - + # + + if DEBUG: + print >> sys.stderr, "SwiftDownloadImpl: i2ithread_vod_event_callback", event, httpurl + # Arno: No threading violation, lm_network_* is safe at the moment - self.lm_network_vod_event_callback( videoinfo, VODEVENT_START, { + self.lm_network_vod_event_callback(videoinfo, VODEVENT_START, { "complete": False, "filename": None, - "mimetype": 'application/octet-stream', # ARNOSMPTODO + "mimetype": 'application/octet-stream', # ARNOSMPTODO "stream": None, "length": self.get_dynasize(), - "bitrate": None, # ARNOSMPTODO + "bitrate": None, # ARNOSMPTODO "url": httpurl, - } ) + }) finally: self.dllock.release() - def i2ithread_moreinfo_callback(self,midict): + def i2ithread_moreinfo_callback(self, midict): self.dllock.acquire() try: - #print >>sys.stderr,"SwiftDownloadImpl: Got moreinfo",midict.keys() + # print >>sys.stderr,"SwiftDownloadImpl: Got moreinfo",midict.keys() self.midict = midict finally: self.dllock.release() @@ -275,13 +276,13 @@ def get_status(self): finally: self.dllock.release() - + def get_dynasize(self): """ Returns the size of the swift content. Note this may vary (generally ~1KiB because of dynamic size determination by the swift protocol @return long - """ + """ self.dllock.acquire() try: return self.dynasize @@ -299,17 +300,17 @@ def get_progress(self): finally: self.dllock.release() - def get_current_speed(self,dir): + def get_current_speed(self, dir): """ Return last reported speed in KB/s @return float """ self.dllock.acquire() try: - return self.curspeeds[dir]/1024.0 + return self.curspeeds[dir] / 1024.0 finally: self.dllock.release() - def get_moreinfo_stats(self,dir): + def get_moreinfo_stats(self, dir): """ Return last reported more info dict @return dict """ @@ -318,16 +319,16 @@ def get_moreinfo_stats(self,dir): return self.midict finally: self.dllock.release() - + def get_seeding_time(self): return self.time_seeding[0] + (time.time() - self.time_seeding[1] if self.time_seeding[1] != None else 0) - + def get_total_up(self): return self.total_up + self.contentbytes[UPLOAD] - + def get_total_down(self): return self.total_down + self.contentbytes[DOWNLOAD] - + def get_seeding_statistics(self): seeding_stats = {} seeding_stats['total_up'] = self.get_total_up() @@ -335,13 +336,13 @@ def get_seeding_statistics(self): seeding_stats['time_seeding'] = self.get_seeding_time() return seeding_stats - def network_get_stats(self,getpeerlist): + def network_get_stats(self, getpeerlist): """ @return (status,stats,logmsgs,coopdl_helpers,coopdl_coordinator) """ # dllock held # ARNOSMPTODO: Have a status for when swift is hashchecking the file on disk - + if self.sp is None: status = DLSTATUS_STOPPED else: @@ -362,47 +363,47 @@ def network_get_stats(self,getpeerlist): stats['spew'] = self.network_create_spew_from_peerlist() seeding_stats = self.get_seeding_statistics() - + logmsgs = [] - return (status,stats,seeding_stats,logmsgs) + return (status, stats, seeding_stats, logmsgs) def network_create_statistics_reponse(self): - return SwiftStatisticsResponse(self.numleech,self.numseeds,self.midict) - + return SwiftStatisticsResponse(self.numleech, self.numseeds, self.midict) + def network_calc_eta(self): - bytestogof = (1.0-self.progress) * float(self.dynasize) - dlspeed = max(0.000001,self.curspeeds[DOWNLOAD]) - return bytestogof/dlspeed + bytestogof = (1.0 - self.progress) * float(self.dynasize) + dlspeed = max(0.000001, self.curspeeds[DOWNLOAD]) + return bytestogof / dlspeed def network_calc_prebuf_frac(self): gotbytesf = self.progress * float(self.dynasize) prebuff = float(CMDGW_PREBUFFER_BYTES) - return min(1.0,gotbytesf/prebuff) + return min(1.0, gotbytesf / prebuff) def network_calc_prebuf_eta(self): - bytestogof = (1.0-self.network_calc_prebuf_frac()) * float(CMDGW_PREBUFFER_BYTES) - dlspeed = max(0.000001,self.curspeeds[DOWNLOAD]) - return bytestogof/dlspeed + bytestogof = (1.0 - self.network_calc_prebuf_frac()) * float(CMDGW_PREBUFFER_BYTES) + dlspeed = max(0.000001, self.curspeeds[DOWNLOAD]) + return bytestogof / dlspeed def network_get_vod_stats(self): # More would have to be sent from swift process to set these correctly d = {} d['played'] = None - d['late'] = None - d['dropped'] = None - d['stall'] = None + d['late'] = None + d['dropped'] = None + d['stall'] = None d['pos'] = None d['prebuf'] = None - d['firstpiece'] = 0 - d['npieces'] = ((self.dynasize +1023) / 1024) + d['firstpiece'] = 0 + d['npieces'] = ((self.dynasize + 1023) / 1024) return d def network_create_spew_from_peerlist(self): if not 'channels' in self.midict: return [] - + plist = [] channels = self.midict['channels'] for channel in channels: @@ -412,36 +413,36 @@ def network_create_spew_from_peerlist(self): d['utotal'] = channel['bytes_up'] / 1024.0 d['dtotal'] = channel['bytes_down'] / 1024.0 plist.append(d) - + return plist - + # # Retrieving DownloadState # - def set_state_callback(self,usercallback,getpeerlist=False,delay=0.0): + def set_state_callback(self, usercallback, getpeerlist=False, delay=0.0): """ Called by any thread """ self.dllock.acquire() try: - network_get_state_lambda = lambda:self.network_get_state(usercallback,getpeerlist) + network_get_state_lambda = lambda:self.network_get_state(usercallback, getpeerlist) # First time on general rawserver - self.session.lm.rawserver.add_task(network_get_state_lambda,delay) + self.session.lm.rawserver.add_task(network_get_state_lambda, delay) finally: self.dllock.release() - def network_get_state(self,usercallback,getpeerlist,sessioncalling=False): + def network_get_state(self, usercallback, getpeerlist, sessioncalling=False): """ Called by network thread """ self.dllock.acquire() try: if self.sp is None: if DEBUG: - print >>sys.stderr,"SwiftDownloadImpl: network_get_state: Download not running" - ds = DownloadState(self,DLSTATUS_STOPPED,self.error,self.progressbeforestop,seeding_stats=self.get_seeding_statistics()) + print >> sys.stderr, "SwiftDownloadImpl: network_get_state: Download not running" + ds = DownloadState(self, DLSTATUS_STOPPED, self.error, self.progressbeforestop, seeding_stats=self.get_seeding_statistics()) else: - (status,stats,seeding_stats,logmsgs) = self.network_get_stats(getpeerlist) - ds = DownloadState(self,status,self.error,self.get_progress(),stats=stats,seeding_stats=seeding_stats,logmsgs=logmsgs) + (status, stats, seeding_stats, logmsgs) = self.network_get_stats(getpeerlist) + ds = DownloadState(self, status, self.error, self.get_progress(), stats=stats, seeding_stats=seeding_stats, logmsgs=logmsgs) self.progressbeforestop = ds.get_progress() - + if sessioncalling: return ds @@ -449,20 +450,20 @@ def network_get_state(self,usercallback,getpeerlist,sessioncalling=False): # After the callback is invoked, the return values will be passed to # the returncallback for post-callback processing. if not self.done: - self.session.uch.perform_getstate_usercallback(usercallback,ds,self.sesscb_get_state_returncallback) + self.session.uch.perform_getstate_usercallback(usercallback, ds, self.sesscb_get_state_returncallback) finally: self.dllock.release() - def sesscb_get_state_returncallback(self,usercallback,when,newgetpeerlist): + def sesscb_get_state_returncallback(self, usercallback, when, newgetpeerlist): """ Called by SessionCallbackThread """ self.dllock.acquire() try: if when > 0.0 and not self.done: # Schedule next invocation, either on general or DL specific - # Note this continues when dl is stopped. - network_get_state_lambda = lambda:self.network_get_state(usercallback,newgetpeerlist) - self.session.lm.rawserver.add_task(network_get_state_lambda,when) + # Note this continues when dl is stopped. + network_get_state_lambda = lambda:self.network_get_state(usercallback, newgetpeerlist) + self.session.lm.rawserver.add_task(network_get_state_lambda, when) finally: self.dllock.release() @@ -472,29 +473,29 @@ def sesscb_get_state_returncallback(self,usercallback,when,newgetpeerlist): # def stop(self): """ Called by any thread """ - self.stop_remove(False,removestate=False,removecontent=False) + self.stop_remove(False, removestate=False, removecontent=False) - def stop_remove(self,removedl,removestate=False,removecontent=False): + def stop_remove(self, removedl, removestate=False, removecontent=False): """ Called by any thread. Called on Session.remove_download() """ # Arno, 2013-01-29: This download is being removed, not just stopped. self.done = removedl - self.network_stop(removestate=removestate,removecontent=removecontent) + self.network_stop(removestate=removestate, removecontent=removecontent) - def network_stop(self,removestate,removecontent): + def network_stop(self, removestate, removecontent): """ Called by network thread, but safe for any """ self.dllock.acquire() try: if DEBUG: - print >>sys.stderr,"SwiftDownloadImpl: network_stop",`self.sdef.get_name()` + print >> sys.stderr, "SwiftDownloadImpl: network_stop", `self.sdef.get_name()` pstate = self.network_get_persistent_state() if self.sp is not None: - self.sp.remove_download(self,removestate,removecontent) + self.sp.remove_download(self, removestate, removecontent) self.session.lm.spm.release_sp(self.sp) self.sp = None self.time_seeding = [self.get_seeding_time(), None] - + # Offload the removal of the dlcheckpoint to another thread if removestate: # To remove: @@ -503,28 +504,28 @@ def network_stop(self,removestate,removecontent): # 3. content (if so desired) # content and .mhash file is removed by swift engine if requested - roothash = self.sdef.get_roothash() - self.session.uch.perform_removestate_callback(roothash,None,False) - - return (self.sdef.get_roothash(),pstate) + roothash = self.sdef.get_roothash() + self.session.uch.perform_removestate_callback(roothash, None, False) + + return (self.sdef.get_roothash(), pstate) finally: self.dllock.release() def get_content_dest(self): """ Returns the file to which the downloaded content is saved. """ - return os.path.join(self.get_dest_dir(),self.sdef.get_roothash_as_hex()) + return os.path.join(self.get_dest_dir(), self.sdef.get_roothash_as_hex()) def restart(self, initialdlstatus=None): """ Restart the Download """ - # Called by any thread + # Called by any thread if DEBUG: - print >>sys.stderr,"SwiftDownloadImpl: restart:",`self.sdef.get_name()` + print >> sys.stderr, "SwiftDownloadImpl: restart:", `self.sdef.get_name()` self.dllock.acquire() try: if self.sp is None: - self.error = None # assume fatal error is reproducible - self.create_engine_wrapper(self.session.lm.network_engine_wrapper_created_callback,None,self.session.lm.network_vod_event_callback,initialdlstatus=initialdlstatus) + self.error = None # assume fatal error is reproducible + self.create_engine_wrapper(self.session.lm.network_engine_wrapper_created_callback, None, self.session.lm.network_vod_event_callback, initialdlstatus=initialdlstatus) # No exception if already started, for convenience finally: @@ -532,14 +533,14 @@ def restart(self, initialdlstatus=None): # - # Config parameters that only exists at runtime + # Config parameters that only exists at runtime # - def set_max_desired_speed(self,direct,speed): + def set_max_desired_speed(self, direct, speed): if DEBUG: - print >>sys.stderr,"Download: set_max_desired_speed",direct,speed - #if speed < 10: + print >> sys.stderr, "Download: set_max_desired_speed", direct, speed + # if speed < 10: # print_stack() - + self.dllock.acquire() if direct == UPLOAD: self.dlruntimeconfig['max_desired_upload_rate'] = speed @@ -547,7 +548,7 @@ def set_max_desired_speed(self,direct,speed): self.dlruntimeconfig['max_desired_download_rate'] = speed self.dllock.release() - def get_max_desired_speed(self,direct): + def get_max_desired_speed(self, direct): self.dllock.acquire() try: if direct == UPLOAD: @@ -579,40 +580,40 @@ def checkpoint(self): # Arno, 2012-05-15. Currently this is safe to call from any thread. # Need this for torrent collecting via swift. self.network_checkpoint() - + def network_checkpoint(self): """ Called by network thread """ self.dllock.acquire() try: - pstate = self.network_get_persistent_state() + pstate = self.network_get_persistent_state() if self.sp is not None: self.sp.checkpoint_download(self) - return (self.sdef.get_roothash(),pstate) + return (self.sdef.get_roothash(), pstate) finally: self.dllock.release() - + def network_get_persistent_state(self): """ Assume dllock already held """ pstate = {} pstate['version'] = PERSISTENTSTATE_CURRENTVERSION - pstate['metainfo'] = self.sdef.get_url_with_meta() # assumed immutable + pstate['metainfo'] = self.sdef.get_url_with_meta() # assumed immutable dlconfig = copy.copy(self.dlconfig) dlconfig['name'] = self.sdef.get_name() # Reset unpicklable params dlconfig['vod_usercallback'] = None - dlconfig['mode'] = DLMODE_NORMAL # no callback, no VOD + dlconfig['mode'] = DLMODE_NORMAL # no callback, no VOD pstate['dlconfig'] = dlconfig pstate['dlstate'] = {} - ds = self.network_get_state(None,False,sessioncalling=True) + ds = self.network_get_state(None, False, sessioncalling=True) pstate['dlstate']['status'] = ds.get_status() pstate['dlstate']['progress'] = ds.get_progress() pstate['dlstate']['swarmcache'] = None pstate['dlstate'].update(ds.get_seeding_statistics()) - + if DEBUG: - print >>sys.stderr,"SwiftDownloadImpl: netw_get_pers_state: status",dlstatus_strings[ds.get_status()],"progress",ds.get_progress() + print >> sys.stderr, "SwiftDownloadImpl: netw_get_pers_state: status", dlstatus_strings[ds.get_status()], "progress", ds.get_progress() # Swift stores own state in .mhash and .mbinmap file pstate['engineresumedata'] = None @@ -622,7 +623,7 @@ def network_get_persistent_state(self): # # Coop download # - def get_coopdl_role_object(self,role): + def get_coopdl_role_object(self, role): """ Called by network thread """ return None @@ -634,34 +635,34 @@ def recontact_tracker(self): # # MOREINFO # - def set_moreinfo_stats(self,enable): + def set_moreinfo_stats(self, enable): """ Called by any thread """ - + # Arno, 2012-07-31: slight risk if process killed in between if self.askmoreinfo == enable: return self.askmoreinfo = enable - + if self.sp is not None: - self.sp.set_moreinfo_stats(self,enable) + self.sp.set_moreinfo_stats(self, enable) # # External addresses # - def add_peer(self,addr): + def add_peer(self, addr): """ Add a peer address from 3rd source (not tracker, not DHT) to this Download. @param (hostname_ip,port) tuple """ if self.sp is not None: - self.sp.add_peer(self,addr) + self.sp.add_peer(self, addr) # # Internal methods # - def set_error(self,e): + def set_error(self, e): self.dllock.acquire() self.error = e self.dllock.release() @@ -675,27 +676,27 @@ def network_check_swift_alive(self): try: if self.sp is not None and not self.done: if not self.sp.is_alive(): - print >>sys.stderr,"SwiftDownloadImpl: network_check_swift_alive: Restarting",`self.sdef.get_name()` + print >> sys.stderr, "SwiftDownloadImpl: network_check_swift_alive: Restarting", `self.sdef.get_name()` self.sp = None self.restart() except: print_exc() finally: self.dllock.release() - + if not self.done: - self.session.lm.rawserver.add_task(self.network_check_swift_alive,SWIFT_ALIVE_CHECK_INTERVAL) - + self.session.lm.rawserver.add_task(self.network_check_swift_alive, SWIFT_ALIVE_CHECK_INTERVAL) + class SwiftStatisticsResponse: - - def __init__(self,numleech,numseeds,midict): + + def __init__(self, numleech, numseeds, midict): # More would have to be sent from swift process to set these correctly self.numConCandidates = 0 self.numConInitiated = 0 self.have = None self.numSeeds = numseeds self.numPeers = numleech - + # Arno, 2012-05-23: At Niels' request self.upTotal = 0 self.downTotal = 0 diff --git a/Tribler/Core/Swift/SwiftProcessMgr.py b/Tribler/Core/Swift/SwiftProcessMgr.py index 2ff3bf7284e..1b5b9069f5c 100644 --- a/Tribler/Core/Swift/SwiftProcessMgr.py +++ b/Tribler/Core/Swift/SwiftProcessMgr.py @@ -6,7 +6,7 @@ import binascii import random import time -from traceback import print_exc,print_stack +from traceback import print_exc, print_stack import threading from Tribler.Core.Swift.SwiftProcess import * @@ -18,7 +18,7 @@ class SwiftProcessMgr: """ Class that manages a number of SwiftProcesses """ - def __init__(self,binpath,i2iport,dlsperproc,tunnellistenport,sesslock): + def __init__(self, binpath, i2iport, dlsperproc, tunnellistenport, sesslock): self.binpath = binpath self.i2iport = i2iport # ARNOSMPTODO: Implement such that a new proc is created when needed @@ -26,71 +26,71 @@ def __init__(self,binpath,i2iport,dlsperproc,tunnellistenport,sesslock): self.tunnellistenport = tunnellistenport self.sesslock = sesslock self.done = False - - self.sps = [] + self.sps = [] - def get_or_create_sp(self,workdir,zerostatedir,listenport,httpgwport,cmdgwport): + def get_or_create_sp(self, workdir, zerostatedir, listenport, httpgwport, cmdgwport): """ Download needs a process """ self.sesslock.acquire() - #print >>sys.stderr,"spm: get_or_create_sp" - try: - self.clean_sps() - - sp = None - if listenport is not None: - # Reuse the one with the same requested listen port - for sp2 in self.sps: - if sp2.listenport == listenport: - sp = sp2 - #print >>sys.stderr,"spm: get_or_create_sp: Reusing",sp2.get_pid() - - elif self.dlsperproc > 1: - # Find one with room, distribute equally - random.shuffle(self.sps) - for sp2 in self.sps: - if len(sp2.get_downloads()) < self.dlsperproc: - sp = sp2 - print >>sys.stderr,"spm: get_or_create_sp: Reusing",sp.get_pid() - break - - if sp is None: - # Create new process - sp = SwiftProcess(self.binpath,workdir,zerostatedir,listenport,httpgwport,cmdgwport,self) - print >>sys.stderr,"spm: get_or_create_sp: Creating new",sp.get_pid() - self.sps.append(sp) - - # Arno, 2011-10-13: On Linux swift is slow to start and - # allocate the cmd listen socket?! - # 2012-05-23: connection_lost() will attempt another - # connect when the first fails, so not timing dependent, - # just ensures no send_()s get lost. Executed by NetworkThread. - if sys.platform == "linux2" or sys.platform=="darwin": - print >>sys.stderr,"spm: Need to sleep 1 second for swift to start on Linux?! FIXME" - time.sleep(1) - - sp.start_cmd_connection() - - return sp - finally: - self.sesslock.release() - - def release_sp(self,sp): + if not self.done: + # print >>sys.stderr,"spm: get_or_create_sp" + try: + self.clean_sps() + + sp = None + if listenport is not None: + # Reuse the one with the same requested listen port + for sp2 in self.sps: + if sp2.listenport == listenport: + sp = sp2 + # print >>sys.stderr,"spm: get_or_create_sp: Reusing",sp2.get_pid() + + elif self.dlsperproc > 1: + # Find one with room, distribute equally + random.shuffle(self.sps) + for sp2 in self.sps: + if len(sp2.get_downloads()) < self.dlsperproc: + sp = sp2 + print >> sys.stderr, "spm: get_or_create_sp: Reusing", sp.get_pid() + break + + if sp is None: + # Create new process + sp = SwiftProcess(self.binpath, workdir, zerostatedir, listenport, httpgwport, cmdgwport, self) + print >> sys.stderr, "spm: get_or_create_sp: Creating new", sp.get_pid() + self.sps.append(sp) + + # Arno, 2011-10-13: On Linux swift is slow to start and + # allocate the cmd listen socket?! + # 2012-05-23: connection_lost() will attempt another + # connect when the first fails, so not timing dependent, + # just ensures no send_()s get lost. Executed by NetworkThread. + if sys.platform == "linux2" or sys.platform == "darwin": + print >> sys.stderr, "spm: Need to sleep 1 second for swift to start on Linux?! FIXME" + time.sleep(1) + + sp.start_cmd_connection() + + return sp + finally: + self.sesslock.release() + + def release_sp(self, sp): """ Download no longer needs process. Apply process-cleanup policy """ - # ARNOSMPTODO: MULTIPLE: Add policy param on whether to keep process around when no downloads. + # ARNOSMPTODO: MULTIPLE: Add policy param on whether to keep process around when no downloads. self.sesslock.acquire() try: # Arno, 2012-05-23: Don't kill tunneling swift process if sp.get_listen_port() == self.tunnellistenport: return - + if len(sp.get_downloads()) == 0: self.destroy_sp(sp) finally: self.sesslock.release() - - def destroy_sp(self,sp): - print >>sys.stderr,"spm: destroy_sp:",sp.get_pid() + + def destroy_sp(self, sp): + print >> sys.stderr, "spm: destroy_sp:", sp.get_pid() self.sesslock.acquire() try: self.sps.remove(sp) @@ -105,7 +105,7 @@ def clean_sps(self): deads = [] for sp in self.sps: if not sp.is_alive(): - print >>sys.stderr,"spm: clean_sps: Garbage collecting dead",sp.get_pid() + print >> sys.stderr, "spm: clean_sps: Garbage collecting dead", sp.get_pid() deads.append(sp) for sp in deads: self.sps.remove(sp) @@ -116,25 +116,31 @@ def early_shutdown(self): gracetime (see Session.shutdown()). """ # Called by any thread, assume sessionlock is held - print >>sys.stderr,"spm: early_shutdown" - self.done = True - - for sp in self.sps: - try: - sp.early_shutdown() - except: - print_exc() - + print >> sys.stderr, "spm: early_shutdown" + try: + self.sesslock.acquire() + self.done = True + + + for sp in self.sps: + try: + sp.early_shutdown() + except: + print_exc() + finally: + self.sesslock.release() + def network_shutdown(self): """ Gracetime expired, kill procs """ # Called by network thread + print >> sys.stderr, "spm: network_shutdown" for sp in self.sps: try: sp.network_shutdown() except: print_exc() - def connection_lost(self,port): + def connection_lost(self, port): if self.done: return @@ -142,8 +148,8 @@ def connection_lost(self,port): try: for sp in self.sps: if sp.get_cmdport() == port: - print >>sys.stderr,"spm: connection_lost: Restart",sp.get_pid() + print >> sys.stderr, "spm: connection_lost: Restart", sp.get_pid() sp.start_cmd_connection() finally: self.sesslock.release() - + diff --git a/Tribler/Test/API/test_seeding.py b/Tribler/Test/API/test_seeding.py index 849f48c6924..84c767cb756 100644 --- a/Tribler/Test/API/test_seeding.py +++ b/Tribler/Test/API/test_seeding.py @@ -34,13 +34,12 @@ def setUpPreSession(self): """ override TestAsServer """ TestAsServer.setUpPreSession(self) - self.config_path2 = tempfile.mkdtemp() self.config2 = self.config.copy() # not really necess - self.config2.set_state_dir(self.config_path2) + self.config2.set_state_dir(self.getStateDir(2)) self.config2.set_listen_port(4810) self.dscfg2 = DownloadStartupConfig() - self.dscfg2.set_dest_dir(self.config_path2) + self.dscfg2.set_dest_dir(self.getDestDir(2)) def setUpPostSession(self): pass @@ -60,15 +59,10 @@ def setup_seeder(self, merkle, filename='file.wmv'): self.tdef.set_create_merkle_torrent(merkle) self.tdef.finalize() - self.torrentfn = os.path.join(self.session.get_state_dir(), "gen.torrent") - self.tdef.save(self.torrentfn) - - self.tdef = TorrentDef.load(self.torrentfn) - print >> sys.stderr, "test: setup_seeder: name is", self.tdef.metainfo['info']['name'] self.dscfg = DownloadStartupConfig() - self.dscfg.set_dest_dir(os.path.join(BASE_DIR, "API")) + self.dscfg.set_dest_dir(os.path.join(BASE_DIR, "API")) # basedir of the file we are seeding d = self.session.start_download(self.tdef, self.dscfg) d.set_state_callback(self.seeder_state_callback) diff --git a/Tribler/Test/API/test_seeding_live.py b/Tribler/Test/API/test_seeding_live.py index cb62ae109dd..d2be8ae47ae 100644 --- a/Tribler/Test/API/test_seeding_live.py +++ b/Tribler/Test/API/test_seeding_live.py @@ -19,7 +19,7 @@ from Tribler.Core.simpledefs import * from Tribler.Core.Utilities.bitfield import Bitfield -DEBUG=True +DEBUG = True class TestSeeding(TestAsServer): """ @@ -29,9 +29,9 @@ class TestSeeding(TestAsServer): def setUp(self): """ override TestAsServer """ TestAsServer.setUp(self) - print >>sys.stderr,"test: Giving Session time to startup" + print >> sys.stderr, "test: Giving Session time to startup" time.sleep(5) - print >>sys.stderr,"test: Session should have started up" + print >> sys.stderr, "test: Session should have started up" def setUpPreSession(self): """ override TestAsServer """ @@ -57,7 +57,7 @@ def test_live_torrent(self): """ self.setup_seeder() time.sleep(10) - #self.subtest_connect2downloader() + # self.subtest_connect2downloader() self.subtest_download() @@ -67,56 +67,52 @@ def setup_seeder(self): self.bitrate = 6144 piecesize = 32768 self.npieces = 12 - playtime = ((self.npieces-1)*piecesize)/self.bitrate - playtimestr = '0:'+str(playtime) # DON'T WORK IF > 60 secs - self.tdef.create_live("Test Live",self.bitrate,playtimestr) + playtime = ((self.npieces - 1) * piecesize) / self.bitrate + playtimestr = '0:' + str(playtime) # DON'T WORK IF > 60 secs + self.tdef.create_live("Test Live", self.bitrate, playtimestr) self.tdef.set_tracker(self.session.get_internal_tracker_url()) self.tdef.set_piece_length(piecesize) self.tdef.finalize() - self.torrentfn = os.path.join(self.session.get_state_dir(),"gen.torrent") - self.tdef.save(self.torrentfn) - - print >>sys.stderr,"test: setup_seeder: name is",self.tdef.metainfo['info']['name'] + print >> sys.stderr, "test: setup_seeder: name is", self.tdef.metainfo['info']['name'] self.dscfg = DownloadStartupConfig() - self.dscfg.set_dest_dir(os.getcwd()) + self.dscfg.set_dest_dir(self.getDestDir()) # File source source = InfiniteSource(piecesize) self.dscfg.set_video_ratelimit(self.bitrate) self.dscfg.set_video_source(source) - d = self.session.start_download(self.tdef,self.dscfg) + d = self.session.start_download(self.tdef, self.dscfg) d.set_state_callback(self.seeder_state_callback) - def seeder_state_callback(self,ds): + def seeder_state_callback(self, ds): d = ds.get_download() - print >>sys.stderr,"test: seeder:",dlstatus_strings[ds.get_status()],ds.get_progress() - return (1.0,False) + print >> sys.stderr, "test: seeder:", dlstatus_strings[ds.get_status()], ds.get_progress() + return (1.0, False) def subtest_download(self): """ Now download the file via another Session """ - self.config2 = self.config.copy() # not really necess - self.config_path2 = tempfile.mkdtemp() - self.config2.set_state_dir(self.config_path2) + self.config2 = self.config.copy() # not really necess + self.config2.set_state_dir(self.getStateDir(2)) self.config2.set_listen_port(self.mylistenport) - self.session2 = Session(self.config2,ignore_singleton=True) + self.session2 = Session(self.config2, ignore_singleton=True) # Allow session2 to start - print >>sys.stderr,"test: downloader: Sleeping 3 secs to let Session2 start" + print >> sys.stderr, "test: downloader: Sleeping 3 secs to let Session2 start" time.sleep(3) tdef2 = TorrentDef.load(self.torrentfn) dscfg2 = DownloadStartupConfig() - dscfg2.set_dest_dir(self.config_path2) + dscfg2.set_dest_dir(self.getDestDir(2)) dscfg2.set_video_event_callback(self.downloader_vod_ready_callback) - d = self.session2.start_download(tdef2,dscfg2) + d = self.session2.start_download(tdef2, dscfg2) d.set_state_callback(self.downloader_state_callback) time.sleep(40) @@ -124,13 +120,13 @@ def subtest_download(self): self.subtest_connect2downloader() time.sleep(80) - def downloader_state_callback(self,ds): + def downloader_state_callback(self, ds): d = ds.get_download() - print >>sys.stderr,"test: download:",dlstatus_strings[ds.get_status()],ds.get_progress() + print >> sys.stderr, "test: download:", dlstatus_strings[ds.get_status()], ds.get_progress() - return (1.0,False) + return (1.0, False) - def downloader_vod_ready_callback(self,d,event,params): + def downloader_vod_ready_callback(self, d, event, params): """ Called by SessionThread """ if event == VODEVENT_START: stream = params["stream"] @@ -144,34 +140,34 @@ def downloader_vod_ready_callback(self,d,event,params): def subtest_connect2downloader(self): - print >> sys.stderr,"test: verifier: Connecting to seeder to check bitfield" + print >> sys.stderr, "test: verifier: Connecting to seeder to check bitfield" infohash = self.tdef.get_infohash() - s = BTConnection('localhost',self.mylistenport,user_infohash=infohash) + s = BTConnection('localhost', self.mylistenport, user_infohash=infohash) s.read_handshake_medium_rare() try: s.s.settimeout(10.0) resp = s.recv() self.assert_(len(resp) > 0) - print >> sys.stderr,"test: verifier: Got message",getMessageName(resp[0]) + print >> sys.stderr, "test: verifier: Got message", getMessageName(resp[0]) self.assert_(resp[0] == EXTEND) resp = s.recv() self.assert_(len(resp) > 0) - print >> sys.stderr,"test: verifier: Got 2nd message",getMessageName(resp[0]) + print >> sys.stderr, "test: verifier: Got 2nd message", getMessageName(resp[0]) self.assert_(resp[0] == BITFIELD) - b = Bitfield(self.npieces,resp[1:]) - print >> sys.stderr,"test: verifier: Bitfield is",`b.toboollist()` + b = Bitfield(self.npieces, resp[1:]) + print >> sys.stderr, "test: verifier: Bitfield is", `b.toboollist()` b2 = Bitfield(self.npieces) b2[0] = True - msg = BITFIELD+b2.tostring() + msg = BITFIELD + b2.tostring() s.send(msg) time.sleep(5) except socket.timeout: - print >> sys.stderr,"test: verifier: Timeout, peer didn't reply" + print >> sys.stderr, "test: verifier: Timeout, peer didn't reply" self.assert_(False) s.close() @@ -180,21 +176,8 @@ class InfiniteSource: def __init__(self, piece_length): self.emptypiece = " " * piece_length - def read(self,len): + def read(self, len): return self.emptypiece[:len] def close(self): pass - - - -def test_suite(): - suite = unittest.TestSuite() - # We should run the tests in a separate Python interpreter to prevent - # problems with our singleton classes, e.g. PeerDB, etc. - if len(sys.argv) != 2: - print "Usage: python test_seeding.py " - else: - suite.addTest(TestSeeding(sys.argv[1])) - - return suite diff --git a/Tribler/Test/test_as_server.py b/Tribler/Test/test_as_server.py index 2fbcd97119d..2fab31f4a01 100644 --- a/Tribler/Test/test_as_server.py +++ b/Tribler/Test/test_as_server.py @@ -10,6 +10,8 @@ import shutil import time import gc +import wx +import Image from traceback import print_exc from threading import enumerate as enumerate_threads @@ -17,18 +19,66 @@ from Tribler.Core.Session import * from Tribler.Core.SessionConfig import * from Tribler.Core.CacheDB.sqlitecachedb import SQLiteCacheDB +import re BASE_DIR = os.path.abspath(os.path.join(os.path.dirname(os.path.realpath(__file__)))) +STATE_DIR = os.path.join(BASE_DIR, "test_.Tribler") +DEST_DIR = os.path.join(BASE_DIR, "test_TriblerDownloads") + +from Tribler.Core import defaults +defaults.sessdefaults["state_dir"] = STATE_DIR +defaults.dldefaults["saveas"] = DEST_DIR DEBUG = False -class TestAsServer(unittest.TestCase): +class AbstractServer(unittest.TestCase): + def setUpCleanup(self): + # Elric: If the files are still there it means that either the last run segfaulted or + # that there was some kind of lock on those and the tearDown wasn't able to delete them. + # In either case the test would fail, so just remove the dirs. + for path in os.listdir(BASE_DIR): + path = os.path.join(BASE_DIR, path) + if path.startswith(STATE_DIR) or path.startswith(DEST_DIR): + shutil.rmtree(path) + + def tearDownCleanup(self): + self.setUpCleanup() + + def getStateDir(self, nr=0): + dir = STATE_DIR + (str(nr + 1) if nr else '') + if not os.path.exists(dir): + os.mkdir(dir) + return dir + + def getDestDir(self, nr=0): + dir = DEST_DIR + (str(nr + 1) if nr else '') + if not os.path.exists(dir): + os.mkdir(dir) + return dir + + def annotate(self, annotation, start=True, destdir="output"): + if not os.path.exists(destdir): + os.makedirs(destdir) + + filename = os.path.join(destdir, "annotations.txt") + if os.path.exists(filename): + f = open(filename, 'a') + else: + f = open(filename, 'w') + print >> f, "time remark start" + + annotation = re.sub('[^a-zA-Z0-9_]', '_', annotation) + + print >> f, time.time(), annotation, '1' if start else '0' + f.close() + +class TestAsServer(AbstractServer): """ Parent class for testing the server-side of Tribler """ def setUp(self): - """ unittest test setup code """ + self.setUpCleanup() self.setUpPreSession() self.session = Session(self.config) @@ -39,12 +89,12 @@ def setUp(self): while not self.session.lm.initComplete: time.sleep(1) + self.annotate(self._testMethodName, start=True) + def setUpPreSession(self): """ Should set self.config_path and self.config """ - self.config_path = tempfile.mkdtemp() - self.config = SessionStartupConfig() - self.config.set_state_dir(self.config_path) + self.config.set_state_dir(self.getStateDir()) self.config.set_listen_port(random.randint(10000, 60000)) self.config.set_torrent_checking(False) self.config.set_dialback(False) @@ -54,32 +104,28 @@ def setUpPreSession(self): self.config.set_dispersy(False) self.config.set_swift_proc(False) self.config.set_mainline_dht(False) - self.config.set_install_dir(os.path.abspath(os.path.join(__file__, '..', '..', '..'))) def tearDown(self): + self.annotate(self._testMethodName, start=False) + """ unittest test tear down code """ if self.session is not None: self._shutdown_session(self.session) Session.del_instance() + time.sleep(10) + gc.collect() + ts = enumerate_threads() print >> sys.stderr, "test_as_server: Number of threads still running", len(ts) for t in ts: print >> sys.stderr, "test_as_server: Thread still running", t.getName(), "daemon", t.isDaemon(), "instance:", t - SQLiteCacheDB.delInstance() - from Tribler.Core.CacheDB.sqlitecachedb import unregister - unregister() + if SQLiteCacheDB.hasInstance(): + SQLiteCacheDB.getInstance().close_all() + SQLiteCacheDB.delInstance() - time.sleep(10) - gc.collect() - - try: - shutil.rmtree(self.config_path) - except: - # Not fatal if something goes wrong here, and Win32 often gives - # spurious Permission Denied errors. - print_exc() + self.tearDownCleanup() def _shutdown_session(self, session): session_shutdown_start = time.time() @@ -96,3 +142,175 @@ def _shutdown_session(self, session): time.sleep(1) print >> sys.stderr, "test_as_server: Session is shutdown" + +class TestGuiAsServer(TestAsServer): + """ + Parent class for testing the gui-side of Tribler + """ + + def setUp(self): + self.setUpCleanup() + + self.app = wx.GetApp() + if not self.app: + self.app = wx.PySimpleApp(redirect=False) + + self.guiUtility = None + self.frame = None + self.lm = None + self.session = None + self.quitting = False + + self.asserts = [] + + self.annotate(self._testMethodName, start=True) + + def assert_(self, boolean, reason, doassert=True): + if not boolean: + self.screenshot("ASSERT: %s" % reason) + self.quit() + + self.asserts.append((boolean, reason)) + + if doassert: + assert boolean, reason + + def startTest(self, callback): + from Tribler.Main.vwxGUI.GuiUtility import GUIUtility + from Tribler.Main.tribler import run + + self.quitting = False + def wait_for_frame(): + print >> sys.stderr, "tgs: lm initcomplete, staring to wait for frame to be ready" + self.frame = self.guiUtility.frame + self.CallConditional(30, lambda : self.frame.ready, callback) + + def wait_for_init(): + print >> sys.stderr, "tgs: lm initcomplete, staring to wait for GUIUtility to be ready" + + self.guiUtility = GUIUtility.getInstance() + + self.CallConditional(30, lambda : self.guiUtility.frame, wait_for_frame) + + def wait_for_guiutility(): + print >> sys.stderr, "tgs: waiting for guiutility instance" + self.CallConditional(30, lambda: GUIUtility.hasInstance(), wait_for_init) + + def wait_for_instance(): + print >> sys.stderr, "tgs: found instance, staring to wait for lm to be initcomplete" + self.session = Session.get_instance() + self.lm = self.session.lm + + self.CallConditional(30, lambda : self.lm.initComplete, wait_for_guiutility) + + def wait_for_session(): + print >> sys.stderr, "tgs: waiting for session instance" + self.CallConditional(30, lambda: Session.has_instance(), wait_for_instance) + + self.CallConditional(30, Session.has_instance, wait_for_session) + + # modify argv to let tribler think its running from a different directory + sys.argv = [os.path.abspath('./.exe')] + run() + + def Call(self, seconds, callback): + if not self.quitting: + if seconds: + wx.CallLater(seconds * 1000, callback) + elif not wx.Thread_IsMain(): + wx.CallAfter(callback) + else: + callback() + + def CallConditional(self, timeout, condition, callback, assertMsg=None): + t = time.time() + + def DoCheck(): + if not self.quitting: + if time.time() - t < timeout: + if condition(): + print >> sys.stderr, "tgs: condition satisfied after %d seconds, calling callback" % (time.time() - t) + callback() + else: + self.Call(0.5, DoCheck) + else: + print >> sys.stderr, "tgs: quitting, condition was not satisfied in %d seconds" % timeout + self.assert_(False, assertMsg if assertMsg else "Condition was not satisfied in %d seconds" % timeout, doassert=False) + self.Call(0, DoCheck) + + def quit(self): + if self.frame: + self.frame.OnCloseWindow() + else: + def do_quit(): + self.app.ExitMainLoop + wx.WakeUpMainThread() + + self.Call(1, do_quit) + self.Call(2.5, self.app.Exit) + + self.quitting = True + + def tearDown(self): + self.annotate(self._testMethodName, start=False) + + """ unittest test tear down code """ + del self.guiUtility + del self.frame + del self.lm + del self.session + + time.sleep(10) + gc.collect() + + ts = enumerate_threads() + print >> sys.stderr, "teardown: Number of threads still running", len(ts) + for t in ts: + print >> sys.stderr, "teardown: Thread still running", t.getName(), "daemon", t.isDaemon(), "instance:", t + + dhtlog = os.path.join(STATE_DIR, 'pymdht.log') + if os.path.exists(dhtlog): + print >> sys.stderr, "teardown: content of pymdht.log" + f = open(dhtlog, 'r') + for line in f: + line = line.strip() + if line: + print >> sys.stderr, line + f.close() + print >> sys.stderr, "teardown: finished printing content of pymdht.log" + + self.tearDownCleanup() + + for boolean, reason in self.asserts: + assert boolean, reason + + def screenshot(self, title=None, destdir="output"): + app = wx.GetApp() + window = app.GetTopWindow() + rect = window.GetRect() + + screen = wx.WindowDC(window) + bmp = wx.EmptyBitmap(rect.GetWidth(), rect.GetHeight() + 30) + + mem = wx.MemoryDC(bmp) + mem.Blit(0, 30, rect.GetWidth(), rect.GetHeight(), screen, rect.GetX(), rect.GetY()) + if title: + titlerect = wx.Rect(0, 0, rect.GetWidth(), 30) + mem.DrawRectangleRect(titlerect) + mem.DrawLabel(title, titlerect, wx.ALIGN_CENTER_HORIZONTAL | wx.ALIGN_CENTER_VERTICAL) + del mem + + myWxImage = wx.ImageFromBitmap(bmp) + im = Image.new('RGB', (myWxImage.GetWidth(), myWxImage.GetHeight())) + im.fromstring(myWxImage.GetData()) + + if not os.path.exists(destdir): + os.makedirs(destdir) + index = 1 + filename = os.path.join(destdir, 'Screenshot-%.2d.png' % index) + while os.path.exists(filename): + index += 1 + filename = os.path.join(destdir, 'Screenshot-%.2d.png' % index) + im.save(filename) + + del bmp diff --git a/Tribler/Test/test_gui_as_server.py b/Tribler/Test/test_gui_as_server.py deleted file mode 100644 index 559c8429f9c..00000000000 --- a/Tribler/Test/test_gui_as_server.py +++ /dev/null @@ -1,230 +0,0 @@ -# Written by Niels Zeilemaker -# see LICENSE.txt for license information - -import unittest -import os -import sys -import wx -import gc -import Image -import re - -from threading import Thread, enumerate as enumerate_threads -from time import sleep, time - -from Tribler.Main.tribler import run -from Tribler.Core.Session import Session -from Tribler.Main.vwxGUI.GuiUtility import GUIUtility -import shutil -from Tribler.Core import defaults - -STATE_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "test_.Tribler") -DEST_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "test_TriblerDownloads") - -# Set custom state_dir and dest_dir paths so we do not mess with local installs by accident. -defaults.sessdefaults["state_dir"] = STATE_DIR -defaults.dldefaults["saveas"] = DEST_DIR - -try: - from collections import OrderedDict -except ImportError: - from .python27_ordereddict import OrderedDict - -class TestGuiAsServer(unittest.TestCase): - """ - Parent class for testing the gui-side of Tribler - """ - - def setUp(self): - """ unittest test setup code """ - # Elric: If the files are still there it means that either the last run segfaulted or - # that there was some kind of lock on those and the tearDown wasn't able to delete them. - # In either case the test would fail, so just remove the dirs. - for dir_ in (STATE_DIR, DEST_DIR): - if os.path.exists(dir_): - shutil.rmtree(dir_) - os.mkdir(dir_) - - self.app = wx.GetApp() - if not self.app: - self.app = wx.PySimpleApp(redirect=False) - - self.guiUtility = None - self.frame = None - self.lm = None - self.session = None - self.quitting = False - - self.asserts = [] - - self.annotate(self._testMethodName, start=True) - - def assert_(self, boolean, reason, doassert=True): - if not boolean: - self.screenshot("ASSERT: %s" % reason) - self.quit() - - self.asserts.append((boolean, reason)) - - if doassert: - assert boolean, reason - - def startTest(self, callback): - self.quitting = False - def wait_for_frame(): - print >> sys.stderr, "tgs: lm initcomplete, staring to wait for frame to be ready" - self.frame = self.guiUtility.frame - self.CallConditional(30, lambda : self.frame.ready, callback) - - def wait_for_init(): - print >> sys.stderr, "tgs: lm initcomplete, staring to wait for GUIUtility to be ready" - - self.guiUtility = GUIUtility.getInstance() - - self.CallConditional(30, lambda : self.guiUtility.frame, wait_for_frame) - - def wait_for_guiutility(): - print >> sys.stderr, "tgs: waiting for guiutility instance" - self.CallConditional(30, lambda: GUIUtility.hasInstance(), wait_for_init) - - def wait_for_instance(): - print >> sys.stderr, "tgs: found instance, staring to wait for lm to be initcomplete" - self.session = Session.get_instance() - self.lm = self.session.lm - - self.CallConditional(30, lambda : self.lm.initComplete, wait_for_guiutility) - - def wait_for_session(): - print >> sys.stderr, "tgs: waiting for session instance" - self.CallConditional(30, lambda: Session.has_instance(), wait_for_instance) - - self.CallConditional(30, Session.has_instance, wait_for_session) - - # modify argv to let tribler think its running from a different directory - sys.argv = [os.path.abspath('./.exe')] - run() - - def Call(self, seconds, callback): - if not self.quitting: - if seconds: - wx.CallLater(seconds * 1000, callback) - elif not wx.Thread_IsMain(): - wx.CallAfter(callback) - else: - callback() - - def CallConditional(self, timeout, condition, callback, assertMsg=None): - t = time() - - def DoCheck(): - if not self.quitting: - if time() - t < timeout: - if condition(): - print >> sys.stderr, "tgs: condition satisfied after %d seconds, calling callback" % (time() - t) - callback() - else: - self.Call(0.5, DoCheck) - else: - print >> sys.stderr, "tgs: quitting, condition was not satisfied in %d seconds" % timeout - self.assert_(False, assertMsg if assertMsg else "Condition was not satisfied in %d seconds" % timeout, doassert=False) - self.Call(0, DoCheck) - - def quit(self): - if self.frame: - self.frame.OnCloseWindow() - else: - def do_quit(): - self.app.ExitMainLoop - wx.WakeUpMainThread() - - self.Call(1, do_quit) - self.Call(2.5, self.app.Exit) - - self.quitting = True - - def tearDown(self): - self.annotate(self._testMethodName, start=False) - - """ unittest test tear down code """ - del self.guiUtility - del self.frame - del self.lm - del self.session - - from Tribler.Core.CacheDB.sqlitecachedb import unregister - unregister() - - sleep(10) - gc.collect() - - ts = enumerate_threads() - print >> sys.stderr, "teardown: Number of threads still running", len(ts) - for t in ts: - print >> sys.stderr, "teardown: Thread still running", t.getName(), "daemon", t.isDaemon(), "instance:", t - - dhtlog = os.path.join(STATE_DIR, 'pymdht.log') - if os.path.exists(dhtlog): - print >> sys.stderr, "teardown: content of pymdht.log" - f = open(dhtlog, 'r') - for line in f: - line = line.strip() - if line: - print >> sys.stderr, line - f.close() - print >> sys.stderr, "teardown: finished printing content of pymdht.log" - - shutil.rmtree(STATE_DIR) - shutil.rmtree(DEST_DIR) - - for boolean, reason in self.asserts: - assert boolean, reason - - assert not os.path.exists(STATE_DIR), "state_dir (%s) should not exist" % STATE_DIR - assert not os.path.exists(DEST_DIR), "dest_dir (%s) should not exist" % DEST_DIR - - def annotate(self, annotation, start=True, destdir="output"): - if not os.path.exists(destdir): - os.makedirs(destdir) - - filename = os.path.join(destdir, "annotations.txt") - if os.path.exists(filename): - f = open(filename, 'a') - else: - f = open(filename, 'w') - print >> f, "time remark start" - - annotation = re.sub('[^a-zA-Z0-9_]', '_', annotation) - - print >> f, time(), annotation, '1' if start else '0' - f.close() - - def screenshot(self, title=None, destdir="output"): - app = wx.GetApp() - window = app.GetTopWindow() - rect = window.GetRect() - - screen = wx.WindowDC(window) - bmp = wx.EmptyBitmap(rect.GetWidth(), rect.GetHeight() + 30) - - mem = wx.MemoryDC(bmp) - mem.Blit(0, 30, rect.GetWidth(), rect.GetHeight(), screen, rect.GetX(), rect.GetY()) - if title: - titlerect = wx.Rect(0, 0, rect.GetWidth(), 30) - mem.DrawRectangleRect(titlerect) - mem.DrawLabel(title, titlerect, wx.ALIGN_CENTER_HORIZONTAL | wx.ALIGN_CENTER_VERTICAL) - del mem - - myWxImage = wx.ImageFromBitmap(bmp) - im = Image.new('RGB', (myWxImage.GetWidth(), myWxImage.GetHeight())) - im.fromstring(myWxImage.GetData()) - - if not os.path.exists(destdir): - os.makedirs(destdir) - index = 1 - filename = os.path.join(destdir, 'Screenshot-%.2d.png' % index) - while os.path.exists(filename): - index += 1 - filename = os.path.join(destdir, 'Screenshot-%.2d.png' % index) - im.save(filename) - - del bmp diff --git a/Tribler/Test/test_gui_general.py b/Tribler/Test/test_gui_general.py index 6038ebc0cdf..808de3fc9a1 100644 --- a/Tribler/Test/test_gui_general.py +++ b/Tribler/Test/test_gui_general.py @@ -2,9 +2,9 @@ import unittest -from Tribler.Test.test_gui_as_server import TestGuiAsServer +from Tribler.Test.test_as_server import TestGuiAsServer -DEBUG=True +DEBUG = True class TestRemoteQuery(TestGuiAsServer): """ Testing QUERY message of Social Network extension V1 @@ -14,11 +14,11 @@ def do_assert(): self.assert_(self.guiUtility.guiPage == 'stats', 'Debug page is not selected') self.screenshot('After selecting debug page') self.quit() - + def do_page(): self.guiUtility.ShowPage('stats') self.Call(10, do_assert) - + self.startTest(do_page) if __name__ == "__main__": diff --git a/Tribler/Test/test_libtorrent_download.py b/Tribler/Test/test_libtorrent_download.py index 69a6b811e51..cc20f58d93d 100644 --- a/Tribler/Test/test_libtorrent_download.py +++ b/Tribler/Test/test_libtorrent_download.py @@ -4,7 +4,7 @@ import os from time import time -from Tribler.Test.test_gui_as_server import TestGuiAsServer +from Tribler.Test.test_as_server import TestGuiAsServer, BASE_DIR from Tribler.Main.globals import DefaultDownloadStartupConfig import binascii from Tribler.Core.Libtorrent.LibtorrentMgr import LibtorrentMgr @@ -12,6 +12,23 @@ DEBUG = True class TestLibtorrentDownload(TestGuiAsServer): + def test_downloadfromfile(self): + infohash = binascii.unhexlify('66ED7F30E3B30FA647ABAA19A36E7503AA071535') + + def do_assert(): + self.assert_(self.frame.librarylist.list.items.has_key(infohash), 'no download in librarylist') + self.assert_(self.frame.librarylist.list.items.has_key(infohash) and self.frame.librarylist.list.GetItem(infohash).original_data.ds and self.frame.librarylist.list.GetItem(infohash).original_data.ds.progress > 0, 'no download progress') + + self.screenshot('After starting a libtorrent download from url') + self.quit() + + def do_downloadfromurl(): + self.guiUtility.showLibrary() + self.frame.startDownload(os.path.join(BASE_DIR, "data", "Pioneer.One.S01E06.720p.x264-VODO.torrent"), self.getDestDir()) + self.Call(30, do_assert) + + self.startTest(do_downloadfromurl) + def test_downloadfromurl(self): infohash = binascii.unhexlify('24ad1d85206db5f85491a690e6723e27f4551e01') @@ -24,8 +41,7 @@ def do_assert(): def do_downloadfromurl(): self.guiUtility.showLibrary() - destdir = DefaultDownloadStartupConfig.getInstance().get_dest_dir() - self.frame.startDownloadFromUrl(r'http://www.clearbits.net/get/1678-zenith-part-1.torrent', destdir) + self.frame.startDownloadFromUrl(r'http://www.clearbits.net/get/1678-zenith-part-1.torrent', self.getDestDir()) self.Call(30, do_assert) self.startTest(do_downloadfromurl) @@ -42,8 +58,7 @@ def do_assert(): def do_downloadfrommagnet(): self.guiUtility.showLibrary() - destdir = DefaultDownloadStartupConfig.getInstance().get_dest_dir() - self.frame.startDownloadFromMagnet(r'magnet:?xt=urn:btih:5ac55cf1b935291f6fc92ad7afd34597498ff2f7&dn=Pioneer+One+S01E01+Xvid-VODO&title=', destdir) + self.frame.startDownloadFromMagnet(r'magnet:?xt=urn:btih:5ac55cf1b935291f6fc92ad7afd34597498ff2f7&dn=Pioneer+One+S01E01+Xvid-VODO&title=', self.getDestDir()) self.Call(120, do_assert) self.startTest(do_downloadfrommagnet) diff --git a/Tribler/Test/test_remote_search.py b/Tribler/Test/test_remote_search.py new file mode 100644 index 00000000000..6cb12fdeb0e --- /dev/null +++ b/Tribler/Test/test_remote_search.py @@ -0,0 +1,112 @@ +# see LICENSE.txt for license information + +import unittest +import sys + +from Tribler.Test.test_as_server import TestGuiAsServer +from Tribler.Main.globals import DefaultDownloadStartupConfig +from Tribler.Main.vwxGUI.list_item import ChannelListItem + +DEBUG = True +class TestRemoteQuery(TestGuiAsServer): + """ + Testing QUERY message of Social Network extension V1 + """ + def test_remotesearch(self): + def do_assert(): + self.screenshot('After doing mp3 search, got %d results' % self.frame.searchlist.GetNrResults()) + self.assert_(self.frame.searchlist.GetNrResults() > 0, 'no results') + self.assert_(self.guiUtility.torrentsearch_manager.gotRemoteHits, 'no remote results') + self.quit() + + def do_search(): + self.guiUtility.dosearch(u'mp3') + self.Call(10, do_assert) + + self.startTest(do_search) + + def test_ffsearch(self): + def do_assert(): + self.screenshot('After doing xxx search, got %d results' % self.frame.searchlist.GetNrResults()) + self.assert_(self.frame.searchlist.GetNrResults() == 0, 'got results') + self.quit() + + def do_search(): + self.guiUtility.toggleFamilyFilter(True) + self.guiUtility.dosearch(u'xxx') + self.Call(10, do_assert) + + self.startTest(do_search) + + def test_channelsearch(self): + def do_assert(): + self.assert_(self.guiUtility.guiPage == 'selectedchannel', 'no in selectedchannel page') + + self.screenshot('After doubleclicking first channel') + self.quit() + + def do_doubleclick(): + self.assert_(self.frame.searchlist.GetNrChannels() > 0, 'no channels matching vodo') + + self.screenshot('After doing vodo search, got %d results' % self.frame.searchlist.GetNrResults()) + items = self.frame.searchlist.GetItems() + for _, item in items.iteritems(): + if isinstance(item, ChannelListItem): + item.OnDClick() + break + else: + self.assert_(False, 'could not find ChannelListItem') + + self.Call(10, do_assert) + + def do_search(): + self.guiUtility.dosearch(u'vodo') + self.Call(10, do_doubleclick) + + self.startTest(do_search) + + def test_remotedownload(self): + def do_assert(): + self.screenshot('After doing vodo search + pioneer filter + selecting item + download') + self.quit() + + def do_download(): + self.screenshot('After doing vodo search + pioneer filter + selecting item') + + defaultDLConfig = DefaultDownloadStartupConfig.getInstance() + defaultDLConfig.set_show_saveas(False) + + self.frame.top_bg.OnDownload() + self.CallConditional(120, lambda: self.frame.librarylist.GetNrResults() > 0, do_assert, 'no download in librarylist') + + def do_select(): + self.assert_(self.frame.searchlist.GetNrResults() > 0, 'no hits matching vodo + pioneer') + self.screenshot('After doing vodo search + pioneer filter, got %d results' % self.frame.searchlist.GetNrResults()) + items = self.frame.searchlist.GetItems() + keys = items.keys() + + self.frame.searchlist.Select(keys[0]) + self.Call(5, do_download) + + def do_filter(): + self.assert_(self.frame.searchlist.GetNrResults() > 0, 'no hits matching vodo + pioneer') + self.screenshot('After doing vodo search, got %d results' % self.frame.searchlist.GetNrResults()) + self.frame.searchlist.GotFilter('pioneer') + + self.Call(5, do_select) + + def do_search(): + self.guiUtility.dosearch(u'vodo') + self.Call(10, do_filter) + + self.startTest(do_search) + + def startTest(self, callback): + def wait_for_search(): + print >> sys.stderr, "tgs: frame ready, staring to wait for search to be ready" + self.CallConditional(300, lambda : self.frame.SRstatusbar.GetConnections() > 0.5, callback) + + TestGuiAsServer.startTest(self, wait_for_search) + +if __name__ == "__main__": + unittest.main() diff --git a/Tribler/Test/test_sqlitecachedbhandler.py b/Tribler/Test/test_sqlitecachedbhandler.py index 3a15f2f5f03..06fd402eb65 100644 --- a/Tribler/Test/test_sqlitecachedbhandler.py +++ b/Tribler/Test/test_sqlitecachedbhandler.py @@ -30,7 +30,7 @@ class TestSqliteBasicDBHandler(unittest.TestCase): def setUp(self): - dbpath = init_bak_tribler_sdb('bak_new_tribler.sdb', overwrite = True) + dbpath = init_bak_tribler_sdb('bak_new_tribler.sdb', overwrite=True) self.sqlitedb = SQLiteCacheDB.getInstance() self.sqlitedb.initDB(dbpath, busytimeout=BUSYTIMEOUT) self.sqlitedb.waitForUpdateComplete() @@ -117,7 +117,7 @@ def test_getAll(self): class TestSqlitePeerDBHandler(unittest.TestCase): def setUp(self): - dbpath = init_bak_tribler_sdb('bak_new_tribler.sdb', overwrite = True) + dbpath = init_bak_tribler_sdb('bak_new_tribler.sdb', overwrite=True) db = SQLiteCacheDB.getInstance() db.initDB(dbpath) db.waitForUpdateComplete() @@ -370,7 +370,7 @@ def test_getPermIDByIP(self): class TestTorrentDBHandler(unittest.TestCase): def setUp(self): - dbpath = init_bak_tribler_sdb('bak_new_tribler.sdb', overwrite = True) + dbpath = init_bak_tribler_sdb('bak_new_tribler.sdb', overwrite=True) db = SQLiteCacheDB.getInstance() db.initDB(dbpath) db.waitForUpdateComplete() @@ -394,7 +394,7 @@ def test_hasTorrent(self): def test_count(self): num = self.tdb.getNumberTorrents() - assert num == 4483 + assert num == 4483, num def test_loadTorrents(self): torrent_size = self.tdb.getNumberTorrents() @@ -559,7 +559,7 @@ def test_freeSpace(self): class TestMyPreferenceDBHandler(unittest.TestCase): def setUp(self): - dbpath = init_bak_tribler_sdb('bak_new_tribler.sdb', overwrite = True) + dbpath = init_bak_tribler_sdb('bak_new_tribler.sdb', overwrite=True) db = SQLiteCacheDB.getInstance() db.initDB(dbpath) db.waitForUpdateComplete() diff --git a/Tribler/Test/test_torrentcollecting.py b/Tribler/Test/test_torrentcollecting.py deleted file mode 100644 index 5a43a0ee6d3..00000000000 --- a/Tribler/Test/test_torrentcollecting.py +++ /dev/null @@ -1,90 +0,0 @@ -import os -import unittest - -from Tribler.Core.CacheDB.sqlitecachedb import SQLiteCacheDB, str2bin, CURRENT_MAIN_DB_VERSION -from Tribler.Core.CacheDB.SqliteCacheDBHandler import MyPreferenceDBHandler -from bak_tribler_sdb import * - -CREATE_SQL_FILE = os.path.join('Tribler', "schema_sdb_v" + str(CURRENT_MAIN_DB_VERSION) + ".sql") -assert os.path.isfile(CREATE_SQL_FILE) - -def init(): - init_bak_tribler_sdb() - - -SQLiteCacheDB.DEBUG = False - -class TestTorrentCollecting(unittest.TestCase): - - def setUp(self): - self.db = SQLiteCacheDB.getInstance() - self.db.initDB(TRIBLER_DB_PATH_BACKUP) - - permid = {} - permid[3127] = 'MFIwEAYHKoZIzj0CAQYFK4EEABoDPgAEAcPezgQ13k1MSOaUrCPisWRhYuNT7Tm+q5rUgHFvAWd9b+BcSut6TCniEgHYHDnQ6TH/vxQBqtY8Loag' - permid[994] = 'MFIwEAYHKoZIzj0CAQYFK4EEABoDPgAEAJUNmwvDaigRaM4cj7cE2O7lessqnnFEQsan7df9AZS8xeNmVsP/XXVrEt4t7e2TNicYmjn34st/sx2P' - permid[19] = 'MFIwEAYHKoZIzj0CAQYFK4EEABoDPgAEAAJv2YLuIWa4QEdOEs4CPRxQZDwZphKd/xK/tgbcALG198nNdT10znJ2sZYl+OJIvj7YfYp75PrrnWNX' - permid[5] = 'MFIwEAYHKoZIzj0CAQYFK4EEABoDPgAEAAB0XbUrw5b8CrTrMZST1SPyrzjgSzIE6ynALtlZASGAb+figVXRRGpKW6MSal3KnEm1/q0P3JPWrhCE' - self.permid = permid - - db = MyPreferenceDBHandler.getInstance() - db.loadData() - - def tearDown(self): - self.db.close() - - @unittest.skip - def test_selecteTorrentToCollect(self): - db = PreferenceDBHandler.getInstance() - tc = SimpleTorrentCollecting(None, None) - truth = {3127:235, 994:20, 19:1, 5:0} - - for pid in truth: - pl = db.getPrefList(str2bin(self.permid[pid])) - assert len(pl) == truth[pid], [pid, len(pl)] - # test random selection - infohash = tc.selecteTorrentToCollect(pl, True) - if pid == 994 or pid == 3127: - assert len(infohash) == 20, infohash - else: - assert infohash is None, infohash - - # tc.updateAllCooccurrence() - for pid in truth: - pl = db.getPrefList(str2bin(self.permid[pid])) - assert len(pl) == truth[pid], [pid, len(pl)] - # test selecting most relevant torrent - infohash = tc.selecteTorrentToCollect(pl, False) - if pid == 994: - tid = tc.torrent_db.getTorrentID(infohash) - assert tid == 8979 - - permid = self.permid[pid] - infohash = tc.updatePreferences(permid, pl) - tid = tc.torrent_db.getTorrentID(infohash) - assert tid == 8979 - elif pid == 3127: - tid = tc.torrent_db.getTorrentID(infohash) - assert tid == 9170 - - permid = self.permid[pid] - infohash = tc.updatePreferences(permid, pl) - tid = tc.torrent_db.getTorrentID(infohash) - assert tid == 9170 - else: - assert infohash is None, infohash - - -def test_suite(): - suite = unittest.TestSuite() - suite.addTest(unittest.makeSuite(TestTorrentCollecting)) - - return suite - -def main(): - init() - unittest.main(defaultTest='test_suite') - - -if __name__ == '__main__': - main()