Skip to content

Commit

Permalink
Add VR600 Support
Browse files Browse the repository at this point in the history
  • Loading branch information
GuyKh committed May 5, 2020
1 parent 7d64b06 commit 16b92f5
Showing 1 changed file with 179 additions and 3 deletions.
182 changes: 179 additions & 3 deletions custom_components/tplink_router/device_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,10 @@

def get_scanner(hass, config):
"""Validate the configuration and return a TP-Link scanner."""
for cls in [Tplink5DeviceScanner, Tplink4DeviceScanner,
Tplink3DeviceScanner, Tplink2DeviceScanner,
Tplink1DeviceScanner, TplinkDeviceScanner]:
for cls in [Tplink6DeviceScanner, Tplink5DeviceScanner,
Tplink4DeviceScanner, Tplink3DeviceScanner,
Tplink2DeviceScanner, Tplink1DeviceScanner,
TplinkDeviceScanner]:
scanner = cls(config[DOMAIN])
if scanner.success_init:
return scanner
Expand Down Expand Up @@ -494,3 +495,178 @@ def _update_info(self):
return True

return False

class Tplink6DeviceScanner(TplinkDeviceScanner):
"""This class queries an Archer VR600 router with TP-Link firmware."""

def __init__(self, config):
"""Initialize the scanner."""
self.pubkey = ''
self.jsessionId = ''
self.token = ''
super(Tplink6DeviceScanner, self).__init__(config)

def scan_devices(self):
"""Scan for new devices and return a list with found device IDs."""
self._update_info()
return self.last_results

# pylint: disable=no-self-use
def get_device_name(self, device):
"""Get the name of the wireless device."""
return None


def _get_pub_key(self):
# Get the modulu and exponent from the router
url = 'http:https://{}/cgi/getParm'.format(self.host)
referer = 'http:https://{}'.format(self.host)
response = requests.post(url, headers={ 'REFERER': referer})
if not response.status_code == 200:
return False

ee = self._get_field_from_router_response(response.text, 'ee')
nn = self._get_field_from_router_response(response.text, 'nn')

e = int(ee, 16)
n = int(nn, 16) #snipped for brevity

pubkey = construct((n, e))
self.pubkey = PKCS1_v1_5.new(pubkey)

return True

def _get_field_from_router_response(self, rText, key):
lines = rText.split('\n')
for line in lines:
if (line.startswith('var '+ key)):
# var ee="010101" => "010101"
return line.split('=\"')[1].split('\";')[0]
return ''


def _get_jsession_id(self):
username = self.username.encode('utf-8')
password = self.password.encode('utf-8')

b64pass = b64encode(password)
encryptedUsername = self.pubkey.encrypt(username)
encryptedPassword = self.pubkey.encrypt(b64pass)

base16username = b16encode(encryptedUsername).decode('utf-8').lower()
base16password = b16encode(encryptedPassword).decode('utf-8').lower()

referer = 'http:https://{}'.format(self.host)
url = 'http:https://{}/cgi/login?UserName={}&Passwd={}&Action=1&LoginStatus=0'.format(self.host, base16username, base16password)

randomSessionNum = self._get_random_alphaNumeric_string(15)

headers= { REFERER: referer }

response = requests.post(url, headers=headers)

if not response.status_code == 200:
_LOGGER.error("Error %s from router", page.response)
return False

self.jsessionId = dict(response.cookies)['JSESSIONID']
return True

def _get_random_alphaNumeric_string(self, stringLength=15):
lettersAndDigits = string.ascii_letters + string.digits
return ''.join((random.choice(lettersAndDigits) for i in range(stringLength)))

def _get_token(self):
url = 'http:https://{}/'.format(self.host)
referer = 'http:https://{}'.format(self.host)

headers= {
REFERER: referer,
COOKIE: 'loginErrorShow=1; JSESSIONID='+self.jsessionId,
}

cookies = {'JSESSIONID': self.jsessionId}
response = requests.get(url, headers=headers, cookies=cookies)

if not response.status_code == 200:
_LOGGER.error("Error %s from router", page.response)
return False


split = response.text.index('var token=') + len('var token=\"')
token = response.text[split:split+30]

self.token = token
return True


def _get_auth_tokens(self):
"""Retrieve auth tokens from the router."""
_LOGGER.info("Retrieving PublicKey...")

pubkey = self._get_pub_key()
if not pubkey:
return False

_LOGGER.info("Retrieving JSessionID...")
jsessionId = self._get_jsession_id()
if not jsessionId:
return False

_LOGGER.info("Retrieving Token...")
token = self._get_token()
if not token:
return False

return True

def _update_info(self):
"""Ensure the information from the TP-Link router is up to date.
Return boolean if scanning successful.
"""
if (self.jsessionId == '') or (self.token == ''):
self._get_auth_tokens()

_LOGGER.info("Loading wireless clients...")

referer = 'http:https://{}'.format(self.host)
headers= {
'TokenID': self.token,
REFERER: referer,
COOKIE: 'JSESSIONID=' + self.jsessionId
}

mac_results = []

# Check both the 2.4GHz and 5GHz client lists.
for clients_frequency in ('1', '2'):

# Refresh associated clients.
page = requests.post(
'http:https://{}/cgi?7'.format(self.host),
headers=headers,
data=(
'[ACT_WLAN_UPDATE_ASSOC#1,{},0,0,0,0#0,0,0,0,0,0]0,0\r\n'
).format(clients_frequency),
timeout=4)
if not page.status_code == 200:
_LOGGER.error("Error %s from router", page.status_code)
return False

# Retrieve associated clients.
page = requests.post(
'http:https://{}/cgi?6'.format(self.host),
headers=headers,
data=(
'[LAN_WLAN_ASSOC_DEV#0,0,0,0,0,0#1,{},0,0,0,0]0,1\r\n'
'AssociatedDeviceMACAddress\r\n'
).format(clients_frequency),
timeout=4)
if not page.status_code == 200:
_LOGGER.error("Error %s from router", page.status_code)
return False

mac_results.extend(self.parse_macs_colons.findall(page.text))

self.last_results = mac_results
return True

0 comments on commit 16b92f5

Please sign in to comment.