diff --git a/experiments/tribler/__init__.py b/experiments/tribler/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/experiments/tribler/channel_download.conf b/experiments/tribler/channel_download.conf
new file mode 100644
index 000000000..b347029db
--- /dev/null
+++ b/experiments/tribler/channel_download.conf
@@ -0,0 +1,33 @@
+
+experiment_name = "Channel Download"
+local_output_dir = "output"
+output_dir = "output"
+
+experiment_server_cmd = 'experiment_server.py'
+
+tracker_cmd = 'run_tracker.sh'
+
+experiment_time = 3600
+local_instance_cmd = "process_guard.py -c channel_download.py -c channel_download.py -c channel_download.py -c channel_download.py -c channel_download.py -t $EXPERIMENT_TIME -m $OUTPUT_DIR -o $OUTPUT_DIR "
+
+post_process_cmd = 'gumby/scripts/channel_download/channel_download.sh'
+
+sync_subscribers_amount = 5
+# Run python in optimized mode?
+#PYTHONOPTIMIZE = yup
+use_local_venv = FALSE
+
+# Delay between sending the experiment info and the start signal
+sync_experiment_start_delay = 1
+
+sync_port = __unique_port__
+sync_host = 127.0.0.1
+
+messages_to_plot = 'torrent'
+
+tracker_crypto = 'nocrypto'
+
+MANHOLE_ENABLE = TRUE
+MANHOLE_PORT = 0
+
+SCENARIO_FILE = channel_download_1h.scenario
diff --git a/experiments/tribler/channel_download.py b/experiments/tribler/channel_download.py
new file mode 100755
index 000000000..1b71f71b1
--- /dev/null
+++ b/experiments/tribler/channel_download.py
@@ -0,0 +1,401 @@
+#!/usr/bin/env python2
+import logging
+import os
+from binascii import hexlify, unhexlify
+from os import path as path
+from sys import path as pythonpath
+import time
+
+import posix
+
+from twisted.internet import reactor
+from twisted.internet.task import LoopingCall
+
+from gumby.experiments.TriblerDispersyClient import TriblerDispersyExperimentScriptClient, BASE_DIR
+from gumby.experiments.dispersyclient import main, DispersyExperimentScriptClient
+
+# TODO(emilon): Fix this crap
+pythonpath.append(path.abspath(path.join(path.dirname(__file__), '..', '..', '..', "./tribler")))
+pythonpath.append('.')
+
+from Tribler.Core.DownloadConfig import DefaultDownloadStartupConfig
+from Tribler.Core.TorrentDef import TorrentDef
+from Tribler.community.channel.community import ChannelCommunity
+from Tribler.community.channel.preview import PreviewChannelCommunity
+from Tribler.Policies.credit_mining_util import TorrentManagerCM
+from Tribler.Core.RemoteTorrentHandler import RemoteTorrentHandler
+
+
+class ChannelDownloadClient(TriblerDispersyExperimentScriptClient):
+
+    def __init__(self, *argv, **kwargs):
+        super(ChannelDownloadClient, self).__init__(*argv, **kwargs)
+        from Tribler.community.allchannel.community import AllChannelCommunity
+        self.community_class = AllChannelCommunity
+
+        self._logger.setLevel(logging.DEBUG)
+
+        self.my_channel = None
+        self.joined_community = None
+
+        self.join_lc = None
+        self.dl_lc = {}
+
+        self.upload_dir_path = None
+
+        self.torrent_mgr = None
+        self.downloaded_torrent = {}
+
+        self.num_peers = -1
+
+        self.id_experiment = os.environ['EXPERIMENT_NAME'].replace(" ", "")
+
+    def start_session(self):
+        super(ChannelDownloadClient, self).start_session()
+        self.session_deferred.addCallback(self.__config_dispersy)
+        self.session_deferred.addErrback(self._logger.error)
+
+        # Environment variables arrive as strings, so cast before comparing or doing arithmetic.
+        self.num_peers = int(os.environ.get('das4_instances_to_run', -1))
+        if self.num_peers == -1:
+            self._logger.error("Cannot get var:das4_instances_to_run")
+            self.num_peers = int(os.environ.get('sync_subscribers_amount', -1))
+            if self.num_peers == -1:
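+                # (note) Peer-count fallback chain: the das4_instances_to_run environment
+                # variable, then sync_subscribers_amount from the .conf file, then the live
+                # peer list reported by the sync server, and finally a hard-coded 200.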
self._logger.error("Cannot get var:sync_subscribers_amount") + self.num_peers = len(self.get_peers()) or 200 + + def __config_dispersy(self, session): + + if self._dispersy is None: + reactor.callLater(5.0, self.__config_dispersy, session) + return + + self.session.lm.dispersy = self._dispersy + # self.session.lm.init() + self.session.get_dispersy = True + + self._dispersy.define_auto_load(ChannelCommunity, self._my_member, (), {"tribler_session": self.session}) + self._dispersy.define_auto_load(PreviewChannelCommunity, self._my_member, (), {"tribler_session": self.session}) + + from Tribler.Core.TFTP.handler import TftpHandler + + self.session.lm.tftp_handler = TftpHandler(self.session, self.endpoint, + "fffffffd".decode('hex'), block_size=1024) + self.session.lm.tftp_handler.initialize() + + self.session.lm.rtorrent_handler = RemoteTorrentHandler(self.session) + self.session.get_dispersy_instance = lambda: self.session.lm.dispersy + self.session.lm.rtorrent_handler.initialize() + + self._logger.error("Dispersy configured") + + def start_dispersy(self): + DispersyExperimentScriptClient.start_dispersy(self) + + def setup_session_config(self): + config = super(ChannelDownloadClient, self).setup_session_config() + config.set_state_dir(os.path.abspath(os.path.join(posix.environ.get('OUTPUT_DIR', None) or BASE_DIR, "Tribler-%d") % os.getpid())) + config.set_megacache(True) + config.set_dht_torrent_collecting(True) + config.set_torrent_collecting(False) + config.set_torrent_store(True) + config.set_enable_multichain(False) + config.set_tunnel_community_enabled(False) + config.set_channel_community_enabled(False) + config.set_preview_channel_community_enabled(False) + + config.set_dispersy(False) + + self.upload_dir_path = path.join(config.get_state_dir(), "..", "upload_dir") + if not os.path.exists(self.upload_dir_path): + try: + os.makedirs(self.upload_dir_path) + except OSError: + # race condition of creating shared directory may happen + # this particular issue can be found in DAS environment which we can't control where to put a node + # usually, the publisher handles the directory creation, in DAS case, this is not entirely the case + pass + + logging.debug("Do session config locally") + return config + + def online(self, dont_empty=False): + self.set_community_kwarg("tribler_session", self.session) + self.set_community_kwarg("auto_join_channel", True) + + super(ChannelDownloadClient, self).online() + + settings = self.session.lm.ltmgr.get_session().get_settings() + settings['allow_multiple_connections_per_ip'] = True + settings['ignore_limits_on_local_network'] = False + self.session.lm.ltmgr.get_session().set_settings(settings) + self.set_speed(250000, 100000) + + self.torrent_mgr = TorrentManagerCM(self.session) + + def registerCallbacks(self): + super(ChannelDownloadClient, self).registerCallbacks() + + self.scenario_runner.register(self.create, 'create') + self.scenario_runner.register(self.join, 'join') + self.scenario_runner.register(self.publish, 'publish') + self.scenario_runner.register(self.start_download, 'start_download') + self.scenario_runner.register(self.stop_download, 'stop_download') + self.scenario_runner.register(self.setup_seeder, 'setup_seeder') + self.scenario_runner.register(self.set_speed, 'set_speed') + + def create(self): + self._logger.error("creating-community") + self.my_channel = ChannelCommunity.create_community(self._dispersy, self._my_member, tribler_session=self.session) + self.my_channel.set_channel_mode(ChannelCommunity.CHANNEL_OPEN) + 
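+        # (assumed semantics) CHANNEL_OPEN lets any member add torrents to the channel;
+        # _disp_create_channel below then announces the channel itself over Dispersy.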
self.my_channel._disp_create_channel(u'channel-name', u'channel-desc')
+
+        self._logger.error("Community %s (%s) created with member: %s",
+                           self.my_channel.get_channel_name(), self.my_channel.get_channel_id(), self.my_channel._master_member)
+
+    def set_speed(self, download, upload):
+        settings = self.session.lm.ltmgr.get_session().get_settings()
+        settings['download_rate_limit'] = int(download)
+        settings["upload_rate_limit"] = int(upload)
+        self.session.lm.ltmgr.get_session().set_settings(settings)
+
+    def join(self):
+        if not self.join_lc:
+            self.join_lc = lc = LoopingCall(self.join)
+            lc.start(1.0, now=False)
+
+        self._logger.error("trying-to-join-community on %s", self._community)
+
+        if self._community is None:
+            return
+
+        channels = self._community._channelcast_db.getAllChannels()
+
+        if channels:
+            cid = channels[0][1]
+            community = self._community._get_channel_community(cid)
+            if community._channel_id:
+
+                self._community.disp_create_votecast(community.cid, 2, int(time.time()))
+
+                self._logger.error("joining-community")
+                for c in self._dispersy.get_communities():
+                    if isinstance(c, ChannelCommunity):
+                        self.joined_community = c
+                if self.joined_community is None:
+                    self._logger.error("couldn't join community")
+                    return
+                self._logger.error("Joined community with member: %s", self.joined_community._master_member)
+                self.join_lc.stop()
+                return
+
+    def __ds_active_callback(self, ds):
+        from Tribler.Core.simpledefs import dlstatus_strings
+
+        thandle = ds.get_download().handle
+        availability = 0.0
+        if thandle:
+            num_peers = thandle.status().num_peers
+            num_pieces, _ = ds.get_pieces_total_complete()
+
+            if num_peers * num_pieces:
+                for p_num in thandle.piece_availability():
+                    tmp = float(p_num) / float(num_peers * num_pieces)
+                    availability += tmp
+
+        peers = [x for x in ds.get_peerlist() if any(x['have']) and not
+                 x['ip'].startswith("127.0.0")]
+
+        ds.get_peerlist = lambda: peers
+
+        setting = self.session.lm.ltmgr.get_session().get_settings()
+        dlmax = setting['download_rate_limit']
+        ulmax = setting['upload_rate_limit']
+
+        # Note: channel_dl_parse.py expects exactly this 12-token message layout.
+        self._logger.error('%s:%s infohash=%s, downsp=%d, upsp=%d, progress=%s, status=%s, peers=%s rat=%s dl=%s up=%s avail=%.8f dsavail=%.8f' %
+                           (self._dispersy.lan_address[0], self._dispersy.lan_address[1],
+                            ds.get_download().tdef.get_infohash().encode('hex')[:5],
+                            min(ds.get_current_speed('down'), dlmax + 100000)/1000,
+                            min(ds.get_current_speed('up'), ulmax + 100000)/1000,
+                            ds.get_progress(),
+                            dlstatus_strings[ds.get_status()],
+                            len(peers),
+                            ds.seeding_ratio,
+                            ds.get_total_transferred('down')/1000,
+                            ds.get_total_transferred('up')/1000,
+                            availability,
+                            ds.get_availability()))
+
+        # Status 3 is DLSTATUS_DOWNLOADING; if there is no progress yet, nudge known peers directly.
+        if ds.get_progress() == 0.0 and ds.get_status() == 3:
+            self._connect_peer(ds.get_download().handle)
+
+        return 1.0, True
+
+    def setup_seeder(self, filename, size):
+        exp_filename = self.id_experiment + "_" + filename
+        tpath = path.join(self.upload_dir_path, "%s.data" % exp_filename)
+        tdef = None
+        if path.isfile(tpath):
+            tpath = path.join(self.upload_dir_path, "%s.torrent" % exp_filename)
+
+            if path.isfile(tpath):
+                tdef = TorrentDef.load(tpath)
+            else:
+                # writing file has not finished yet
+                reactor.callLater(10.0, self.setup_seeder, filename, size)
+        else:
+            # file not found.
In DAS case, this is because the file is in another node + tdef = self._create_test_torrent(exp_filename, size) + + if tdef: + dscfg = DefaultDownloadStartupConfig.getInstance().copy() + dscfg.set_dest_dir(self.upload_dir_path) + dscfg.set_hops(0) + dscfg.set_safe_seeding(False) + dscfg.dlconfig.set('downloadconfig', 'seeding_mode', 'forever') + + self._logger.error("Setup seeder for %s", hexlify(tdef.get_infohash())) + + download = self.session.start_download_from_tdef(tdef, dscfg) + download.set_state_callback(self.__ds_active_callback, True) + + def publish(self, filename, size): + if self.my_channel or self.joined_community: + tdef = self._create_test_torrent(self.id_experiment + "_" + filename, size) + if self.my_channel: + self.my_channel._disp_create_torrent_from_torrentdef(tdef, int(time.time())) + elif self.joined_community: + self.joined_community._disp_create_torrent_from_torrentdef(tdef, int(time.time())) + + self.setup_seeder(filename, size) + else: + self._logger.debug("Can't publish yet, no channel or community joined") + reactor.callLater(10.0, self.publish, filename, size) + + def _create_test_torrent(self, filename='', size=0): + filepath = path.join(self.upload_dir_path, "%s.data" % filename) + + tdef = TorrentDef() + with open(filepath, 'wb') as fp: + fp.write("0" * int(size)) + + tdef.add_content(filepath) + + # doesn't matter for now + tdef.set_tracker("http://127.0.0.1:9197/announce") + tdef.finalize() + + tdef_file = path.join(self.upload_dir_path, "%s.torrent" % filename) + tdef.save(tdef_file) + + self._logger.error("Created %s torrent (%s) with size %s", filename, hexlify(tdef.get_infohash()), size) + + return tdef + + def _connect_peer(self, thandle): + for cd in self.joined_community.dispersy_yield_verified_candidates(): + ip = cd.lan_address[0] + for port in xrange(20000, 20000 + self.num_peers + 10): + if thandle: + thandle.connect_peer((ip, port), 0) + + def start_download(self, name): + name = name if name.startswith(self.id_experiment) else self.id_experiment + "_" + name + if name not in self.dl_lc.keys(): + self.dl_lc[name] = LoopingCall(self.start_download, name) + self.dl_lc[name].start(1.0, now=False) + + self.downloaded_torrent[name] = False + elif self.downloaded_torrent[name]: + self.dl_lc[name].stop() + self.dl_lc[name] = None + + tdef = TorrentDef.load_from_memory(self.session.get_collected_torrent( + unhexlify(self.downloaded_torrent[name]))) + + self._logger.error("%s.torrent %s (%s) found, prepare to download..", + name, self.downloaded_torrent[name], tdef) + + dscfg = DefaultDownloadStartupConfig.getInstance().copy() + dscfg.set_dest_dir(path.join(self.session.get_state_dir(), "download")) + dscfg.set_hops(0) + dscfg.set_safe_seeding(False) + dscfg.dlconfig.set('downloadconfig', 'seeding_mode', 'forever') + + self._logger.error("Start downloading for %s", hexlify(tdef.get_infohash())) + + download_impl = self.session.start_download_from_tdef(tdef, dscfg) + download_impl.set_state_callback(self.__ds_active_callback, True) + + self._connect_peer(download_impl.handle) + + if not self.joined_community: + self._logger.error("Pending download") + return + + #shameless copy from boostingsource + CHANTOR_DB = ['ChannelTorrents.channel_id', 'Torrent.torrent_id', 'infohash', 'Torrent.name', 'length', + 'category', 'status', 'num_seeders', 'num_leechers', 'ChannelTorrents.id', + 'ChannelTorrents.dispersy_id', 'ChannelTorrents.name', 'Torrent.name', + 'ChannelTorrents.description', 'ChannelTorrents.time_stamp', 'ChannelTorrents.inserted'] + infohash_bin 
= None
+        torrent_values = self.joined_community._channelcast_db.getTorrentsFromChannelId(self.joined_community.get_channel_id(), True, CHANTOR_DB)
+        if torrent_values:
+            log = "Channel id %s : " % self.joined_community.get_channel_id()
+            for t in torrent_values:
+                torrent_name = t[3]
+                log += "%s(%s) " % (t[3], hexlify(t[2]))
+
+                if torrent_name[:-5] == name:
+                    infohash_bin = t[2]
+
+            self._logger.error(log)
+
+        if infohash_bin:
+
+            self._logger.error("Find %s with ihash %s", name, hexlify(infohash_bin))
+
+            # defined once, before the loop, so the fallback request below can reuse it
+            def _success_download(ihash_str):
+                self.downloaded_torrent[name] = ihash_str
+
+            for candidate in list(self.joined_community.dispersy_yield_candidates()):
+                self.session.lm.rtorrent_handler.download_torrent(
+                    candidate, infohash_bin, user_callback=_success_download, priority=1)
+
+            self.session.lm.rtorrent_handler.download_torrent(
+                None, infohash_bin, user_callback=_success_download, priority=1)
+
+        # logged every LoopingCall tick until the torrent file arrives
+        self._logger.error("Pending download")
+
+    def stop_download(self, dname):
+        dname = self.id_experiment + "_" + dname
+        for name in self.dl_lc.keys():
+            if name == dname:
+                lc = self.dl_lc.pop(name)
+                if not self.downloaded_torrent[name]:
+                    self._logger.error("Could not start download for %s", name)
+                    lc.stop()
+                else:
+                    ihash = unhexlify(self.downloaded_torrent[dname])
+                    d_impl = self.session.get_download(ihash)
+                    self._logger.error("Stopping Download %s", self.downloaded_torrent[dname])
+                    self.session.remove_download_by_id(ihash, True, True)
+
+    def stop(self, retry=3):
+
+        # stop stalled downloads
+        for name in self.dl_lc.keys():
+            if not self.downloaded_torrent[name]:
+                self.dl_lc.pop(name).stop()
+                self._logger.error("Could not start download for %s", name)
+
+        downloads_impl = self.session.get_downloads()
+        if downloads_impl:
+            for d in downloads_impl:
+                self._logger.error("Clean download %s", hexlify(d.tdef.get_infohash()))
+                self.session.remove_download(d, True, True)
+
+            reactor.callLater(10.0, self.stop, retry)
+        else:
+            super(ChannelDownloadClient, self).stop()
+
+
+if __name__ == '__main__':
+    ChannelDownloadClient.scenario_file = os.environ.get('SCENARIO_FILE', 'channel_download.scenario')
+    main(ChannelDownloadClient)
diff --git a/experiments/tribler/channel_download.scenario b/experiments/tribler/channel_download.scenario
new file mode 100644
index 000000000..5a469c542
--- /dev/null
+++ b/experiments/tribler/channel_download.scenario
@@ -0,0 +1,70 @@
+@0:0 set_master_member 3081a7301006072a8648ce3d020106052b81040027038192000403cbbfd2dfb67a7db66c88988df56f93fa6e7f982f9a6a0fa8898492c8b8cae23e10b159ace60b7047012082a5aa4c6e221d7e58107bb550436d57e046c11ab4f51f0ab18fa8f58d0346cc12d1cc2b61fc86fe5ed192309152e11e3f02489e30c7c971dd989e1ce5030ea0fb77d5220a92cceb567cbc94bc39ba246a42e215b55e9315b543ddeff0209e916f77c0d747
+@0:2 start_dispersy {1-200}
+@0:40 start_session
+@0:120 online
+@0:120 reset_dispersy_statistics
+@0:120 annotate start-experiment
+# 1 : creator
+# 2-51 : dedicated seeder
+# 52-170 : peers
+# 171 - 200 : flashcrowd all scenario
+@0:121 join {2-50}
+@0:125 create {1}
+@0:127 publish file300mb_1 324288000 {1}
+@0:147 publish file1gb_1 1524288000 {1}
+@0:167 publish file5gb_1 5524288000 {1}
+@0:167 publish file300mb_2 324288111 {1}
+@0:187 publish file1gb_2 1524288111 {1}
+@0:200 join {51-100}
+@0:207 publish file5gb_2 5524288111 {1}
+@0:300 join {101-150}
+@0:400 join {151-200}
+@0:600 setup_seeder file300mb_1 324288000 {2-4}
+@0:600 setup_seeder file300mb_2 324288111 {5-12}
+@0:800 setup_seeder file1gb_1 1524288000 {13-26}
+@0:800 setup_seeder file1gb_2 1524288111 {27-34}
+@15:0 setup_seeder file5gb_1 5524288000 {35-48}
+@15:0 setup_seeder file5gb_2 5524288111 {49-51}
+# ready on minute 16'
+# wave 1
+@0:16:0 start_download file300mb_1 {52-56}
+@0:16:10 start_download file300mb_2 {56-62}
+@0:16:20 start_download file1gb_1 {63-74}
+@0:16:30 start_download file1gb_2 {75-76}
+@0:16:40 start_download file5gb_1 {77-81}
+@0:16:50 start_download file5gb_2 {82-88}
+# wave 2
+@2:16:0 start_download file300mb_1 {89-93}
+@2:16:10 start_download file300mb_2 {94}
+@2:16:20 start_download file1gb_1 {95-96}
+@2:16:30 start_download file1gb_2 {97-98}
+@2:16:40 start_download file5gb_1 {99}
+@3:16:50 start_download file5gb_2 {100-109}
+# wave 3
+@4:16:0 start_download file300mb_1 {110-114}
+@4:16:10 start_download file300mb_2 {115-116}
+@4:16:20 start_download file1gb_1 {117-118}
+@4:16:30 start_download file1gb_2 {119-122}
+@4:16:40 start_download file5gb_1 {123}
+@4:16:50 start_download file5gb_2 {124}
+# wave 4
+@6:16:0 start_download file300mb_1 {125-129}
+@6:16:10 start_download file300mb_2 {130-139}
+@6:16:20 start_download file1gb_1 {140-142}
+# (file1gb_2 omitted in wave 4)
+@6:16:40 start_download file5gb_1 {143-147}
+@6:16:50 start_download file5gb_2 {148-149}
+# wave 5
+@8:16:20 start_download file1gb_1 {150}
+@8:16:30 start_download file1gb_2 {151-162}
+@8:16:40 start_download file5gb_1 {163-170}
+#flashcrowd
+@10:16:0 start_download file300mb_1 {171-200}
+@10:16:1 start_download file300mb_2 {171-200}
+@10:16:2 start_download file1gb_1 {171-200}
+@10:16:3 start_download file1gb_2 {171-200}
+@10:16:4 start_download file5gb_1 {171-200}
+@10:16:5 start_download file5gb_2 {171-200}
+@11:58:0 reset_dispersy_statistics
+@11:59:0 stop
\ No newline at end of file
diff --git a/experiments/tribler/channel_download_10s.scenario b/experiments/tribler/channel_download_10s.scenario
new file mode 100644
index 000000000..b12f47a25
--- /dev/null
+++ b/experiments/tribler/channel_download_10s.scenario
@@ -0,0 +1,85 @@
+@0:0 set_master_member 3081a7301006072a8648ce3d020106052b81040027038192000403cbbfd2dfb67a7db66c88988df56f93fa6e7f982f9a6a0fa8898492c8b8cae23e10b159ace60b7047012082a5aa4c6e221d7e58107bb550436d57e046c11ab4f51f0ab18fa8f58d0346cc12d1cc2b61fc86fe5ed192309152e11e3f02489e30c7c971dd989e1ce5030ea0fb77d5220a92cceb567cbc94bc39ba246a42e215b55e9315b543ddeff0209e916f77c0d747
+@0:2 start_dispersy {1-84}
+@0:20 start_session
+@0:60 online
+@0:60 reset_dispersy_statistics
+@0:60 annotate start-experiment
+# 1 : creator (seeder)
+# 2-12 : dedicated seeder
+# 13-81 : peers
+# 81 - 84 : flashcrowd all scenario
+@0:60 join {2-20}
+@0:60 create {1}
+@0:62 publish file1gb_1 1524288001 {1}
+@0:64 publish file1gb_2 1524288002 {1}
+@0:66 publish file1gb_3 1524288003 {1}
+@0:70 publish file5gb_1 5524288007 {1}
+@0:73 publish file5gb_2 5524288006 {1}
+@0:76 publish file5gb_3 5524288005 {1}
+@0:79 publish file5gb_4 5524288004 {1}
+@0:81 publish file5gb_5 5524288003 {1}
+@0:84 publish file5gb_6 5524288002 {1}
+@0:87 publish file5gb_7 5524288001 {1}
+@0:88 join {21-40}
+@0:92 join {41-60}
+@0:105 setup_seeder file1gb_2 1524288002 {2}
+@0:106 setup_seeder file1gb_3 1524288003 {3,4}
+@0:105 setup_seeder file5gb_3 5524288005 {5}
+@0:105 setup_seeder file5gb_4 5524288004 {6}
+@0:105 setup_seeder file5gb_5 5524288003 {7,8}
+@0:105 setup_seeder file5gb_6 5524288002 {9,10}
+@0:105 setup_seeder file5gb_7 5524288001 {11,12}
+@0:110 join {61-80}
+@0:115 join {81-84}
+# wave 1
+@2:6 start_download file1gb_1 {13-15}
+@2:12 start_download file1gb_2 {16-18}
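+# (note) scenario syntax: @[hours:]minutes:seconds command [args] {instance ids};
+# timestamps are offsets from experiment start, so wave 1 above begins 2:06 in.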
+@2:18 start_download file1gb_3 {19-21} +@2:24 start_download file5gb_1 {22} +@2:30 start_download file5gb_2 {23-25} +@2:36 start_download file5gb_3 {26} +@2:42 start_download file5gb_4 {27-29} +@2:48 start_download file5gb_5 {30} +@2:54 start_download file5gb_6 {33-38} +@3:1 start_download file5gb_7 {39,40} +# wave 2 +@1:2:6 start_download file1gb_1 {41} +@1:2:12 start_download file1gb_2 {42} +@1:2:18 start_download file1gb_3 {42} +@1:2:24 start_download file5gb_1 {44} +@1:2:30 start_download file5gb_2 {45} +@1:2:36 start_download file5gb_3 {46} +@1:2:42 start_download file5gb_4 {47} +@1:2:48 start_download file5gb_5 {48} +@1:2:54 start_download file5gb_6 {49} +@1:3:1 start_download file5gb_7 {50} +# wave 3 +@2:2:6 start_download file1gb_1 {51,52} +@2:2:12 start_download file1gb_2 {53,54} +@2:2:18 start_download file1gb_3 {55,56} +@2:2:24 start_download file5gb_1 {57} +@2:2:30 start_download file5gb_2 {58-60} +@2:2:36 start_download file5gb_3 {61} +@2:2:42 start_download file5gb_4 {62,63} +@2:3:1 start_download file5gb_7 {64} +#wave 4 +@3:2:6 start_download file1gb_1 {65,66} +@3:2:12 start_download file1gb_2 {67,68} +@3:2:18 start_download file1gb_3 {69,70} +@3:2:24 start_download file5gb_1 {71-75} +@3:2:30 start_download file5gb_2 {76} +@3:2:36 start_download file5gb_3 {77,78} +@3:2:42 start_download file5gb_4 {79-81} +#flashcrowd +@4:2:6 start_download file1gb_1 {81-84} +@4:2:12 start_download file1gb_2 {81-84} +@4:2:18 start_download file1gb_3 {81-84} +@4:2:24 start_download file5gb_1 {81-84} +@4:2:30 start_download file5gb_2 {81-84} +@4:2:36 start_download file5gb_3 {81-84} +@4:2:42 start_download file5gb_4 {81-84} +@4:2:48 start_download file5gb_5 {81-84} +@4:2:54 start_download file5gb_6 {81-84} +@4:3:1 start_download file5gb_7 {81-84} +@7:58:0 reset_dispersy_statistics +@7:59:0 stop \ No newline at end of file diff --git a/experiments/tribler/channel_download_1h.scenario b/experiments/tribler/channel_download_1h.scenario new file mode 100644 index 000000000..e4e5c5915 --- /dev/null +++ b/experiments/tribler/channel_download_1h.scenario @@ -0,0 +1,21 @@ +@0:0 set_master_member 3081a7301006072a8648ce3d020106052b81040027038192000403cbbfd2dfb67a7db66c88988df56f93fa6e7f982f9a6a0fa8898492c8b8cae23e10b159ace60b7047012082a5aa4c6e221d7e58107bb550436d57e046c11ab4f51f0ab18fa8f58d0346cc12d1cc2b61fc86fe5ed192309152e11e3f02489e30c7c971dd989e1ce5030ea0fb77d5220a92cceb567cbc94bc39ba246a42e215b55e9315b543ddeff0209e916f77c0d747 +@0:2 start_dispersy {1-5} +@0:10 start_session +@0:12 online +@0:12 reset_dispersy_statistics +@0:12 annotate start-experiment +# 1 : creator +# 2-5 : peers +@0:20 create {1} +@0:25 publish file300mb_1 324288077 {1} +@0:30 publish file1gb_1 1524288077 {1} +@0:60 join {2-5} +@0:60 setup_seeder file1gb_1 1524288077 {2} +# ready on minute 1' +@1:1 start_download file300mb_1 {3} +@5:1 start_download file300mb_1 {4} +@20:1 start_download file1gb_1 {5} +@35:1 start_download file1gb_1 {4} +@40:1 start_download file300mb_1 {5} +@58:0 reset_dispersy_statistics +@59:0 stop \ No newline at end of file diff --git a/experiments/tribler/channel_download_6h.scenario b/experiments/tribler/channel_download_6h.scenario new file mode 100644 index 000000000..15e3b2423 --- /dev/null +++ b/experiments/tribler/channel_download_6h.scenario @@ -0,0 +1,43 @@ +@0:0 set_master_member 
3081a7301006072a8648ce3d020106052b81040027038192000403cbbfd2dfb67a7db66c88988df56f93fa6e7f982f9a6a0fa8898492c8b8cae23e10b159ace60b7047012082a5aa4c6e221d7e58107bb550436d57e046c11ab4f51f0ab18fa8f58d0346cc12d1cc2b61fc86fe5ed192309152e11e3f02489e30c7c971dd989e1ce5030ea0fb77d5220a92cceb567cbc94bc39ba246a42e215b55e9315b543ddeff0209e916f77c0d747 +@0:2 start_dispersy {1-100} +@0:20 start_session +@0:60 online +@0:60 reset_dispersy_statistics +@0:60 annotate start-experiment +# 1 : creator (seeder) +# 2-20 : dedicated seeder +# 21-70 : peers +# 71 - 100 : flashcrowd all scenario +@0:60 join {2-20} +@0:60 create {1} +@0:62 publish file300mb_1 324288002 {1} +@0:67 publish file5gb_1 5524288002 {1} +@0:72 publish file1gb_1 1524288002 {1} +@0:77 publish file1gb_2 1524288222 {1} +@0:80 join {21-40} +@0:99 join {41-60} +@0:105 setup_seeder file300mb_1 324288002 {2-5} +@0:110 setup_seeder file1gb_1 1524288002 {6-10} +@0:115 setup_seeder file1gb_2 1524288222 {11-12} +@0:120 setup_seeder file5gb_1 5524288002 {13-20} +@0:120 join {61-80} +@0:125 join {81-100} +# wave 1 +@2:10 start_download file300mb_1 {21-30} +@2:20 start_download file1gb_1 {31-35} +@2:30 start_download file1gb_2 {36-45} +@2:40 start_download file5gb_1 {46-48} +# wave 2 +@3:2:0 start_download file300mb_1 {49-52} +@3:2:20 start_download file1gb_1 {53} +@3:2:30 start_download file1gb_2 {54-58} +@3:2:40 start_download file5gb_1 {59-66} +# wave 3 +@4:0:0 start_download file1gb_1 {67-70} +#flashcrowd +@4:2:10 start_download file300mb_1 {71-100} +@4:2:20 start_download file1gb_1 {71-100} +@4:2:30 start_download file1gb_2 {71-100} +@4:2:40 start_download file5gb_1 {71-100} +@5:58:0 reset_dispersy_statistics +@5:59:0 stop \ No newline at end of file diff --git a/experiments/tribler/channel_download_das.conf b/experiments/tribler/channel_download_das.conf new file mode 100644 index 000000000..8098810ba --- /dev/null +++ b/experiments/tribler/channel_download_das.conf @@ -0,0 +1,30 @@ + +experiment_name = "Channel_Download_DAS" +experiment_server_cmd = 'experiment_server.py' + +#remote_workspace_dir = '/var/scratch/aputra/chn_dl' +output_dir = '/var/scratch/pouwelse/chn_dl' + +local_setup_cmd = 'das4_setup.sh' +local_instance_cmd = 'das4_reserve_and_run.sh' + +# The following options are used by das4_reserve_and_run.sh and das4_node_run_job.sh: +das4_node_amount = 10 +das4_node_timeout = 43200 +das4_instances_to_run = 201 +das4_node_command = "channel_download.py" + +sync_port = __unique_port__ +sync_experiment_start_delay = 1 + +tracker_cmd = 'run_tracker.sh' +tracker_port = __unique_port__ + +use_local_venv = True +with_systemtap = false + +messages_to_plot = 'torrent' +PYTHONOPTIMIZE = True + +SCENARIO_FILE = channel_download.scenario +post_process_cmd = 'gumby/scripts/channel_download/channel_download.sh' diff --git a/experiments/tribler/logger.conf b/experiments/tribler/logger.conf new file mode 100644 index 000000000..d45ea9f61 --- /dev/null +++ b/experiments/tribler/logger.conf @@ -0,0 +1,46 @@ +[loggers] +keys=root,TriblerDispersyExperimentScriptClient,ChannelDownloadClient + +[handlers] +keys=debugging,default + +[formatters] +keys=debugging,default + +[logger_root] +level=WARNING +handlers=default + +[logger_TriblerDispersyExperimentScriptClient] +level=INFO +handlers=default +qualname=TriblerDispersyExperimentScriptClient +propagate=0 + +[logger_ChannelDownloadClient] +level=DEBUG +handlers=default +qualname=ChannelDownloadClient +propagate=0 + +[handler_default] +class=StreamHandler +level=NOTSET +formatter=debugging +args=(sys.stderr,) + 
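+# (note) the '-'-separated debugging format below (timestampZ-LEVEL-message) is what
+# scripts/channel_download/channel_dl_parse.py splits on; keep the two in sync.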
+[formatter_debugging] +format=%(asctime)s.%(msecs).03dZ-%(levelname)s-%(message)s +datefmt=%Y%m%dT%H%M%S +class=logging.Formatter + +[handler_debugging] +class=StreamHandler +level=NOTSET +formatter=debugging +args=(sys.stderr,) + +[formatter_default] +format=%(asctime)s %(levelname)s %(message)s +class=logging.Formatter +level=ERROR \ No newline at end of file diff --git a/experiments/tribler_idle_run/1hour.conf b/experiments/tribler_idle_run/1hour.conf deleted file mode 100644 index 29fef2506..000000000 --- a/experiments/tribler_idle_run/1hour.conf +++ /dev/null @@ -1,22 +0,0 @@ -experiment_name = "TriblerIdleRun1hr" -local_setup_cmd = "tribler_experiment_setup.sh" - -PYTHONOPTIMIZE="yup" -process_guard_timeout = 3900 - -local_instance_cmd = 'process_guard.py -t $PROCESS_GUARD_TIMEOUT -i 5 -c "wrap_in_vnc.sh gumby/experiments/tribler_idle_run/tribler_idle_run.py" -m $OUTPUT_DIR -o $OUTPUT_DIR' - -post_process_cmd = graph_process_guard_data.sh - -tracker_cmd = "" -# Enable use of virtualenv when running on the local machine (will be always used on the DAS4) -use_local_venv = False - -#Custom variable to control the experiment execution time in seconds -TRIBLER_EXECUTION_TIME = 3600 - -# memory profiler stuff -PROFILE_MEMORY = True -PROFILE_MEMORY_INTERVAL = 300 -MANHOLE_ENABLE = True -MANHOLE_PORT = 2323 diff --git a/experiments/tribler_idle_run/1hour_stap.conf b/experiments/tribler_idle_run/1hour_stap.conf deleted file mode 100644 index 6ac6701bb..000000000 --- a/experiments/tribler_idle_run/1hour_stap.conf +++ /dev/null @@ -1,24 +0,0 @@ -experiment_name = "TriblerIdleRun1HrStap" -local_setup_cmd = "tribler_experiment_setup.sh" - -PYTHONOPTIMIZE="yup" -process_guard_timeout = 3900 - -local_instance_cmd = 'process_guard.py -t $PROCESS_GUARD_TIMEOUT -i 5 -c "wrap_in_vnc.sh run_stap_probe.sh gumby/experiments/tribler_idle_run/tribler_idle_run.py $OUTPUT_DIR/stap.csv" -m $OUTPUT_DIR -o $OUTPUT_DIR' - -post_process_cmd = 'generate_stap_process_guard_graphs.sh' - -tracker_cmd = "" -# Enable use of virtualenv when running on the local machine (will be always used on the DAS4) -use_local_venv = True -# Enable use of systemtap (Will be never used on DAS4) -use_local_systemtap = True - -#Custom variable to control the experiment execution time in seconds -TRIBLER_EXECUTION_TIME = 3600 # run for 60 minutes - -# memory profiler stuff -PROFILE_MEMORY = True -PROFILE_MEMORY_INTERVAL = 300 -MANHOLE_ENABLE = True -MANHOLE_PORT = 2323 diff --git a/experiments/tribler_idle_run/1week.conf b/experiments/tribler_idle_run/1week.conf deleted file mode 100644 index 2ef26a73e..000000000 --- a/experiments/tribler_idle_run/1week.conf +++ /dev/null @@ -1,22 +0,0 @@ -experiment_name = "TriblerIdleRun1wk" -local_setup_cmd = "tribler_experiment_setup.sh" - -PYTHONOPTIMIZE="yup" -process_guard_timeout = 605100 - -local_instance_cmd = 'process_guard.py -t $PROCESS_GUARD_TIMEOUT -i 5 -c "wrap_in_vnc.sh gumby/experiments/tribler_idle_run/tribler_idle_run.py" -m $OUTPUT_DIR -o $OUTPUT_DIR' - -post_process_cmd = graph_process_guard_data.sh - -tracker_cmd = "" -# Enable use of virtualenv when running on the local machine (will be always used on the DAS4) -use_local_venv = False - -#Custom variable to control the experiment execution time in seconds -TRIBLER_EXECUTION_TIME = 604800 # run for 1 week - -# memory profiler stuff -PROFILE_MEMORY = True -PROFILE_MEMORY_INTERVAL = 86400 -MANHOLE_ENABLE = True -MANHOLE_PORT = 2325 diff --git a/experiments/tribler_idle_run/1day.conf b/experiments/tribler_idle_run/idle_run.conf 
similarity index 57%
rename from experiments/tribler_idle_run/1day.conf
rename to experiments/tribler_idle_run/idle_run.conf
index 278d71bf8..dc74272aa 100644
--- a/experiments/tribler_idle_run/1day.conf
+++ b/experiments/tribler_idle_run/idle_run.conf
@@ -1,20 +1,19 @@
-experiment_name = "TriblerIdleRun24hr"
-local_setup_cmd = "tribler_experiment_setup.sh"
+experiment_name = "tribler_idle_run"
 
-PYTHONOPTIMIZE="yup"
+process_guard_timeout = 100000
-process_guard_timeout = 86700
-
-local_instance_cmd = 'process_guard.py -t $PROCESS_GUARD_TIMEOUT -i 5 -c "wrap_in_vnc.sh gumby/experiments/tribler_idle_run/tribler_idle_run.py" -m $OUTPUT_DIR -o $OUTPUT_DIR'
+local_instance_cmd = 'process_guard.py -t $PROCESS_GUARD_TIMEOUT -i 5 -c "gumby/experiments/tribler_idle_run/tribler_idle_run.py" -m $OUTPUT_DIR -o $OUTPUT_DIR --network'
 
 post_process_cmd = graph_process_guard_data.sh
 
 tracker_cmd = ""
 # Enable use of virtualenv when running on the local machine (will be always used on the DAS4)
-use_local_venv = False
+use_local_venv = FALSE
 
 #Custom variable to control the experiment execution time in seconds
-TRIBLER_EXECUTION_TIME = 86400
+TRIBLER_EXECUTION_TIME = 120
+
+AUTO_JOIN_CHANNELS = FALSE
 
 # memory profiler stuff
 PROFILE_MEMORY = True
diff --git a/experiments/tribler_idle_run/tribler_idle_run.py b/experiments/tribler_idle_run/tribler_idle_run.py
index 9d5720320..a5dadc0c6 100755
--- a/experiments/tribler_idle_run/tribler_idle_run.py
+++ b/experiments/tribler_idle_run/tribler_idle_run.py
@@ -44,31 +44,39 @@
 from gumby.instrumentation import init_instrumentation
 
+BASE_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', '..'))
+
 sys.path.append(os.path.abspath('./tribler'))
-sys.path.append(os.path.abspath('./tribler/twisted/twisted/plugins'))
+sys.path.append(os.path.abspath('./tribler/twisted/plugins'))
 
 from tribler_plugin import TriblerServiceMaker
 
+
 class IdleTribleRunner():
+
     def __init__(self):
         init_instrumentation()
         self.service = None
 
-
     def start(self):
         self.service = TriblerServiceMaker()
 
         if "TRIBLER_EXECUTION_TIME" in os.environ:
             run_time = int(os.environ["TRIBLER_EXECUTION_TIME"])
         else:
-            run_time = 60*10 # Run for 10 minutes by default
+            run_time = 60*10  # Run for 10 minutes by default
+
+        auto_join_channels = os.environ.get("AUTO_JOIN_CHANNELS", "FALSE").upper() == "TRUE"
 
         reactor.callLater(run_time, self.stop)
+        self.service.start_tribler({'restapi': 0, 'dispersy': 21000,
+                                    'statedir': os.path.abspath(os.path.join(BASE_DIR, "output", "tribler-state")),
+                                    'libtorrent': 21005, 'auto-join-channel': True if auto_join_channels else None})
 
     def stop(self):
         # TODO(Laurens): Currently the plugin does not offer a function to shut it down nicely,
         # so once this is added, make sure it is not violently killed.
- self.service.shutdown_process() + self.service.shutdown_process("Stopping Tribler idle run", code=0) if __name__ == "__main__": runner = IdleTribleRunner() diff --git a/gumby/experiments/TriblerDispersyClient.py b/gumby/experiments/TriblerDispersyClient.py index 5fad4e43d..356c4320c 100644 --- a/gumby/experiments/TriblerDispersyClient.py +++ b/gumby/experiments/TriblerDispersyClient.py @@ -40,21 +40,27 @@ def stop_dispersy(self): raise NotImplementedError("Dispersy is stopped using the tribler session in stop") def start_session(self): + from twisted.internet import threads + + def _do_ready(_): + self.annotate("Tribler Session started") + + if self.session.get_dispersy(): + self._dispersy = self.session.lm.dispersy + + return self.session + logging.error("Starting Tribler Session") self.session_config = self.setup_session_config() self.session = Session(scfg=self.session_config) - def on_tribler_started(_): - logging.error("Tribler Session started") - self.annotate("Tribler Session started") - self._dispersy = self.session.lm.dispersy - - def _do_start(): - return self.session.start().addCallback(on_tribler_started) + self.session_deferred = self.session.start() + self.session_deferred.addCallback(_do_ready) - deferToThread(_do_start).addCallback(self.__setup_dispersy_member) + if self.session_config.get_dispersy(): + self.session_deferred.addCallback(self.__start_dispersy) - def __setup_dispersy_member(self, _): + def __start_dispersy(self, session): self.original_on_incoming_packets = self._dispersy.on_incoming_packets if self.master_private_key: self._master_member = self._dispersy.get_member(private_key=self.master_private_key) @@ -87,7 +93,7 @@ def setup_session_config(self): config.set_dht_torrent_collecting(False) config.set_enable_torrent_search(False) config.set_enable_channel_search(False) - config.set_videoplayer(False) + config.set_http_api_enabled(False) config.set_listen_port(20000 + self.scenario_runner._peernumber) if self.dispersy_port is None: diff --git a/gumby/experiments/dispersyclient.py b/gumby/experiments/dispersyclient.py index 4680bb967..f3614389f 100755 --- a/gumby/experiments/dispersyclient.py +++ b/gumby/experiments/dispersyclient.py @@ -84,6 +84,8 @@ def __init__(self, vars): self.generateMyMember() self.vars['private_keypair'] = base64.encodestring(self.my_member_private_key) + self.endpoint = None + def onVarsSend(self): scenario_file_path = path.join(environ['EXPERIMENT_DIR'], self.scenario_file) self.scenario_runner = ScenarioRunner(scenario_file_path) @@ -214,7 +216,9 @@ def start_dispersy(self, autoload_discovery=True): from dispersy.endpoint import StandaloneEndpoint from dispersy.util import unhandled_error_observer - self._dispersy = Dispersy(StandaloneEndpoint(int(self.my_id) + 12000, '0.0.0.0'), u'.', self._database_file, self._crypto) + self.endpoint = StandaloneEndpoint(int(self.my_id) + 12000, '0.0.0.0') + + self._dispersy = Dispersy(self.endpoint, u'.', self._database_file, self._crypto) self._dispersy.statistics.enable_debug_statistics(True) self.original_on_incoming_packets = self._dispersy.on_incoming_packets diff --git a/scripts/channel_download/channel_dl_parse.py b/scripts/channel_download/channel_dl_parse.py new file mode 100644 index 000000000..c6e96f493 --- /dev/null +++ b/scripts/channel_download/channel_dl_parse.py @@ -0,0 +1,90 @@ +#!/usr/bin/env python + +import argparse +from collections import defaultdict + + +class Activity: + def __init__(self): + self.ts = "20161010T101010.100Z" + self.infohash = "0" * 40 + self.id = -1 + + 
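+        # (assumed units) the *rate fields are kB/s and the *total fields kB, matching
+        # the /1000 scaling in channel_download.py's status log line.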
self.ulrate = 0 + self.dlrate = 0 + self.ultotal = 0 + self.dltotal = 0 + self.progress = 0 + self.avail = 0.0 + self.dsavail = 0.0 + + self.ip = "0.0.0.0:0" + + +def main(): + argparser = argparse.ArgumentParser() + argparser.add_argument("log", help="log used as input") + args = argparser.parse_args() + + infohash_set = set([]) + infohash_short_long = {} + infohash_name = {} + tlist = defaultdict(lambda: Activity()) + + with open(args.log) as fin: + for line in fin: + line = line.splitlines()[0] + try: + ts, dummy_level, message = line.split("-", 2) + except ValueError: + continue + + split_msg = message.split(" ") + + if (len(split_msg) == 5 and split_msg[0] == 'Find') or split_msg[0] == 'Setup': + infohash_set.add(split_msg[-1]) + if len(split_msg) == 5: + infohash_name[split_msg[-1]] = split_msg[1] + + if len(split_msg) == 12: + try: + ihash_short = split_msg[1].split("=")[1][:-1] + except IndexError: + # "file not found" line + continue + + if ihash_short not in infohash_short_long.keys(): + for n in infohash_set: + if n.startswith(ihash_short): + infohash_short_long[ihash_short] = n + break + + a = Activity() + a.ip = split_msg[0] + a.ts = ts + a.dlrate = int(split_msg[2].split("=")[1][:-1]) + a.ulrate = int(split_msg[3].split("=")[1][:-1]) + + a.dltotal = int(split_msg[8].split("=")[1]) + a.ultotal = int(split_msg[9].split("=")[1]) + + a.progress = float(split_msg[4].split("=")[1][:-1]) + a.infohash = ihash_short + + a.avail = float(split_msg[10].split("=")[1]) + a.dsavail = float(split_msg[11].split("=")[1]) + + tlist[line] = a + + print "ts\tihash\tactor\tul_speed\tdl_speed\tul_tot\tdl_tot\tprogress\tavail\tdsavail" + for _, a in tlist.items(): + print "%s\t%s\t%s\t%d\t%d\t%d\t%d\t%f\t%f\t%f\t" %(a.ts, infohash_short_long[a.infohash], a.ip, a.ulrate, + a.dlrate, a.ultotal, a.dltotal, a.progress, + a.avail, a.dsavail) + + with open('ihashname.txt', 'a') as tbl: + for i in infohash_name.keys(): + tbl.write("%s\t%s\n" %(i, infohash_name[i])) + +if __name__ == "__main__": + main() diff --git a/scripts/channel_download/channel_dl_proc.R b/scripts/channel_download/channel_dl_proc.R new file mode 100755 index 000000000..7da7a82db --- /dev/null +++ b/scripts/channel_download/channel_dl_proc.R @@ -0,0 +1,153 @@ +#!/usr/bin/env Rscript + +library(argparse) +library(plyr) +library(dplyr) +library(ggplot2) +library(lubridate) +library(reshape2) +library(scales) +library(zoo) + +# http://stackoverflow.com/a/23165334 +floor_time <- function(x, k = 1, unit = c("second", "minute", "hour", "day", + "week", "month", "year")) { + require(lubridate) + + nmax <- NULL + + switch(unit, second = {nmax <- 60}, + minute = {nmax <- 60}, + hour = {nmax <- 24}) + + cuts <- seq(from = 0, to = nmax - 1, by = k) + new <- switch(unit, + second = update(x, seconds = cuts[findInterval(second(x), cuts)]), + minute = update(x, minutes = cuts[findInterval(minute(x), cuts)], + seconds = 0), + hour = update(x, hours = cuts[findInterval(hour(x), cuts)], + minutes = 0, seconds = 0), + day = update(x, hours = 0, minutes = 0, seconds = 0), + week = update(x, wdays = 1, hours = 0, minutes = 0, seconds = 0), + month = update(x, mdays = 1, hours = 0, minutes = 0, seconds = 0), + year = update(x, ydays = 1, hours = 0, minutes = 0, seconds = 0)) + + new +} + +RobustMax <- function(x) {if (length(x)>0) max(x) else 0} + +IhashToFile <- function(df, it) { + tmp <- merge(df, it, by.x = "ihash", by.y="V1", all.x=TRUE) + colnames(tmp)[ncol(tmp)] <- "filename" + tmp$ihash <- NULL + tmp +} + +parser <- ArgumentParser(description="Plot 
priority download") +parser$add_argument("file", help="log used as input") +parser$add_argument("output", help="name used as output", nargs="?", + default="channel_dl_figure.pdf") +args <- parser$parse_args() + + +tt <- read.table(args$file, header=T) +tt$ts <- floor_time(as.POSIXct(tt$ts, "%Y%m%dT%H%M%OSZ", tz='UTC'), 2, "minute") + +# download speed graph : MEAN +dlspeed.mean <- recast(tt, ts ~ ihash, measure.var=c("dl_speed"), fun.aggregate=mean) +dlspeed.mean.melt_na <- melt(dlspeed.mean, id.var="ts", variable.name="ihash", value.name="dl_speed") +dlspeed.mean <- zoo(dlspeed.mean, order.by=dlspeed.mean$ts) +dlspeed.mean <- na.locf(dlspeed.mean) +tmp <- as.data.frame(dlspeed.mean) +tmp$ts <- time(dlspeed.mean) +dlspeed.mean <- tmp +dlmean <- melt(dlspeed.mean, id.var="ts", variable.name="ihash", value.name="dl_speed") +dlmean <- transform(dlmean, dl_speed = as.numeric(dl_speed)) + +# download speed graph : MAX +dlspeed.max <- recast(tt, ts ~ ihash, measure.var=c("dl_speed"), fun.aggregate=RobustMax) +dlspeed.max.melt_na <- melt(dlspeed.max, id.var="ts", variable.name="ihash", value.name="dl_speed") +dlspeed.max <- zoo(dlspeed.max, order.by=dlspeed.max$ts) +dlspeed.max <- na.locf(dlspeed.max) +tmp <- as.data.frame(dlspeed.max) +tmp$ts <- time(dlspeed.max) +dlspeed.max <- tmp +dlmax <- melt(dlspeed.max, id.var="ts", variable.name="ihash", value.name="dl_speed") +dlmax$dl_speed <- as.numeric(dlmax$dl_speed) +dlmax <- transform(dlmax, dl_speed = as.numeric(dl_speed)) + +# number peer where upload > dl +tt$pos <- as.numeric(tt$ul_tot > tt$dl_tot) +posit.max <- recast(tt, ts + actor ~ ihash, measure.var=c("pos"), fun.aggregate=RobustMax) +posit.max.clean <- ddply(posit.max, ("ts"), numcolwise(sum)) +posit.max.clean <- zoo(posit.max.clean, order.by=posit.max.clean$ts) +tmp <- as.data.frame(posit.max.clean) +tmp$ts <- time(posit.max.clean) +posit.max.clean <- tmp +pmc <- melt(posit.max.clean, id.var="ts", variable.name="ihash", value.name="amount") + +# gain +t.seeder <- filter(tt, (progress == 1.0 & dl_tot == 0)) +t.seeder <- t.seeder[! (duplicated(t.seeder$ihash) & duplicated(t.seeder$actor)), c("ihash", "actor"), drop = FALSE] +t.download <- tt[! (t.seeder$ihash %in% tt$ihash & tt$actor %in% t.seeder$actor),] +t.download$gain <- t.download$ul_tot - t.download$dl_tot +gain <- recast(t.download, ts + actor ~ ihash, measure.var=c("gain"), fun.aggregate=mean) +gain_na <- melt(gain, id.var=c("ts", "actor"), variable.name = "ihash", value.name = "avg_gain") + +# monkey patch +gain[is.na(gain)] <- 0 + +gain <- zoo(gain, order.by=gain$ts) +gain <- na.locf(gain) +tmp <- as.data.frame(gain) +tmp$ts <- time(gain) +gain <- tmp +gain.g <- melt(gain, id.var="ts", variable.name="ihash", value.name="gainvalue") +gain.g <- filter(gain.g, ! 
ihash == 'actor') +gain.g$gainvalue <- as.numeric(gain.g$gainvalue) +gain.g <- ddply(gain.g, c("ts", "ihash"), numcolwise(mean)) + +# piece availability +dlist = list() +for (i in levels(unique(tt$ihash))){ + tmp <- tt %>% filter(ihash == i, avail > 0, dl_tot == 0) + dlist[[i]] <- tmp +} +tmp <- bind_rows(dlist) +t.avail <- recast(tmp, ts ~ ihash, measure.var=c("avail"), fun.aggregate=mean) +#t.avail.melt_na <- melt(t.avail, id.var="ts", variable.name="ihash", value.name="avail") +t.avail <- zoo(t.avail, order.by = t.avail$ts) +t.avail <- na.locf(t.avail) +tmp <- as.data.frame(t.avail) +tmp$ts <- time(t.avail) +t.avail <- tmp + +p.avail <- melt(t.avail, id.var="ts", variable.name="ihash", value.name="avail") +p.avail$avail <- as.numeric(p.avail$avail) + + +# node amount +ttx <- filter(tt, nodem != -1) +t.popularity <- recast(ttx, ts ~ ihash, measure.var=c("nodem"), fun.aggregate=mean) +t.popularity <- zoo(t.popularity, order.by = t.popularity$ts) +t.popularity <- na.locf(t.popularity) +tmp <- as.data.frame(t.popularity) +tmp$ts <- time(t.popularity) +t.popularity <- tmp + +p.popularity <- melt(t.popularity, id.var="ts", variable.name="ihash", value.name="dsavail") +p.popularity$dsavail <- as.numeric(p.popularity$dsavail) + +# replace infohash info with filename +it <- read.table('ihashname.txt') + +# plots +pdf(file=args$output, width=18, height=8) +ggplot(IhashToFile(dlmean, it)) + geom_line(aes(x=ts, y=dl_speed, color=filename, alpha=0.5, size=2)) + theme(legend.position="bottom") + ggtitle("Mean") + scale_x_datetime(breaks=seq(from = min(dlmean$ts), to = max(dlmean$ts), by = 3600)) + scale_y_continuous(limits=c(0,150)) +ggplot(IhashToFile(dlmax, it)) + geom_line(aes(x=ts, y=dl_speed, color=filename, alpha=0.5, size=2)) + theme(legend.position="bottom") + ggtitle("Max") + scale_x_datetime(breaks=seq(from = min(dlmax$ts), to = max(dlmax$ts), by = 3600)) + scale_y_continuous(limits=c(0,300)) +ggplot(IhashToFile(pmc, it), aes(x=ts, y=amount, color=filename, group=filename, alpha=0.5, size=2)) + geom_line() + theme(legend.position="bottom") + ggtitle("Max peer where upload > downloaded") + scale_x_datetime(breaks=seq(from = min(pmc$ts), to = max(pmc$ts), by = 3600)) +ggplot(IhashToFile(gain.g, it), aes(x=ts, y=gainvalue, color=filename, alpha=0.5, size=2)) + geom_line() + theme(legend.position="bottom") + ggtitle("Average upload gain (For peers)") + scale_x_datetime(breaks=seq(from = min(gain.g$ts), to = max(gain.g$ts), by = 3600)) +ggplot(IhashToFile(p.avail, it)) + geom_line(aes(x=ts, y=avail, color=filename, alpha=0.5, size=2)) + theme(legend.position="bottom") + ggtitle("Availability") + scale_x_datetime(breaks=seq(from = min(p.avail$ts), to = max(p.avail$ts), by = 3600)) + scale_y_continuous(limits=c(0,1)) +ggplot(IhashToFile(p.popularity, it)) + geom_line(aes(x=ts, y=dsavail, color=filename, alpha=0.5, size=2)) + theme(legend.position="bottom") + ggtitle("Popularity-Availability") + scale_x_datetime(breaks=seq(from = min(p.popularity$ts), to = max(p.popularity$ts), by = 3600)) + scale_y_continuous(limits=c(0,8)) +dev.off() diff --git a/scripts/channel_download/channel_download.sh b/scripts/channel_download/channel_download.sh new file mode 100755 index 000000000..191693515 --- /dev/null +++ b/scripts/channel_download/channel_download.sh @@ -0,0 +1,57 @@ +#!/usr/bin/env bash + +#if [ -z "$OUTPUT_DIR" ]; then +# echo 'ERROR: $OUTPUT_DIR variable not found, are you running this script from within gumby?' 
+#  exit 1
+#fi
+#
+#if [ -z "$EXPERIMENT_DIR" ]; then
+#  echo 'ERROR: EXPERIMENT_DIR variable not found, are you running this script from within gumby?'
+#  exit 1
+#fi
+
+#cd $OUTPUT_DIR
+
+echo "Running channel download post-processing..."
+SCRIPT_DIR="$(dirname "$(readlink -f "$0")")"
+
+MERGE_TSV_FILE="all.tsv"
+
+# find the .log files and copy them into data/
+mkdir -p data
+
+for log in $(grep -H -r "Dispersy configured" | grep -v ^Binary | cut -d: -f1)
+do
+    thedir=`dirname $log`
+    log_=${log##*/}
+    # the name will be logname_localhost_nodenumber.log
+    fname="${log_%.*}_`echo ${thedir} | tr /. _`.log"
+    cp $log "data/$fname"
+
+    echo "copying $fname to data/"
+
+    tname="${fname%.*}.tsv"
+
+    python $SCRIPT_DIR/channel_dl_parse.py data/$fname > data/$tname
+done
+
+tsvlist=$(find . -regex ".*\.tsv")
+echo -e "ts\tihash\tactor\tul_speed\tdl_speed\tul_tot\tdl_tot\tprogress\tavail\tdsavail" > $MERGE_TSV_FILE.raw
+
+for tsvs in $tsvlist
+do
+    if [ `awk -F' ' '{print NF; exit}' $tsvs` -eq 10 ]; then
+        tail -n +2 $tsvs >> $MERGE_TSV_FILE.raw
+    fi
+done
+(head -n 1 $MERGE_TSV_FILE.raw && tail -n +2 $MERGE_TSV_FILE.raw | sort) > $MERGE_TSV_FILE.sorted
+
+sort -k2 ihashname.txt | uniq > ihashname_unique.txt
+mv ihashname_unique.txt ihashname.txt
+
+$SCRIPT_DIR/channel_dl_proc.R all.tsv.sorted
+
+
+# Convert the generated PDF figure into a PNG image
+convert -resize 25% -density 300 -depth 8 -quality 85 channel_dl_figure.pdf channel_dl_figure.png
+rm -rf localhost/ tracker/ err.txt 2>&1
diff --git a/scripts/graph_process_guard_data.sh b/scripts/graph_process_guard_data.sh
index 8d1841bbd..667a02d0d 100755
--- a/scripts/graph_process_guard_data.sh
+++ b/scripts/graph_process_guard_data.sh
@@ -62,6 +62,7 @@
 memtimes.r
 writebytes.r
 readbytes.r
 network.r
+file_descriptors.r
 "
 export XMIN
 export XMAX
diff --git a/scripts/process_guard.py b/scripts/process_guard.py
index a7f50880d..28b1234af 100755
--- a/scripts/process_guard.py
+++ b/scripts/process_guard.py
@@ -12,6 +12,7 @@
 from subprocess import Popen
 from time import sleep, time
 
+from psutil import Process, AccessDenied
 
 OK_EXIT_CODE = 0
 TIMEOUT_EXIT_CODE = 3
@@ -223,9 +224,13 @@ def __init__(self, commands, timeout, interval, output_dir=None, monitor_dir=Non
         self._rm = ResourceMonitor(output_dir, commands)
         self.monitor_file = None
         self.network_monitor_file = None
+        self.fd_file = None
+        self.psutil_process = Process()
         if monitor_dir:
             self.monitor_file = open(monitor_dir + "/resource_usage.log", "w", (1024 ** 2) * 10)  # Set the file's buffering to 10MB
+            self.fd_file = open(monitor_dir + "/fd_usage.log", "w", (1024 ** 2) * 10)  # Set the file's buffering to 10MB
+            self.fd_file.write("time pid num_fds\n")
 
         # We read the jiffie -> second conversion rate from the os, by dividing the utime
         # and stime values by this conversion rate we will get the actual cpu seconds spent during this second.
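+        # (assumption) on Linux this conversion rate typically comes from
+        # os.sysconf('SC_CLK_TCK') and is usually 100 jiffies per second.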
try: @@ -252,6 +257,8 @@ def stop(self): self.stopping = True if self.monitor_file: self.monitor_file.close() + if self.fd_file: + self.fd_file.close() # Check if any process exited with an error code before killing the remaining ones failed = self._rm.get_failed_commands() @@ -305,6 +312,23 @@ def monitoring_loop(self): for line in self._rm.get_network_stats(): self.network_monitor_file.write("%.1f %s\n" % (r_timestamp, line)) + if hasattr(self.psutil_process, 'children'): + p_children = self.psutil_process.children(recursive=True) + else: + p_children = self.psutil_process.get_children(recursive=True) + + if self.fd_file: + for child_process in p_children: + try: + if hasattr(self.psutil_process, 'num_fds'): + self.fd_file.write("%.1f %s %d\n" % + (r_timestamp, child_process.pid, child_process.num_fds())) + else: + self.fd_file.write("%.1f %s %d\n" % + (r_timestamp, child_process.pid, child_process.get_num_fds())) + except AccessDenied: + pass # Just ignore the file descriptors of this forbidden process + if self.end_time and timestamp > self.end_time: # if self.end_time == 0 the time out is disabled. print "Time out, killing monitored processes." self.timed_out = True diff --git a/scripts/r/file_descriptors.r b/scripts/r/file_descriptors.r new file mode 100644 index 000000000..34cbf9796 --- /dev/null +++ b/scripts/r/file_descriptors.r @@ -0,0 +1,29 @@ +library(ggplot2) +library(reshape) + +args <- commandArgs(TRUE) +minX <- as.integer(args[1]) +maxX <- as.integer(args[2]) + +source(paste(Sys.getenv('R_SCRIPTS_PATH'), 'annotation.r', sep='/')) +df3 <- load_annotations() + +if(file.exists("fd_usage.log")){ + df <- read.table("fd_usage.log", header = TRUE, check.names = FALSE, na.strings = "?") + num_columns <- ncol(df) - 1 + df$type <- 'Process' + df$time <- df$time - df$time[1] + + p <- ggplot(df) + theme_bw() + p <- add_annotations(p, df, df3) + p <- p + geom_line(alpha=0.8, aes(x=time, y=num_fds, group=pid, colour=pid)) + p <- p + facet_grid(type ~ ., scales = "free_y") + p <- p + theme(legend.position = "none") + p <- p + labs(x = "\nTime into experiment (Seconds)", y = "Number of open file descriptors\n") + if(length(args) > 0){ + p <- p + xlim(minX, maxX) + } + p + + ggsave(file="fd_usage.png", width=12, height=6, dpi=100) +}
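+
+# Usage sketch (assumed wiring): graph_process_guard_data.sh exports XMIN/XMAX and runs
+#   Rscript file_descriptors.r $XMIN $XMAX
+# in the experiment output directory, after process_guard.py has written fd_usage.log.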