Skip to content

Commit

Permalink
- Own attempt at quick timeout and close of initiated connections, bu…
Browse files Browse the repository at this point in the history
…t looks like it don't work.

git-svn-id: https://svn.tribler.org/abc/branches/arno/d07-09-21-fastgui-from-mb-r5526@6013 001aeff7-3401-0410-a489-f7902fc005dd
  • Loading branch information
arno committed Nov 2, 2007
1 parent 82c8986 commit d22b1b0
Show file tree
Hide file tree
Showing 7 changed files with 48 additions and 24 deletions.
34 changes: 24 additions & 10 deletions BitTornado/BT1/Encrypter.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,14 @@

DEBUG = False

MAX_INCOMPLETE = 32 # Arno: was 8
if sys.platform == 'win32':
# On windows XP SP2 we can't initiate more than 10 conns/second
# or have more than 10 pending conns (not clear which), so limit it.
# If we use more we get problems e.g. in VOD that the HTTP request
# of VLC to our HTTPServer times out. Windows die die die.
MAX_INCOMPLETE = 8 # safety margin
else:
MAX_INCOMPLETE = 32

def make_readable(s):
if not s:
Expand Down Expand Up @@ -211,6 +218,9 @@ def read_peer_id(self, s):
if self.locally_initiated:
self.connection.write(self.Encoder.my_id)
incompletecounter.decrement()
# Arno: open new conn from queue if at limit. Faster than RawServer task
self.Encoder._start_connection_from_queue(sched=False)

c = self.Encoder.connecter.connection_made(self)
self.keepalive = c.send_keepalive
return 4, self.read_len
Expand Down Expand Up @@ -255,6 +265,8 @@ def sever(self):
self.connecter.connection_lost(self)
elif self.locally_initiated:
incompletecounter.decrement()
# Arno: open new conn from queue if at limit. Faster than RawServer task
self.Encoder._start_connection_from_queue(sched=False)

def send_message_raw(self, message):
if not self.closed:
Expand Down Expand Up @@ -398,31 +410,33 @@ def start_connections(self, list):
self.to_connect.extend(list)
self.trackertime = int(time())

def _start_connection_from_queue(self):
def _start_connection_from_queue(self,sched=True):

if not self.to_connect:
return

if self.connecter.external_connection_made:
max_initiate = self.config['max_initiate']
else:
max_initiate = int(self.config['max_initiate']*1.5)
cons = len(self.connections)
if cons >= self.max_connections or cons >= max_initiate:
#print >>sys.stderr,"encoder: start_from_queue delay 60"
delay = 60
delay = 60.0
elif self.paused or incompletecounter.toomany():
#print >>sys.stderr,"encoder: start_from_queue delay 1"
delay = 1
delay = 1.0
else:
#print >>sys.stderr,"encoder: start_from_queue delay 0"
delay = 0
delay = 0.0
dns, id = self.to_connect.pop(0)
self.start_connection(dns, id)
if self.to_connect:
if self.to_connect and sched:
print >>sys.stderr,"encoder: start_from_queue delay",delay
self.raw_server.add_task(self._start_connection_from_queue, delay)

def start_connection(self, dns, id, coord_con = False):
""" Locally initiated connection """
if DEBUG:
print >>sys.stderr,"encoder: start_connection:",dns
print >>sys.stderr,"encoder: start_connection: qlen",len(self.to_connect),"nconns",len(self.connections),"maxi",self.config['max_initiate'],"maxc",self.config['max_connections']
print >>sys.stderr,"encoder: start_connection: qlen",len(self.to_connect),"nconns",len(self.connections),"maxi",self.config['max_initiate'],"maxc",self.config['max_connections']

