(file) Return to Connecter.py CVS log (file) (dir) Up to [Development] / shadowsclient / BitTorrent

File: [Development] / shadowsclient / BitTorrent / Connecter.py (download) / (as text)
Revision: 1.6, Fri Feb 13 18:35:44 2004 UTC (6 years, 6 months ago) by theshadow
Branch: MAIN
CVS Tags: HEAD
Changes since 1.5: +5 -2 lines
fixed a few bugs, including major ones in http downloader

# Written by Bram Cohen
# see LICENSE.txt for license information

from bitfield import bitfield_to_booleans, booleans_to_bitfield
from traceback import print_exc
from binascii import b2a_hex
from CurrentRateMeasure import Measure
true = 1
false = 0

def toint(s):
    return long(b2a_hex(s), 16)

def tobinary(i):
    return (chr(i >> 24) + chr((i >> 16) & 0xFF) + 
        chr((i >> 8) & 0xFF) + chr(i & 0xFF))

CHOKE = chr(0)
UNCHOKE = chr(1)
INTERESTED = chr(2)
NOT_INTERESTED = chr(3)
# index
HAVE = chr(4)
# index, bitfield
BITFIELD = chr(5)
# index, begin, length
REQUEST = chr(6)
# index, begin, piece
PIECE = chr(7)
# index, begin, piece
CANCEL = chr(8)

class Connection:
    def __init__(self, connection, connecter):
        self.connection = connection
        self.connecter = connecter
        self.got_anything = false

    def get_ip(self):
        return self.connection.get_ip()

    def get_id(self):
        return self.connection.get_id()

    def close(self):
        self.connection.close()

    def is_flushed(self):
        if self.connecter.rate_capped:
            return false
        return self.connection.is_flushed()

    def is_locally_initiated(self):
        return self.connection.is_locally_initiated()

    def send_interested(self):
        self.connection.send_message(INTERESTED)

    def send_not_interested(self):
        self.connection.send_message(NOT_INTERESTED)

    def send_choke(self):
        self.connection.send_message(CHOKE)

    def send_unchoke(self):
        self.connection.send_message(UNCHOKE)

    def send_request(self, index, begin, length):
        self.connection.send_message(REQUEST + tobinary(index) + 
            tobinary(begin) + tobinary(length))

    def send_cancel(self, index, begin, length):
        self.connection.send_message(CANCEL + tobinary(index) + 
            tobinary(begin) + tobinary(length))

    def send_piece(self, index, begin, piece):
        assert not self.connecter.rate_capped
        self.connecter._update_upload_rate(len(piece))
        self.connection.send_message(PIECE + tobinary(index) + 
            tobinary(begin) + piece)

    def send_bitfield(self, bitfield):
        self.connection.send_message(BITFIELD + 
            booleans_to_bitfield(bitfield))

    def send_have(self, index):
        self.connection.send_message(HAVE + tobinary(index))

    def get_upload(self):
        return self.upload

    def get_download(self):
        return self.download

    def set_download(self, download):
        self.download = download

class Connecter:
    def __init__(self, make_upload, downloader, choker, numpieces,
            totalup, config, sched = None):
        self.downloader = downloader
        self.make_upload = make_upload
        self.choker = choker
        self.numpieces = numpieces
        self.config = config
        self.sched = sched
        self.totalup = totalup
        self.rate_capped = false
        self.connections = {}
        self.external_connection_made = 0

    def _update_upload_rate(self, amount):
        self.totalup.update_rate(amount)
        if self.config['max_upload_rate'] > 0 and self.totalup.get_rate_noupdate() > self.config['max_upload_rate']:
            self.rate_capped = true
            self.sched(self._uncap, self.totalup.time_until_rate(self.config['max_upload_rate']))

    def _uncap(self):
        self.rate_capped = false
        while not self.rate_capped:
            up = None
            minrate = None
            for i in self.connections.values():
                if not i.upload.is_choked() and i.upload.has_queries() and i.connection.is_flushed():
                    rate = i.upload.get_rate()
                    if up is None or rate < minrate:
                        up = i.upload
                        minrate = rate
            if up is None:
                break
            up.flushed()
            if self.totalup.get_rate_noupdate() > self.config['max_upload_rate']:
                break

    def how_many_connections(self):
        return len(self.connections)

    def connection_made(self, connection):
        c = Connection(connection, self)
        self.connections[connection] = c
        c.upload = self.make_upload(c)
        c.download = self.downloader.make_download(c)
        self.choker.connection_made(c)

    def connection_lost(self, connection):
        c = self.connections[connection]
        d = c.download
        del self.connections[connection]
        d.disconnected()
        self.choker.connection_lost(c)

    def connection_flushed(self, connection):
        self.connections[connection].upload.flushed()

    def got_piece(self, i):
        for co in self.connections.values():
            co.send_have(i)

    def got_message(self, connection, message):
        c = self.connections[connection]
        t = message[0]
        if t == BITFIELD and c.got_anything:
            connection.close()
            return
        c.got_anything = true
        if (t in [CHOKE, UNCHOKE, INTERESTED, NOT_INTERESTED] and 
                len(message) != 1):
            connection.close()
            return
        if t == CHOKE:
            c.download.got_choke()
        elif t == UNCHOKE:
            c.download.got_unchoke()
        elif t == INTERESTED:
            c.upload.got_interested()
        elif t == NOT_INTERESTED:
            c.upload.got_not_interested()
        elif t == HAVE:
            if len(message) != 5:
                connection.close()
                return
            i = toint(message[1:])
            if i >= self.numpieces:
                connection.close()
                return
            c.download.got_have(i)
        elif t == BITFIELD:
            b = bitfield_to_booleans(message[1:], self.numpieces)
            if b is None:
                connection.close()
                return
            c.download.got_have_bitfield(b)
        elif t == REQUEST:
            if len(message) != 13:
                connection.close()
                return
            i = toint(message[1:5])
            if i >= self.numpieces:
                connection.close()
                return
            c.upload.got_request(i, toint(message[5:9]), 
                toint(message[9:]))
        elif t == CANCEL:
            if len(message) != 13:
                connection.close()
                return
            i = toint(message[1:5])
            if i >= self.numpieces:
                connection.close()
                return
            c.upload.got_cancel(i, toint(message[5:9]), 
                toint(message[9:]))
        elif t == PIECE:
            if len(message) <= 9:
                connection.close()
                return
            i = toint(message[1:5])
            if i >= self.numpieces:
                connection.close()
                return
            if c.download.got_piece(i, toint(message[5:9]), message[9:]):
                self.got_piece(i)
        else:
            connection.close()

