Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Start using flake8 #12

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion avea/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""avea
"""avea.

A python library to control Elgato's Avea Bulb.
"""
Expand Down
155 changes: 92 additions & 63 deletions avea/avea.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,20 @@
import bluepy # for BLE transmission

# __all__ definition for __init__.py
__all__ = ["Bulb", "discover_avea_bulbs", "compute_brightness", "compute_transition_table",
"compute_color", "check_bounds", "AveaDelegate", "AveaPeripheral"]
__all__ = [
"Bulb",
"discover_avea_bulbs",
"compute_brightness",
"compute_transition_table",
"compute_color",
"check_bounds",
"AveaDelegate",
"AveaPeripheral",
]


class Bulb:
"""The class that represents an Avea bulb
"""The class that represents an Avea bulb.

An Bulb object describe a real world Avea bulb.
It is linked to an AveaPeripheral object for BLE transmissions
Expand All @@ -36,7 +44,7 @@ def __init__(self, address):
self.white = 0

def subscribe_to_notification(self):
"""Subscribe to the bulbs notifications
"""Subscribe to the bulbs notifications.f

41 is the handle for notifications (its the one used to communicate +1)
0100 is the "enable bit"
Expand All @@ -56,7 +64,8 @@ def connect(self):
self.bulb = AveaPeripheral()
self.delegate = AveaDelegate(self)

# Catch if the bulb does not respond instead of crashing the whole script
# Catch if the bulb does not respond i
# instead of crashing the whole script
try:
self.bulb.connect(self.addr)
except Exception:
Expand All @@ -66,23 +75,25 @@ def connect(self):
self.bulb.withDelegate(self.delegate)
self.subscribe_to_notification()
return True

def get_fw_version(self):
"""Retrieve and return the current Firmware Revision of the bulb"""
"""Retrieve and return the current Firmware Revision of the bulb"""
version = ""
if self.connect():
try:
c = self.bulb.getCharacteristics(uuid=bluepy.btle.AssignedNumbers.firmwareRevisionString)
c = self.bulb.getCharacteristics(
uuid=bluepy.btle.AssignedNumbers.firmwareRevisionString
)
version = c[0].read()
except (bluepy.btle.BTLEException, BrokenPipeError, AttributeError) as e:
print(e, "get_fw_version")
if type(version) is bytes:
version = version.decode("utf-8")
self.fw_version = version;
self.fw_version = version
return version

def disconnect(self):
"""Disconnect from the bulb
"""Disconnect from the bulb.

Cleanup properly the bluepy's Peripheral and the Notification's Delegate to avoid weird issues
"""
Expand All @@ -94,17 +105,18 @@ def disconnect(self):
del self.delegate

def set_brightness(self, brightness):
"""Send the specified brightness to the bulb
"""Send the specified brightness to the bulb.

:args: - brightness value from 0 to 4095
"""
if self.connect():
self.bulb.writeCharacteristic(
40, compute_brightness(check_bounds(brightness)))
40, compute_brightness(check_bounds(brightness))
)
self.disconnect()

def get_brightness(self):
"""Retrieve and return the current brightness of the bulb
"""Retrieve and return the current brightness of the bulb.

:return: Current brightness, from 0 to 4095
"""
Expand All @@ -117,39 +129,46 @@ def get_brightness(self):
return self.brightness

def set_color(self, white, red, green, blue):
""" Set the color of the bulb using the full range of colors
"""Set the color of the bulb using the full range of colors.

:args: - white value from 0 to 4095
- red value
- green value
- blue value
"""
if self.connect():
self.bulb.writeCharacteristic(40, compute_color(check_bounds(white),
check_bounds(red),
check_bounds(
green),
check_bounds(blue)))
self.bulb.writeCharacteristic(
40,
compute_color(
check_bounds(white),
check_bounds(red),
check_bounds(green),
check_bounds(blue),
),
)
self.disconnect()

def set_rgb(self, red, green, blue):
"""Set the color of the bulb in a RGB format
"""Set the color of the bulb in a RGB format.

:args: - red value
- green value
- blue value
"""
if self.connect():
self.bulb.writeCharacteristic(40, compute_color(check_bounds(0),
check_bounds(
red*16),
check_bounds(
green*16),
check_bounds(blue*16)))
self.bulb.writeCharacteristic(
40,
compute_color(
check_bounds(0),
check_bounds(red * 16),
check_bounds(green * 16),
check_bounds(blue * 16),
),
)
self.disconnect()


def set_smooth_transition(self, target_red, target_green, target_blue, duration=2, fps=60):

"""Transition smoothly between the current color and a given
target color, in a given timeframe and a fps number, or how many changes per second are wanted.

Expand All @@ -172,33 +191,39 @@ def set_smooth_transition(self, target_red, target_green, target_blue, duration=
if self.connect():

# compute iters & interval
iterations = duration*fps
interval = 1/fps

iterations = duration * fps
interval = 1 / fps

# Compute the tables
transition_table_red = compute_transition_table(
init_r, target_red, iterations)
init_r, target_red, iterations
)
transition_table_green = compute_transition_table(
init_g, target_green, iterations)
init_g, target_green, iterations
)
transition_table_blue = compute_transition_table(
init_b, target_blue, iterations)
init_b, target_blue, iterations
)

# Loopy loop
for i in range(iterations):

val = compute_color(check_bounds(0),check_bounds(transition_table_red[i]*16),check_bounds(transition_table_green[i]*16),check_bounds(transition_table_blue[i]*16))
val = compute_color(
check_bounds(0),
check_bounds(transition_table_red[i] * 16),
check_bounds(transition_table_green[i] * 16),
check_bounds(transition_table_blue[i] * 16),
)
try:
self.bulb.writeCharacteristic(40,val)
self.bulb.writeCharacteristic(40, val)
except:
self.disconnect()
self.connect()
time.sleep(interval)
self.disconnect()


