Skip to content

Commit

Permalink
Make the script more Pythonic and abstract an interface
Browse files Browse the repository at this point in the history
  • Loading branch information
Galaxy102 committed Nov 12, 2021
1 parent 08350c2 commit 6a2a680
Show file tree
Hide file tree
Showing 2 changed files with 149 additions and 103 deletions.
2 changes: 2 additions & 0 deletions __init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from .bluetooth_battery import BatteryStateQuerier, BatteryQueryError
from bluetooth import BluetoothError
250 changes: 147 additions & 103 deletions bluetooth_battery.py
Original file line number Diff line number Diff line change
@@ -1,119 +1,163 @@
#!/usr/bin/env python3

"""
A python script to get battery level from Bluetooth headsets
A python library to get battery level from Bluetooth headsets
"""

# License: GPL-3.0
# Author: @TheWeirdDev
# Author: @TheWeirdDev, @GaLaXy102
# 29 Sept 2019

import sys
import argparse
import bluetooth
from typing import Optional, Union, List, Dict


class BatteryQueryError(bluetooth.BluetoothError):
pass


class SocketDataIterator:
def __init__(self, sock: bluetooth.BluetoothSocket, chunk_size: int = 128):
"""
Create an Iterator over the given Socket
chunk_size defines the amount of data in Bytes to be read per iteration
"""
self._sock = sock
self._chunk_size = chunk_size

def __next__(self):
"""
Receive chunks
"""
return self._sock.recv(self._chunk_size)


class RFCOMMSocket(bluetooth.BluetoothSocket):

def __init__(self, proto=bluetooth.RFCOMM, _sock=None):
super().__init__(proto, _sock)

def __iter__(self):
"""
Iterate over incoming chunks of 128 Bytes
"""
return SocketDataIterator(self)

@staticmethod
def find_rfcomm_port(device_mac) -> int:
"""
Find the RFCOMM port number for a given bluetooth device
"""
uuid = "0000111e-0000-1000-8000-00805f9b34fb"
services: List[Dict] = bluetooth.find_service(address=device_mac, uuid=uuid)

for service in services:
if "protocol" in service.keys() and service["protocol"] == "RFCOMM":
return service["port"]
# Raise Interface error when the required service is not offered my the end device
raise bluetooth.BluetoothError("Couldn't find the RFCOMM port number. Perhaps the device is offline?")

def send(self, data):
"""
This function sends a message through a bluetooth socket with added line separators
"""
return super().send(b"\r\n" + data + b"\r\n")


class BatteryStateQuerier:

def __init__(self, bluetooth_mac: str, bluetooth_port: Optional[Union[str, int]] = None):
"""
Prepare a query for the end devices' battery state
bluetooth_mac is the MAC of the end device, e.g. 11:22:33:44:55:66
bluetooth_port is the Port of the RFCOMM/SPP service of the end device.
It will be determined automatically if not given.
The actual query can be performed using the int() and str() method.
"""
self._bt_settings = bluetooth_mac, int(bluetooth_port or RFCOMMSocket.find_rfcomm_port(bluetooth_mac))

def __int__(self):
"""
Perform a reading and get the result as int between 0 and 100
"""
return self._perform_query()

def __str__(self):
"""
Perform a reading and get the result as str between 0% and 100%
"""
return "{:.0%}".format(self._perform_query() / 100)

def _perform_query(self) -> int:
"""
Will try to get and print the battery level of supported devices
"""
result = None
sock = RFCOMMSocket()
sock.connect(self._bt_settings)
# Iterate received packets until there is no more or a result was found
for line in sock:
if b"BRSF" in line:
sock.send(b"+BRSF: 1024")
sock.send(b"OK")
elif b"CIND=" in line:
sock.send(b"+CIND: (\"battchg\",(0-5))")
sock.send(b"OK")
elif b"CIND?" in line:
sock.send(b"+CIND: 5")
sock.send(b"OK")
elif b"BIND=?" in line:
# Announce that we support the battery level HF indicator
# https://www.bluetooth.com/specifications/assigned-numbers/hands-free-profile/
sock.send(b"+BIND: (2)")
sock.send(b"OK")
elif b"BIND?" in line:
# Enable battery level HF indicator
sock.send(b"+BIND: 2,1")
sock.send(b"OK")
elif b"XAPL=" in line:
sock.send(b"+XAPL=iPhone,7")
sock.send(b"OK")
elif b"IPHONEACCEV" in line:
parts = line.strip().split(b',')[1:]
if len(parts) > 1 and (len(parts) % 2) == 0:
parts = iter(parts)
params = dict(zip(parts, parts))
if b'1' in params:
result = (int(params[b'1']) + 1) * 10
break
elif b"BIEV=" in line:
params = line.strip().split(b"=")[1].split(b",")
if params[0] == b"2":
result = int(params[1])
break
elif b"XEVENT=BATTERY" in line:
params = line.strip().split(b"=")[1].split(b",")
result = int(params[1]) / int(params[2]) * 100
break
else:
sock.send(b"OK")
sock.close()
# Check whether the result was found, otherwise raise an Error
if result is None:
raise BatteryQueryError("Could not query the battery state.")
return result