if ( self.paused
or len(self.connections) >= self.max_connections
Expand Down
14 changes: 11 additions & 3 deletions BitTornado/SocketHandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ def __init__(self, socket_handler, sock, handler, ip = None):
self.handler = handler
self.buffer = []
self.last_hit = clock()
self.only_hit = True
self.fileno = sock.fileno()
self.connected = False
self.skipped = 0
Expand Down Expand Up @@ -181,15 +182,21 @@ def __init__(self, timeout, ipv6_enable, readsize = 100000):
self.servers = {}

def scan_for_timeouts(self):
t = clock() - self.timeout
print >> sys.stderr,"SocketHandler: scan_timeouts"
tokill = []
for s in self.single_sockets.values():
if s.only_hit: # only initiated connection, never got any data
to = 20.0
else:
to = self.timeout
t = clock() - to

if s.last_hit < t:
tokill.append(s)
for k in tokill:
if k.socket is not None:
if DEBUG:
print >> sys.stderr,"SocketHandler: scan_timeout closing connection",k.get_ip()
#if DEBUG:
print >> sys.stderr,"SocketHandler: scan_timeout closing connection &&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&",k.get_ip()
self._close_socket(k)

def bind(self, port, bind = '', reuse = False, ipv6_socket_style = 1):
Expand Down Expand Up @@ -427,6 +434,7 @@ def handle_events(self, events):
if (event & POLLIN):
try:
s.last_hit = clock()
s.only_hit = False
data = s.socket.recv(100000)
if not data:
if DEBUG:
Expand Down
2 changes: 1 addition & 1 deletion Tribler/API/defaults.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
'(0 = disabled, 1 = mode 1 [fast,win32], 2 = mode 2 [slow,win32], 3 = mode 3 [any platform])'),
('timeout', 300.0,
'time to wait between closing sockets which nothing has been received on'),
('timeout_check_interval', 60.0,
('timeout_check_interval', 3.0,
'time to wait between checking if any connections have timed out'),

# Tribler session opts
Expand Down
7 changes: 3 additions & 4 deletions Tribler/API/launchmanycore.py
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ def checkpoint(self,stop=False):

def network_checkpoint_callback(self,dllist,stop):
""" Called by network thread """
psdict = {'version':PERSISTENTSTATE_CURRENTVERSION}
psdict = {}
for d in dllist:
# Tell all downloads to stop, and save their persistent state
# in a infohash -> pstate dict which is then passed to the user
Expand All @@ -395,9 +395,8 @@ def network_checkpoint_callback(self,dllist,stop):
psdict[infohash] = pstate

try:
if len(psdict) > 1: # not just version:
for infohash,pstate in psdict.iteritems():
self.save_download_pstate(infohash,pstate)
for infohash,pstate in psdict.iteritems():
self.save_download_pstate(infohash,pstate)
except Exception,e:
self.rawserver_nonfatalerrorfunc(e)

Expand Down
3 changes: 2 additions & 1 deletion Tribler/Video/EmbeddedPlayer.py
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,8 @@ def getVlcMediaCtrl(self):

# Arno: 2007-05-11: Don't ask me why but without the "--verbose=0" vlc will ignore the key redef.
#params = ["--verbose=0","--key-fullscreen", "Esc"]
params = []
params = ["--verbose=0"]
###params = []

if sys.platform == 'darwin':
params += ["--plugin-path", "%s/lib/vlc" % (
Expand Down
2 changes: 1 addition & 1 deletion Tribler/Video/VideoOnDemand.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
# pull all video data as if a video player was attached
FAKEPLAYBACK = False

DEBUG = True
DEBUG = False
DEBUGPP = False

class PiecePickerStreaming(PiecePicker):
Expand Down
10 changes: 6 additions & 4 deletions Tribler/Video/VideoServer.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ def __init__(self):
BaseHTTPServer.HTTPServer.__init__( self, ("",self.port), SimpleServer )
self.daemon_threads = True
self.allow_reuse_address = True
#self.request_queue_size = 10
self.errorcallback = None
self.statuscallback = None

Expand All @@ -187,10 +188,11 @@ def getInstance(*args, **kw):
getInstance = staticmethod(getInstance)

def background_serve( self ):
#name = "VideoHTTPServerThread-1"
#self.thread2 = Thread(target=self.serve_forever,name=name)
#self.thread2.start()
thread.start_new_thread( self.serve_forever, () )
name = "VideoHTTPServerThread-1"
self.thread2 = Thread(target=self.serve_forever,name=name)
self.thread2.setDaemon(True)
self.thread2.start()
#thread.start_new_thread( self.serve_forever, () )

def register(self,errorcallback,statuscallback):
self.errorcallback = errorcallback
Expand Down

0 comments on commit d22b1b0

Please sign in to comment.