Skip to content

Commit

Permalink
Add support for Nearby/Fast Pair protocol
Browse files Browse the repository at this point in the history
This is, at least, what Google Pixel Buds use to report left, right,
and case battery status.

The protocol documentation doesn't appear to be public, but information
can be gathered from opensource code.
  • Loading branch information
drinkcat committed Dec 14, 2023
1 parent 8437d49 commit f541f98
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 7 deletions.
2 changes: 1 addition & 1 deletion Readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ result = int(query) # returns integer between 0 and 100
# or
result = str(query) # returns "0%".."100%"
# or
result = query.query() # returns a dictonary {'overall': 100}
result = query.query() # returns a dictonary, e.g. {'overall': 100, 'left': 100, 'right': 100, 'case': 87}
```

As errors can occur in a wireless system, you probably want to handle them:
Expand Down
110 changes: 104 additions & 6 deletions bluetooth_battery.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,10 @@ def __iter__(self):
return SocketDataIterator(self)

@staticmethod
def find_rfcomm_port(device_mac) -> int:
def find_rfcomm_port(device_mac, uuid = "0000111e-0000-1000-8000-00805f9b34fb") -> int:
"""
Find the RFCOMM port number for a given bluetooth device
"""
uuid = "0000111e-0000-1000-8000-00805f9b34fb"
services: List[Dict] = bluetooth.find_service(address=device_mac, uuid=uuid)

for service in services:
Expand Down Expand Up @@ -86,6 +85,15 @@ def __init__(self, bluetooth_mac: str, bluetooth_port: Optional[Union[str, int]]
self._bluetooth_mac = bluetooth_mac
self._bluetooth_port = int(bluetooth_port or RFCOMMSocket.find_rfcomm_port(bluetooth_mac))

# Only try to use Nearby/Fast Pair protocol if port is not specified
self._use_fastpair = (bluetooth_port == None)
if self._use_fastpair:
try:
self._fastpair_port = RFCOMMSocket.find_rfcomm_port(bluetooth_mac, "df21fe2c-2515-4fdb-8886-f12c4d67927c")
except bluetooth.BluetoothError:
logger.debug("No Nearby service on this device, disabling.")
self._use_fastpair = False

def __int__(self):
"""
Perform a reading and get the result as int between 0 and 100
Expand All @@ -95,6 +103,17 @@ def __int__(self):
if "overall" in result:
return result["overall"]

# No overall, take minimum of left and right
left = result.get("left")
right = result.get("right")
if left != None:
if right == None or left < right:
return left
else:
return right
elif right != None:
return right

raise BatteryQueryError("Could not query the battery state.")

def __str__(self):
Expand All @@ -105,11 +124,25 @@ def __str__(self):

def query(self) -> dict[str, int]:
"""
Will try to get the battery level of supported devices
Will try to get the battery level of supported devices, returns a dictionary with some of the following keys:
- "overall" battery status (e.g. minimum of left and right earbuds)
- "left"/"right"/"case": self explanatory
"""
result = self._perform_query_rfcomm()
if self._use_fastpair:
result.update(self._perform_query_fastpair())
logger.debug("Query results: " + str(result))

return result

def _perform_query_rfcomm(self) -> dict[str, int]:
"""
Will try to get the battery level of supported devices over (more or
less) standard RFCOMM/SPP service.
"""
result: dict[str, int] = {}
sock = RFCOMMSocket()
logger.debug("Connecting to {}.{}".format(self._bluetooth_mac, self._bluetooth_port))
logger.debug("Connecting to {}.{} (RFCOMM)".format(self._bluetooth_mac, self._bluetooth_port))
sock.connect((self._bluetooth_mac, self._bluetooth_port))
logger.debug("Connected")
# Iterate received packets until there is no more or a result was found
Expand Down Expand Up @@ -160,9 +193,66 @@ def query(self) -> dict[str, int]:
else:
sock.send(b"OK")
sock.close()
logger.debug("Query results: " + str(result))
logger.debug("RFCOMM query results: " + str(result))
return result

def _perform_query_fastpair(self) -> dict[str, int]:
"""
Will try to get the battery level of supported devices over the Nearby/Fast Pair protocol.
"""
result: dict[str, int] = {}
sock = RFCOMMSocket()
logger.debug("Connecting to {}.{} (Nearby/Fast Pair)".format(self._bluetooth_mac, self._fastpair_port))
sock.connect((self._bluetooth_mac, self._fastpair_port))
logger.debug("Connected")

try:
for data in sock:
while len(data) > 0:
# Header format https://developers.google.com/nearby/fast-pair/specifications/extensions/messagestream
if len(data) < 4:
logger.debug("Invalid data")
return result

group = data[0]
code = data[1]
length = int.from_bytes(data[2:4], "big")
payload = data[4:4+length+1]
logger.debug("Group: {}; Code: {}; Length: {}; Payload: {}".format(group, code, length, payload.hex()))

# See https://github.com/google/nearby/blob/main/fastpair/message_stream/message.h
# DeviceInformationEvent group, BatteryUpdated code.
if group == 3 and code == 3:
# Example: https://github.com/google/nearby/blob/main/fastpair/common/battery_notification.cc
def parse_level(level: int) -> int:
# 0xff means not present/unknown
if level == 0xff:
return None
# Top bit indicates if the device is charging
return level & 0x7f

if len(payload) == 1:
result["overall"] = parse_level(payload[0])
elif len(payload) == 3:
result["left"] = parse_level(payload[0])
result["right"] = parse_level(payload[1])
result["case"] = parse_level(payload[2])
else:
raise BatteryQueryError("Invalid number of data points in Fast Pair packet.")
break

# Parse next packet
data = data[4+length:]
if len(result) > 0:
break

sock.close()
except bluetooth.BluetoothError as e:
# Fast Pair errors are not fatal.
logger.info("Bluetooth Error while reading Nearby/Fast Pair data: " + str(e))
pass
logger.debug("Nearby/Fast Pair query results: " + str(result))
return result

def main():
"""
Expand All @@ -179,7 +269,15 @@ def main():
logging.basicConfig(level=logging.DEBUG)
for device in args.devices:
query = BatteryStateQuerier(*device.split("."))
print("Battery level for {} is {}".format(device, str(query)))
result = query.query()
print("Battery level for {}:".format(device), end="")
if "overall" in result:
print(" {:.0%}.".format(result["overall"]/100), end="")
for key, value in result.items():
if key != "overall":
print(" {}: {:.0%}.".format(key.capitalize(), value/100), end="")
print("")


if __name__ == "__main__":
main()

0 comments on commit f541f98

Please sign in to comment.