def send(sock, message):
"""
This function sends a message through a bluetooth socket
"""
sock.send(b"\r\n" + message + b"\r\n")


def get_at_command(sock, line, device):
"""
Will try to get and print the battery level of supported devices
"""
blevel = -1

if b"BRSF" in line:
send(sock, b"+BRSF: 1024")
send(sock, b"OK")
elif b"CIND=" in line:
send(sock, b"+CIND: (\"battchg\",(0-5))")
send(sock, b"OK")
elif b"CIND?" in line:
send(sock, b"+CIND: 5")
send(sock, b"OK")
elif b"BIND=?" in line:
# Announce that we support the battery level HF indicator
# https://www.bluetooth.com/specifications/assigned-numbers/hands-free-profile/
send(sock, b"+BIND: (2)")
send(sock, b"OK")
elif b"BIND?" in line:
# Enable battery level HF indicator
send(sock, b"+BIND: 2,1")
send(sock, b"OK")
elif b"XAPL=" in line:
send(sock, b"+XAPL=iPhone,7")
send(sock, b"OK")
elif b"IPHONEACCEV" in line:
parts = line.strip().split(b',')[1:]
if len(parts) > 1 and (len(parts) % 2) == 0:
parts = iter(parts)
params = dict(zip(parts, parts))
if b'1' in params:
blevel = (int(params[b'1']) + 1) * 10
elif b"BIEV=" in line:
params = line.strip().split(b"=")[1].split(b",")
if params[0] == b"2":
blevel = int(params[1])
elif b"XEVENT=BATTERY" in line:
params = line.strip().split(b"=")[1].split(b",")
blevel = int(params[1]) / int(params[2]) * 100
else:
send(sock, b"OK")

if blevel != -1:
print(f"Battery level for {device} is {blevel}%")
return False

return True


def find_rfcomm_port(device):
"""
Find the RFCOMM port number for a given bluetooth device
"""
uuid = "0000111e-0000-1000-8000-00805f9b34fb"
proto = bluetooth.find_service(address=device, uuid=uuid)
if len(proto) == 0:
print("Couldn't find the RFCOMM port number")
return 4

for pr in proto:
if 'protocol' in pr and pr['protocol'] == 'RFCOMM':
port = pr['port']
return port
return 4


def main():
if __name__ == "__main__":
"""
The starting point of the program. For each device address in the argument
list a bluetooth socket will be opened and the battery level will be read
and printed to stdout
"""
if len(sys.argv) < 2:
print("Usage: bluetooth_battery.py BT_MAC_ADDRESS_1.PORT ...")
print(" Port number is optional")
sys.exit()
else:
for device in sys.argv[1:]:
i = device.find('.')
if i == -1:
port = find_rfcomm_port(device)
else:
port = int(device[i+1:])
device = device[:i]
try:
sock = bluetooth.BluetoothSocket(bluetooth.RFCOMM)
sock.connect((device, port))
while get_at_command(sock, sock.recv(128), device):
pass
sock.close()
except OSError as err:
print(f"{device} is offline", err)


if __name__ == "__main__":
main()
parser = argparse.ArgumentParser(description="Get battery level from Bluetooth headsets")
parser.add_argument("devices", metavar="DEVICE_MAC[.PORT]", type=str, nargs="+",
help="(MAC address of target)[.SPP Port]")
args = parser.parse_args()
for device in args.devices:
query = BatteryStateQuerier(*device.split("."))
print("Battery level for {} is {}".format(device, str(query)))

0 comments on commit 6a2a680

Please sign in to comment.