diff --git a/experiments/tribler/channel_download.conf b/experiments/tribler/channel_download.conf index 1b8d72bf1..b347029db 100644 --- a/experiments/tribler/channel_download.conf +++ b/experiments/tribler/channel_download.conf @@ -10,7 +10,7 @@ tracker_cmd = 'run_tracker.sh' experiment_time = 3600 local_instance_cmd = "process_guard.py -c channel_download.py -c channel_download.py -c channel_download.py -c channel_download.py -c channel_download.py -t $EXPERIMENT_TIME -m $OUTPUT_DIR -o $OUTPUT_DIR " -#post_process_cmd = 'post_process_dispersy_experiment.sh' +post_process_cmd = 'gumby/scripts/channel_download/channel_download.sh' sync_subscribers_amount = 5 #Run python in optimized mode? diff --git a/experiments/tribler/channel_download.py b/experiments/tribler/channel_download.py index 5a5f94bc4..114893fe6 100755 --- a/experiments/tribler/channel_download.py +++ b/experiments/tribler/channel_download.py @@ -47,6 +47,8 @@ def __init__(self, *argv, **kwargs): self.num_peers = -1 + self.id_experiment = os.environ['EXPERIMENT_NAME'].replace(" ", "") + def start_session(self): super(ChannelDownloadClient, self).start_session() self.session_deferred.addCallback(self.__config_dispersy) @@ -136,6 +138,7 @@ def registerCallbacks(self): self.scenario_runner.register(self.join, 'join') self.scenario_runner.register(self.publish, 'publish') self.scenario_runner.register(self.start_download, 'start_download') + self.scenario_runner.register(self.stop_download, 'stop_download') self.scenario_runner.register(self.setup_seeder, 'setup_seeder') def create(self): @@ -190,18 +193,24 @@ def __ds_active_callback(self, ds): tmp = float(p_num) / float(num_peers * num_pieces) availability += tmp - self._logger.error('%s:%s infohash=%s, downsp=%d, upsp=%d, progress=%s, status=%s, peers=%s rat=%s dl=%s up=%s avail=%.8f' % + peers = [x for x in ds.get_peerlist() if any(x['have']) and not + x['ip'].startswith("127.0.0")] + + ds.get_peerlist = lambda: peers + + self._logger.error('%s:%s 
infohash=%s, downsp=%d, upsp=%d, progress=%s, status=%s, peers=%s rat=%s dl=%s up=%s avail=%.8f dsavail=%.8f' % (self._dispersy.lan_address[0], self._dispersy.lan_address[1], ds.get_download().tdef.get_infohash().encode('hex')[:5], ds.get_current_speed('down')/1000, ds.get_current_speed('up')/1000, ds.get_progress(), dlstatus_strings[ds.get_status()], - sum(ds.get_num_seeds_peers()), + len(peers), ds.seeding_ratio, ds.get_total_transferred('down')/1000, ds.get_total_transferred('up')/1000, - availability)) + availability, + ds.get_availability())) if ds.get_progress() == 0.0 and ds.get_status() == 3: self._connect_peer(ds.get_download().handle) @@ -209,25 +218,35 @@ def __ds_active_callback(self, ds): return 1.0, True def setup_seeder(self, filename, size): - try: - tdef = TorrentDef.load(path.join(self.upload_dir_path, "%s.torrent" % filename)) - except IOError: + filename = filename if filename.startswith(self.id_experiment + "_") else self.id_experiment + "_" + filename + tpath = path.join(self.upload_dir_path, "%s.data" % filename) + tdef = None + if path.isfile(tpath): + tpath = path.join(self.upload_dir_path, "%s.torrent" % filename) + + if path.isfile(tpath): + tdef = TorrentDef.load(tpath) + else: + # torrent file is not written yet: retry in 10s (filename is already prefixed here; the startswith guard above keeps the retry from prefixing it twice) + reactor.callLater(10.0, self.setup_seeder, filename, size) + else: # file not found. 
In DAS case, this is because the file in another node tdef = self._create_test_torrent(filename, size) - dscfg = DefaultDownloadStartupConfig.getInstance().copy() - dscfg.set_dest_dir(self.upload_dir_path) - dscfg.set_hops(0) - dscfg.set_safe_seeding(False) + if tdef: + dscfg = DefaultDownloadStartupConfig.getInstance().copy() + dscfg.set_dest_dir(self.upload_dir_path) + dscfg.set_hops(0) + dscfg.set_safe_seeding(False) - self._logger.error("Setup seeder for %s", hexlify(tdef.get_infohash())) + self._logger.error("Setup seeder for %s", hexlify(tdef.get_infohash())) - download = self.session.start_download_from_tdef(tdef, dscfg) - download.set_state_callback(self.__ds_active_callback, True) + download = self.session.start_download_from_tdef(tdef, dscfg) + download.set_state_callback(self.__ds_active_callback, True) def publish(self, filename, size): if self.my_channel or self.joined_community: - tdef = self._create_test_torrent(filename, size) + tdef = self._create_test_torrent(self.id_experiment + "_" + filename, size) if self.my_channel: self.my_channel._disp_create_torrent_from_torrentdef(tdef, int(time.time())) elif self.joined_community: @@ -263,6 +282,7 @@ def _connect_peer(self, thandle): thandle.connect_peer((ip, port), 0) def start_download(self, name): + name = name if name.startswith(self.id_experiment) else self.id_experiment + "_" + name if name not in self.dl_lc.keys(): self.dl_lc[name] = LoopingCall(self.start_download, name) self.dl_lc[name].start(1.0, now=False) @@ -328,12 +348,26 @@ def _success_download(ihash_str): self._logger.error("Pending download") + def stop_download(self, dname): + dname = self.id_experiment + "_" + dname + for name in self.dl_lc.keys(): + if name == dname: + lc = self.dl_lc.pop(name) + if not self.downloaded_torrent[name]: + self._logger.error("Can't make it to download %s", name) + lc.stop() + else: + ihash = unhexlify(self.downloaded_torrent[dname]) + d_impl = self.session.get_download(ihash) + 
self._logger.error("Stopping Download %s", self.downloaded_torrent[dname]) + self.session.remove_download_by_id(ihash, True, True) + def stop(self, retry=3): # stop stalled download for name in self.dl_lc.keys(): if not self.downloaded_torrent[name]: - self.dl_lc[name].stop() + self.dl_lc.pop(name).stop() self._logger.error("Can't make it to download %s", name) downloads_impl = self.session.get_downloads() diff --git a/experiments/tribler/channel_download_10s.scenario b/experiments/tribler/channel_download_10s.scenario new file mode 100644 index 000000000..b12f47a25 --- /dev/null +++ b/experiments/tribler/channel_download_10s.scenario @@ -0,0 +1,85 @@ +@0:0 set_master_member 3081a7301006072a8648ce3d020106052b81040027038192000403cbbfd2dfb67a7db66c88988df56f93fa6e7f982f9a6a0fa8898492c8b8cae23e10b159ace60b7047012082a5aa4c6e221d7e58107bb550436d57e046c11ab4f51f0ab18fa8f58d0346cc12d1cc2b61fc86fe5ed192309152e11e3f02489e30c7c971dd989e1ce5030ea0fb77d5220a92cceb567cbc94bc39ba246a42e215b55e9315b543ddeff0209e916f77c0d747 +@0:2 start_dispersy {1-84} +@0:20 start_session +@0:60 online +@0:60 reset_dispersy_statistics +@0:60 annotate start-experiment +# 1 : creator (seeder) +# 2-12 : dedicated seeder +# 13-81 : peers +# 81 - 84 : flashcrowd all scenario +@0:60 join {2-20} +@0:60 create {1} +@0:62 publish file1gb_1 1524288001 {1} +@0:64 publish file1gb_2 1524288002 {1} +@0:66 publish file1gb_3 1524288003 {1} +@0:70 publish file5gb_1 5524288007 {1} +@0:73 publish file5gb_2 5524288006 {1} +@0:76 publish file5gb_3 5524288005 {1} +@0:79 publish file5gb_4 5524288004 {1} +@0:81 publish file5gb_5 5524288003 {1} +@0:84 publish file5gb_6 5524288002 {1} +@0:87 publish file5gb_7 5524288001 {1} +@0:88 join {21-40} +@0:92 join {41-60} +@0:105 setup_seeder file1gb_2 1524288002 {2} +@0:106 setup_seeder file1gb_3 1524288003 {3,4} +@0:105 setup_seeder file5gb_3 5524288005 {5} +@0:105 setup_seeder file5gb_4 5524288004 {6} +@0:105 setup_seeder file5gb_5 5524288003 {7,8} +@0:105 setup_seeder 
file5gb_6 5524288002 {9,10} +@0:105 setup_seeder file5gb_7 5524288001 {11,12} +@0:110 join {61-80} +@0:115 join {81-84} +# wave 1 +@2:6 start_download file1gb_1 {13-15} +@2:12 start_download file1gb_2 {16-18} +@2:18 start_download file1gb_3 {19-21} +@2:24 start_download file5gb_1 {22} +@2:30 start_download file5gb_2 {23-25} +@2:36 start_download file5gb_3 {26} +@2:42 start_download file5gb_4 {27-29} +@2:48 start_download file5gb_5 {30} +@2:54 start_download file5gb_6 {33-38} +@3:1 start_download file5gb_7 {39,40} +# wave 2 +@1:2:6 start_download file1gb_1 {41} +@1:2:12 start_download file1gb_2 {42} +@1:2:18 start_download file1gb_3 {42} +@1:2:24 start_download file5gb_1 {44} +@1:2:30 start_download file5gb_2 {45} +@1:2:36 start_download file5gb_3 {46} +@1:2:42 start_download file5gb_4 {47} +@1:2:48 start_download file5gb_5 {48} +@1:2:54 start_download file5gb_6 {49} +@1:3:1 start_download file5gb_7 {50} +# wave 3 +@2:2:6 start_download file1gb_1 {51,52} +@2:2:12 start_download file1gb_2 {53,54} +@2:2:18 start_download file1gb_3 {55,56} +@2:2:24 start_download file5gb_1 {57} +@2:2:30 start_download file5gb_2 {58-60} +@2:2:36 start_download file5gb_3 {61} +@2:2:42 start_download file5gb_4 {62,63} +@2:3:1 start_download file5gb_7 {64} +#wave 4 +@3:2:6 start_download file1gb_1 {65,66} +@3:2:12 start_download file1gb_2 {67,68} +@3:2:18 start_download file1gb_3 {69,70} +@3:2:24 start_download file5gb_1 {71-75} +@3:2:30 start_download file5gb_2 {76} +@3:2:36 start_download file5gb_3 {77,78} +@3:2:42 start_download file5gb_4 {79-81} +#flashcrowd +@4:2:6 start_download file1gb_1 {81-84} +@4:2:12 start_download file1gb_2 {81-84} +@4:2:18 start_download file1gb_3 {81-84} +@4:2:24 start_download file5gb_1 {81-84} +@4:2:30 start_download file5gb_2 {81-84} +@4:2:36 start_download file5gb_3 {81-84} +@4:2:42 start_download file5gb_4 {81-84} +@4:2:48 start_download file5gb_5 {81-84} +@4:2:54 start_download file5gb_6 {81-84} +@4:3:1 start_download file5gb_7 {81-84} +@7:58:0 
reset_dispersy_statistics +@7:59:0 stop \ No newline at end of file diff --git a/experiments/tribler/channel_download_das.conf b/experiments/tribler/channel_download_das.conf index 6e60b768c..8098810ba 100644 --- a/experiments/tribler/channel_download_das.conf +++ b/experiments/tribler/channel_download_das.conf @@ -27,3 +27,4 @@ messages_to_plot = 'torrent' PYTHONOPTIMIZE = True SCENARIO_FILE = channel_download.scenario +post_process_cmd = 'gumby/scripts/channel_download/channel_download.sh' diff --git a/scripts/channel_download/channel_dl_parse.py b/scripts/channel_download/channel_dl_parse.py index 7dd703cc8..c6e96f493 100644 --- a/scripts/channel_download/channel_dl_parse.py +++ b/scripts/channel_download/channel_dl_parse.py @@ -15,6 +15,8 @@ def __init__(self): self.ultotal = 0 self.dltotal = 0 self.progress = 0 + self.avail = 0.0 + self.dsavail = 0.0 self.ip = "0.0.0.0:0" @@ -26,6 +28,7 @@ def main(): infohash_set = set([]) infohash_short_long = {} + infohash_name = {} tlist = defaultdict(lambda: Activity()) with open(args.log) as fin: @@ -38,10 +41,12 @@ def main(): split_msg = message.split(" ") - if len(split_msg) == 5 and split_msg[0] == 'Find' or split_msg[0] == 'Setup': + if (len(split_msg) == 5 and split_msg[0] == 'Find') or split_msg[0] == 'Setup': infohash_set.add(split_msg[-1]) + if len(split_msg) == 5: + infohash_name[split_msg[-1]] = split_msg[1] - if len(split_msg) == 10: + if len(split_msg) == 12: try: ihash_short = split_msg[1].split("=")[1][:-1] except IndexError: @@ -66,11 +71,20 @@ def main(): a.progress = float(split_msg[4].split("=")[1][:-1]) a.infohash = ihash_short + a.avail = float(split_msg[10].split("=")[1]) + a.dsavail = float(split_msg[11].split("=")[1]) + tlist[line] = a - print "ts\tihash\tactor\tul_speed\tdl_speed\tul_tot\tdl_tot\tprogress" + print "ts\tihash\tactor\tul_speed\tdl_speed\tul_tot\tdl_tot\tprogress\tavail\tdsavail" for _, a in tlist.items(): - print "%s\t%s\t%s\t%d\t%d\t%d\t%d\t%f\t" %(a.ts, 
infohash_short_long[a.infohash], a.ip, a.ulrate, a.dlrate, a.ultotal, a.dltotal, a.progress) + print "%s\t%s\t%s\t%d\t%d\t%d\t%d\t%f\t%f\t%f\t" %(a.ts, infohash_short_long[a.infohash], a.ip, a.ulrate, + a.dlrate, a.ultotal, a.dltotal, a.progress, + a.avail, a.dsavail) + + with open('ihashname.txt', 'a') as tbl: + for i in infohash_name.keys(): + tbl.write("%s\t%s\n" %(i, infohash_name[i])) if __name__ == "__main__": main() diff --git a/scripts/channel_download/channel_dl_proc.R b/scripts/channel_download/channel_dl_proc.R old mode 100644 new mode 100755 index 1a57fac05..7da7a82db --- a/scripts/channel_download/channel_dl_proc.R +++ b/scripts/channel_download/channel_dl_proc.R @@ -1,6 +1,7 @@ #!/usr/bin/env Rscript library(argparse) +library(plyr) library(dplyr) library(ggplot2) library(lubridate) @@ -36,15 +37,22 @@ floor_time <- function(x, k = 1, unit = c("second", "minute", "hour", "day", RobustMax <- function(x) {if (length(x)>0) max(x) else 0} +IhashToFile <- function(df, it) { + tmp <- merge(df, it, by.x = "ihash", by.y="V1", all.x=TRUE) + colnames(tmp)[ncol(tmp)] <- "filename" + tmp$ihash <- NULL + tmp +} + parser <- ArgumentParser(description="Plot priority download") parser$add_argument("file", help="log used as input") parser$add_argument("output", help="name used as output", nargs="?", - default="priority_figure.pdf") + default="channel_dl_figure.pdf") args <- parser$parse_args() tt <- read.table(args$file, header=T) -tt$ts <- floor_time(as.POSIXct(tt$ts, "%Y%m%dT%H%M%OSZ", tz='UTC'), 1, "minute") +tt$ts <- floor_time(as.POSIXct(tt$ts, "%Y%m%dT%H%M%OSZ", tz='UTC'), 2, "minute") # download speed graph : MEAN dlspeed.mean <- recast(tt, ts ~ ihash, measure.var=c("dl_speed"), fun.aggregate=mean) @@ -55,7 +63,7 @@ tmp <- as.data.frame(dlspeed.mean) tmp$ts <- time(dlspeed.mean) dlspeed.mean <- tmp dlmean <- melt(dlspeed.mean, id.var="ts", variable.name="ihash", value.name="dl_speed") -dlmean$dl_speed <- as.numeric(dlmean$dl_dl_speed) +dlmean <- 
transform(dlmean, dl_speed = as.numeric(dl_speed)) # download speed graph : MAX dlspeed.max <- recast(tt, ts ~ ihash, measure.var=c("dl_speed"), fun.aggregate=RobustMax) @@ -67,6 +75,7 @@ tmp$ts <- time(dlspeed.max) dlspeed.max <- tmp dlmax <- melt(dlspeed.max, id.var="ts", variable.name="ihash", value.name="dl_speed") dlmax$dl_speed <- as.numeric(dlmax$dl_speed) +dlmax <- transform(dlmax, dl_speed = as.numeric(dl_speed)) # number peer where upload > dl tt$pos <- as.numeric(tt$ul_tot > tt$dl_tot) @@ -81,29 +90,64 @@ pmc <- melt(posit.max.clean, id.var="ts", variable.name="ihash", value.name="amo # gain t.seeder <- filter(tt, (progress == 1.0 & dl_tot == 0)) t.seeder <- t.seeder[! (duplicated(t.seeder$ihash) & duplicated(t.seeder$actor)), c("ihash", "actor"), drop = FALSE] - t.download <- tt[! (t.seeder$ihash %in% tt$ihash & tt$actor %in% t.seeder$actor),] - t.download$gain <- t.download$ul_tot - t.download$dl_tot gain <- recast(t.download, ts + actor ~ ihash, measure.var=c("gain"), fun.aggregate=mean) gain_na <- melt(gain, id.var=c("ts", "actor"), variable.name = "ihash", value.name = "avg_gain") +# monkey patch +gain[is.na(gain)] <- 0 + gain <- zoo(gain, order.by=gain$ts) gain <- na.locf(gain) tmp <- as.data.frame(gain) tmp$ts <- time(gain) gain <- tmp - -# monkey patch -gain[is.na(gain)] <- 0 - gain.g <- melt(gain, id.var="ts", variable.name="ihash", value.name="gainvalue") gain.g <- filter(gain.g, ! 
ihash == 'actor') gain.g$gainvalue <- as.numeric(gain.g$gainvalue) gain.g <- ddply(gain.g, c("ts", "ihash"), numcolwise(mean)) +# piece availability +dlist = list() +for (i in levels(unique(tt$ihash))){ + tmp <- tt %>% filter(ihash == i, avail > 0, dl_tot == 0) + dlist[[i]] <- tmp +} +tmp <- bind_rows(dlist) +t.avail <- recast(tmp, ts ~ ihash, measure.var=c("avail"), fun.aggregate=mean) +#t.avail.melt_na <- melt(t.avail, id.var="ts", variable.name="ihash", value.name="avail") +t.avail <- zoo(t.avail, order.by = t.avail$ts) +t.avail <- na.locf(t.avail) +tmp <- as.data.frame(t.avail) +tmp$ts <- time(t.avail) +t.avail <- tmp + +p.avail <- melt(t.avail, id.var="ts", variable.name="ihash", value.name="avail") +p.avail$avail <- as.numeric(p.avail$avail) + + +# node amount +ttx <- filter(tt, nodem != -1) +t.popularity <- recast(ttx, ts ~ ihash, measure.var=c("nodem"), fun.aggregate=mean) +t.popularity <- zoo(t.popularity, order.by = t.popularity$ts) +t.popularity <- na.locf(t.popularity) +tmp <- as.data.frame(t.popularity) +tmp$ts <- time(t.popularity) +t.popularity <- tmp + +p.popularity <- melt(t.popularity, id.var="ts", variable.name="ihash", value.name="dsavail") +p.popularity$dsavail <- as.numeric(p.popularity$dsavail) + +# replace infohash info with filename +it <- read.table('ihashname.txt') + # plots -ggplot(dlmean) + geom_line(aes(x=ts, y=dl_speed, color=ihash)) + theme(legend.position="bottom") + ggtitle("Mean") -ggplot(dlmax) + geom_line(aes(x=ts, y=dl_speed, color=ihash)) + theme(legend.position="bottom") + ggtitle("Max") -ggplot(pmc, aes(x=ts, y=amount, color=ihash, group=ihash)) + geom_line() + theme(legend.position="bottom") + ggtitle("Max peer where upload > downloaded") -ggplot(gain.g, aes(x=ts, y=gainvalue, color=ihash, group=ihash)) + geom_line() + theme(legend.position="bottom") + ggtitle("Average upload gain") \ No newline at end of file +pdf(file=args$output, width=18, height=8) +ggplot(IhashToFile(dlmean, it)) + geom_line(aes(x=ts, y=dl_speed, 
color=filename, alpha=0.5, size=2)) + theme(legend.position="bottom") + ggtitle("Mean") + scale_x_datetime(breaks=seq(from = min(dlmean$ts), to = max(dlmean$ts), by = 3600)) + scale_y_continuous(limits=c(0,150)) +ggplot(IhashToFile(dlmax, it)) + geom_line(aes(x=ts, y=dl_speed, color=filename, alpha=0.5, size=2)) + theme(legend.position="bottom") + ggtitle("Max") + scale_x_datetime(breaks=seq(from = min(dlmax$ts), to = max(dlmax$ts), by = 3600)) + scale_y_continuous(limits=c(0,300)) +ggplot(IhashToFile(pmc, it), aes(x=ts, y=amount, color=filename, group=filename, alpha=0.5, size=2)) + geom_line() + theme(legend.position="bottom") + ggtitle("Max peer where upload > downloaded") + scale_x_datetime(breaks=seq(from = min(pmc$ts), to = max(pmc$ts), by = 3600)) +ggplot(IhashToFile(gain.g, it), aes(x=ts, y=gainvalue, color=filename, alpha=0.5, size=2)) + geom_line() + theme(legend.position="bottom") + ggtitle("Average upload gain (For peers)") + scale_x_datetime(breaks=seq(from = min(gain.g$ts), to = max(gain.g$ts), by = 3600)) +ggplot(IhashToFile(p.avail, it)) + geom_line(aes(x=ts, y=avail, color=filename, alpha=0.5, size=2)) + theme(legend.position="bottom") + ggtitle("Availability") + scale_x_datetime(breaks=seq(from = min(p.avail$ts), to = max(p.avail$ts), by = 3600)) + scale_y_continuous(limits=c(0,1)) +ggplot(IhashToFile(p.popularity, it)) + geom_line(aes(x=ts, y=dsavail, color=filename, alpha=0.5, size=2)) + theme(legend.position="bottom") + ggtitle("Popularity-Availability") + scale_x_datetime(breaks=seq(from = min(p.popularity$ts), to = max(p.popularity$ts), by = 3600)) + scale_y_continuous(limits=c(0,8)) +dev.off() diff --git a/scripts/channel_download/channel_download.sh b/scripts/channel_download/channel_download.sh index f3cb5dce2..191693515 100755 --- a/scripts/channel_download/channel_download.sh +++ b/scripts/channel_download/channel_download.sh @@ -13,6 +13,7 @@ #cd $OUTPUT_DIR echo "Running post channel downloading..." 
+SCRIPT_DIR="$(dirname "$(readlink -f "$0")")" MERGE_TSV_FILE="all.tsv" @@ -27,21 +28,30 @@ do fname="${log_%.*}_`echo ${thedir} | tr /. _`.log" cp $log "data/$fname" + echo "copying $fname to data/" + tname="${fname%.*}.tsv" - python channel_dl_parse.py data/$fname > data/$tname + python $SCRIPT_DIR/channel_dl_parse.py data/$fname > data/$tname done tsvlist=$(find . -regex ".*\.tsv") -echo -e "ts\tihash\tactor\tul_speed\tdl_speed\tul_tot\tdl_tot\tprogress" > $MERGE_TSV_FILE.raw +echo -e "ts\tihash\tactor\tul_speed\tdl_speed\tul_tot\tdl_tot\tprogress\tavail\tdsavail" > $MERGE_TSV_FILE.raw for tsvs in $tsvlist do - tail -n +2 $tsvs >> $MERGE_TSV_FILE.raw + if [ `awk -F' ' '{print NF; exit}' $tsvs` -eq 10 ]; then + tail -n +2 $tsvs >> $MERGE_TSV_FILE.raw + fi done (head -n 1 $MERGE_TSV_FILE.raw && tail -n +2 $MERGE_TSV_FILE.raw | sort) > $MERGE_TSV_FILE.sorted -#R --no-save --quiet < "$EXPERIMENT_DIR"/scripts/install.r +sort -k2 ihashname.txt | uniq > ihashname_unique.txt +mv ihashname_unique.txt ihashname.txt + +$SCRIPT_DIR/channel_dl_proc.R all.tsv.sorted + # Create RData files for plotting from log files and crate image -#"$EXPERIMENT_DIR"/scripts/log2rdata.sh +convert -resize 25% -density 300 -depth 8 -quality 85 channel_dl_figure.pdf channel_dl_figure.png +rm -rf localhost/ tracker/ err.txt 2>&1