def get_color(self):
"""Retrieve and return the current color of the bulb
"""Retrieve and return the current color of the bulb.

A .5s sleep is here to accommodate for the bulb's response time :
If get_color() is called directly after set_color(), the transmission
Expand All @@ -225,7 +250,7 @@ def get_rgb(self):
self.bulb.waitForNotifications(1.0)
self.disconnect()

return int(self.red/16), int(self.green/16), int(self.blue/16)
return int(self.red / 16), int(self.green / 16), int(self.blue / 16)

def get_name(self):
"""Get and return the name of the bulb
Expand All @@ -244,7 +269,7 @@ def set_name(self, name):
"""Set the name of the bulb"""
if self.connect():
byteName = name.encode("utf-8")
command = "58"+byteName.hex()
command = "58" + byteName.hex()
self.bulb.writeCharacteristic(40, command)
self.disconnect()

Expand All @@ -263,17 +288,18 @@ def process_notification(self, data):

# Convert the brightness value
if cmd is 57:
self.brightness = int.from_bytes(values, 'little')
self.brightness = int.from_bytes(values, "little")

# Convert the color values
elif cmd is 35:
hex = values.hex()
self.red = int.from_bytes(bytes.fromhex(
hex[-4:]), "little") ^ int(0x3000)
self.green = int.from_bytes(bytes.fromhex(
hex[-8:-4]), "little") ^ int(0x2000)
self.blue = int.from_bytes(bytes.fromhex(
hex[-12:-8]), "little") ^ int(0x1000)
self.red = int.from_bytes(bytes.fromhex(hex[-4:]), "little") ^ int(0x3000)
self.green = int.from_bytes(bytes.fromhex(hex[-8:-4]), "little") ^ int(
0x2000
)
self.blue = int.from_bytes(bytes.fromhex(hex[-12:-8]), "little") ^ int(
0x1000
)
self.white = int.from_bytes(bytes.fromhex(hex[-16:-12]), "little")

# Convert the name
Expand Down Expand Up @@ -310,7 +336,7 @@ def compute_brightness(brightness):
"""Return the hex code for the specified brightness"""
value = hex(int(brightness))[2:]
value = value.zfill(4)
value = value[2:] + value[:2] # how to swap endianness
value = value[2:] + value[:2] # how to swap endianness

return "57" + value

Expand All @@ -320,23 +346,24 @@ def compute_color(w=2000, r=0, g=0, b=0):
color = "35"
fading = "1101"
unknow = "0a00"
white = (int(w) | int(0x8000)).to_bytes(2, byteorder='little').hex()
red = (int(r) | int(0x3000)).to_bytes(2, byteorder='little').hex()
green = (int(g) | int(0x2000)).to_bytes(2, byteorder='little').hex()
blue = (int(b) | int(0x1000)).to_bytes(2, byteorder='little').hex()
white = (int(w) | int(0x8000)).to_bytes(2, byteorder="little").hex()
red = (int(r) | int(0x3000)).to_bytes(2, byteorder="little").hex()
green = (int(g) | int(0x2000)).to_bytes(2, byteorder="little").hex()
blue = (int(b) | int(0x1000)).to_bytes(2, byteorder="little").hex()

return color + fading + unknow + white + red + green + blue


def compute_transition_table(init, target, iterations):
"""Compute a list of values for a smooth transition
between 2 numbers.


"""Compute a list of values for a smooth transition
between 2 numbers.

Args:
init (int): initial value
target (int): target value
iterations (int): number of in-between values to create

Returns:
list: the transition list
"""
Expand All @@ -360,7 +387,8 @@ def compute_transition_table(init, target, iterations):
while len(tmp_range) < iterations:
insert_index = insert_index % len(tmp_range)
tmp_range.insert(
insert_index, tmp_range[insert_index-1 if insert_index-1 > 0 else 0])
insert_index, tmp_range[insert_index - 1 if insert_index - 1 > 0 else 0]
)
insert_index += 4

# Pop the first and append the index
Expand All @@ -370,7 +398,7 @@ def compute_transition_table(init, target, iterations):


def check_bounds(value):
"""Check if the given value is out-of-bounds (0 to 4095)
"""Check if the given value is out-of-bounds (0 to 4095).

:args: the value to be checked
:returns: the checked value
Expand All @@ -384,14 +412,14 @@ def check_bounds(value):

else:
return value

except ValueError:
print("Value was not a number, returned default value of 0")
return 0


class AveaDelegate(bluepy.btle.DefaultDelegate):
"""Overwrite of Bluepy's DefaultDelegate class
"""Overwrite of Bluepy's DefaultDelegate class.

It adds a bulb object that refers to the Bulb.bulb object which
called this delegate.
Expand All @@ -406,7 +434,8 @@ def handleNotification(self, cHandle, data):

It's just passing the data to process_notification(),
which is linked to the emitting bulb (via self.bulb).
This allows us to use the bulb's functions and interact with the response.
This allows us to use the bulb's functions
and interact with the response.
"""
self.bulb.process_notification(data)

Expand All @@ -418,12 +447,12 @@ class AveaPeripheral(bluepy.btle.Peripheral):
"""

def writeCharacteristic(self, handle, val, withResponse=False):
"""Overwrite of the writeCharacteristic method
"""Overwrite of the writeCharacteristic method.

By default it only allows strings as input
As we craft our own paylod, we need to bypass this behavior
and send hex values directly
"""
cmd = "wrr" if withResponse else "wr"
self._writeCmd("%s %X %s\n" % (cmd, handle, val))
return self._getResp('wr')
return self._getResp("wr")