class DummyUpload:
    def __init__(self, events):
        self.events = events
        events.append('made upload')

    def flushed(self):
        self.events.append('flushed')

    def got_interested(self):
        self.events.append('interested')
        
    def got_not_interested(self):
        self.events.append('not interested')

    def got_request(self, index, begin, length):
        self.events.append(('request', index, begin, length))

    def got_cancel(self, index, begin, length):
        self.events.append(('cancel', index, begin, length))

class DummyDownload:
    def __init__(self, events):
        self.events = events
        events.append('made download')
        self.hit = 0

    def disconnected(self):
        self.events.append('disconnected')

    def got_choke(self):
        self.events.append('choke')

    def got_unchoke(self):
        self.events.append('unchoke')

    def got_have(self, i):
        self.events.append(('have', i))

    def got_have_bitfield(self, bitfield):
        self.events.append(('bitfield', bitfield))

    def got_piece(self, index, begin, piece):
        self.events.append(('piece', index, begin, piece))
        self.hit += 1
        return self.hit > 1

class DummyDownloader:
    def __init__(self, events):
        self.events = events

    def make_download(self, connection):
        return DummyDownload(self.events)

class DummyConnection:
    def __init__(self, events):
        self.events = events

    def send_message(self, message):
        self.events.append(('m', message))

class DummyChoker:
    def __init__(self, events, cs):
        self.events = events
        self.cs = cs

    def connection_made(self, c):
        self.events.append('made')
        self.cs.append(c)

    def connection_lost(self, c):
        self.events.append('lost')

def test_operation():
    events = []
    cs = []
    co = Connecter(lambda c, events = events: DummyUpload(events), 
        DummyDownloader(events), DummyChoker(events, cs), 3, 
        Measure(10))
    assert events == []
    assert cs == []
    
    dc = DummyConnection(events)
    co.connection_made(dc)
    assert len(cs) == 1
    cc = cs[0]
    co.got_message(dc, BITFIELD + chr(0xc0))
    co.got_message(dc, CHOKE)
    co.got_message(dc, UNCHOKE)
    co.got_message(dc, INTERESTED)
    co.got_message(dc, NOT_INTERESTED)
    co.got_message(dc, HAVE + tobinary(2))
    co.got_message(dc, REQUEST + tobinary(1) + tobinary(5) + tobinary(6))
    co.got_message(dc, CANCEL + tobinary(2) + tobinary(3) + tobinary(4))
    co.got_message(dc, PIECE + tobinary(1) + tobinary(0) + 'abc')
    co.got_message(dc, PIECE + tobinary(1) + tobinary(3) + 'def')
    co.connection_flushed(dc)
    cc.send_bitfield([false, true, true])
    cc.send_interested()
    cc.send_not_interested()
    cc.send_choke()
    cc.send_unchoke()
    cc.send_have(4)
    cc.send_request(0, 2, 1)
    cc.send_cancel(1, 2, 3)
    cc.send_piece(1, 2, 'abc')
    co.connection_lost(dc)
    x = ['made upload', 'made download', 'made', 
        ('bitfield', [true, true, false]), 'choke', 'unchoke',
        'interested', 'not interested', ('have', 2), 
        ('request', 1, 5, 6), ('cancel', 2, 3, 4),
        ('piece', 1, 0, 'abc'), ('piece', 1, 3, 'def'), 
        ('m', HAVE + tobinary(1)),
        'flushed', ('m', BITFIELD + chr(0x60)), ('m', INTERESTED), 
        ('m', NOT_INTERESTED), ('m', CHOKE), ('m', UNCHOKE), 
        ('m', HAVE + tobinary(4)), ('m', REQUEST + tobinary(0) + 
        tobinary(2) + tobinary(1)), ('m', CANCEL + tobinary(1) + 
        tobinary(2) + tobinary(3)), ('m', PIECE + tobinary(1) + 
        tobinary(2) + 'abc'), 'disconnected', 'lost']
    for a, b in zip (events, x):
        assert a == b, repr((a, b))

def test_conversion():
    assert toint(tobinary(50000)) == 50000

No CVS admin address has been configured
Powered by
ViewCVS 0.9.3