469 lines
13 KiB
Python
469 lines
13 KiB
Python
|
import logging
|
||
|
import RPi.GPIO as GPIO
|
||
|
import subprocess
|
||
|
import pwnagotchi.plugins as plugins
|
||
|
import signal
|
||
|
import smbus
|
||
|
import time
|
||
|
from threading import Thread
|
||
|
import atexit
|
||
|
from colorsys import hsv_to_rgb
|
||
|
|
||
|
|
||
|
try:
|
||
|
import queue
|
||
|
except ImportError:
|
||
|
import Queue as queue
|
||
|
|
||
|
ADDR = 0x3f
|
||
|
|
||
|
#adapted from version 0.0.2
|
||
|
__version__ = '0.0.2x'
|
||
|
|
||
|
_bus = None
|
||
|
|
||
|
LED_DATA = 7
|
||
|
LED_CLOCK = 6
|
||
|
|
||
|
REG_INPUT = 0x00
|
||
|
REG_OUTPUT = 0x01
|
||
|
REG_POLARITY = 0x02
|
||
|
REG_CONFIG = 0x03
|
||
|
|
||
|
NUM_BUTTONS = 5
|
||
|
|
||
|
BUTTON_A = 0
|
||
|
"""Button A"""
|
||
|
BUTTON_B = 1
|
||
|
"""Button B"""
|
||
|
BUTTON_C = 2
|
||
|
"""Button C"""
|
||
|
BUTTON_D = 3
|
||
|
"""Button D"""
|
||
|
BUTTON_E = 4
|
||
|
"""Button E"""
|
||
|
|
||
|
NAMES = ['A', 'B', 'C', 'D', 'E']
|
||
|
"""Sometimes you want to print the plain text name of the button that's triggered.
|
||
|
|
||
|
You can use::
|
||
|
|
||
|
buttonshim.NAMES[button_index]
|
||
|
|
||
|
To accomplish this.
|
||
|
|
||
|
"""
|
||
|
|
||
|
ERROR_LIMIT = 10
|
||
|
|
||
|
FPS = 60
|
||
|
|
||
|
LED_GAMMA = [
|
||
|
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
|
||
|
0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 2, 2, 2,
|
||
|
2, 2, 2, 3, 3, 3, 3, 3, 4, 4, 4, 4, 5, 5, 5, 5,
|
||
|
6, 6, 6, 7, 7, 7, 8, 8, 8, 9, 9, 9, 10, 10, 11, 11,
|
||
|
11, 12, 12, 13, 13, 13, 14, 14, 15, 15, 16, 16, 17, 17, 18, 18,
|
||
|
19, 19, 20, 21, 21, 22, 22, 23, 23, 24, 25, 25, 26, 27, 27, 28,
|
||
|
29, 29, 30, 31, 31, 32, 33, 34, 34, 35, 36, 37, 37, 38, 39, 40,
|
||
|
40, 41, 42, 43, 44, 45, 46, 46, 47, 48, 49, 50, 51, 52, 53, 54,
|
||
|
55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70,
|
||
|
71, 72, 73, 74, 76, 77, 78, 79, 80, 81, 83, 84, 85, 86, 88, 89,
|
||
|
90, 91, 93, 94, 95, 96, 98, 99, 100, 102, 103, 104, 106, 107, 109, 110,
|
||
|
111, 113, 114, 116, 117, 119, 120, 121, 123, 124, 126, 128, 129, 131, 132, 134,
|
||
|
135, 137, 138, 140, 142, 143, 145, 146, 148, 150, 151, 153, 155, 157, 158, 160,
|
||
|
162, 163, 165, 167, 169, 170, 172, 174, 176, 178, 179, 181, 183, 185, 187, 189,
|
||
|
191, 193, 194, 196, 198, 200, 202, 204, 206, 208, 210, 212, 214, 216, 218, 220,
|
||
|
222, 224, 227, 229, 231, 233, 235, 237, 239, 241, 244, 246, 248, 250, 252, 255]
|
||
|
|
||
|
# The LED is an APA102 driven via the i2c IO expander.
|
||
|
# We must set and clear the Clock and Data pins
|
||
|
# Each byte in _reg_queue represents a snapshot of the pin state
|
||
|
|
||
|
_reg_queue = []
|
||
|
_update_queue = []
|
||
|
_brightness = 0.5
|
||
|
|
||
|
_led_queue = queue.Queue()
|
||
|
|
||
|
_t_poll = None
|
||
|
|
||
|
_running = False
|
||
|
|
||
|
_states = 0b00011111
|
||
|
|
||
|
|
||
|
class Handler():
|
||
|
plugin = None
|
||
|
def __init__(self, plugin):
|
||
|
self.press = None
|
||
|
self.release = None
|
||
|
|
||
|
self.hold = None
|
||
|
self.hold_time = 0
|
||
|
|
||
|
self.repeat = False
|
||
|
self.repeat_time = 0
|
||
|
|
||
|
self.t_pressed = 0
|
||
|
self.t_repeat = 0
|
||
|
self.hold_fired = False
|
||
|
self.plugin = plugin
|
||
|
|
||
|
_handlers = [None,None,None,None,None]
|
||
|
|
||
|
|
||
|
def _run():
|
||
|
global _running, _states
|
||
|
_running = True
|
||
|
_last_states = 0b00011111
|
||
|
_errors = 0
|
||
|
|
||
|
while _running:
|
||
|
led_data = None
|
||
|
|
||
|
try:
|
||
|
led_data = _led_queue.get(False)
|
||
|
_led_queue.task_done()
|
||
|
|
||
|
except queue.Empty:
|
||
|
pass
|
||
|
|
||
|
try:
|
||
|
if led_data:
|
||
|
for chunk in _chunk(led_data, 32):
|
||
|
_bus.write_i2c_block_data(ADDR, REG_OUTPUT, chunk)
|
||
|
|
||
|
_states = _bus.read_byte_data(ADDR, REG_INPUT)
|
||
|
|
||
|
except IOError:
|
||
|
_errors += 1
|
||
|
if _errors > ERROR_LIMIT:
|
||
|
_running = False
|
||
|
raise IOError("More than {} IO errors have occurred!".format(ERROR_LIMIT))
|
||
|
|
||
|
for x in range(NUM_BUTTONS):
|
||
|
last = (_last_states >> x) & 1
|
||
|
curr = (_states >> x) & 1
|
||
|
handler = _handlers[x]
|
||
|
|
||
|
# If last > curr then it's a transition from 1 to 0
|
||
|
# since the buttons are active low, that's a press event
|
||
|
if last > curr:
|
||
|
handler.t_pressed = time.time()
|
||
|
handler.hold_fired = False
|
||
|
|
||
|
if callable(handler.press):
|
||
|
handler.t_repeat = time.time()
|
||
|
Thread(target=handler.press, args=(x, True, handler.plugin)).start()
|
||
|
|
||
|
continue
|
||
|
|
||
|
if last < curr and callable(handler.release):
|
||
|
Thread(target=handler.release, args=(x, False, handler.plugin)).start()
|
||
|
continue
|
||
|
|
||
|
if curr == 0:
|
||
|
if callable(handler.hold) and not handler.hold_fired and (time.time() - handler.t_pressed) > handler.hold_time:
|
||
|
Thread(target=handler.hold, args=(x,)).start()
|
||
|
handler.hold_fired = True
|
||
|
|
||
|
if handler.repeat and callable(handler.press) and (time.time() - handler.t_repeat) > handler.repeat_time:
|
||
|
_handlers[x].t_repeat = time.time()
|
||
|
Thread(target=_handlers[x].press, args=(x, True, handler.plugin)).start()
|
||
|
|
||
|
_last_states = _states
|
||
|
|
||
|
time.sleep(1.0 / FPS)
|
||
|
|
||
|
|
||
|
def _quit():
|
||
|
global _running
|
||
|
|
||
|
if _running:
|
||
|
_led_queue.join()
|
||
|
set_pixel(0, 0, 0)
|
||
|
_led_queue.join()
|
||
|
|
||
|
_running = False
|
||
|
_t_poll.join()
|
||
|
|
||
|
|
||
|
def setup():
|
||
|
global _t_poll, _bus
|
||
|
|
||
|
if _bus is not None:
|
||
|
return
|
||
|
|
||
|
_bus = smbus.SMBus(1)
|
||
|
|
||
|
_bus.write_byte_data(ADDR, REG_CONFIG, 0b00011111)
|
||
|
_bus.write_byte_data(ADDR, REG_POLARITY, 0b00000000)
|
||
|
_bus.write_byte_data(ADDR, REG_OUTPUT, 0b00000000)
|
||
|
|
||
|
_t_poll = Thread(target=_run)
|
||
|
_t_poll.daemon = True
|
||
|
_t_poll.start()
|
||
|
|
||
|
set_pixel(0, 0, 0)
|
||
|
|
||
|
atexit.register(_quit)
|
||
|
|
||
|
|
||
|
def _set_bit(pin, value):
|
||
|
global _reg_queue
|
||
|
|
||
|
if value:
|
||
|
_reg_queue[-1] |= (1 << pin)
|
||
|
else:
|
||
|
_reg_queue[-1] &= ~(1 << pin)
|
||
|
|
||
|
|
||
|
def _next():
|
||
|
global _reg_queue
|
||
|
|
||
|
if len(_reg_queue) == 0:
|
||
|
_reg_queue = [0b00000000]
|
||
|
else:
|
||
|
_reg_queue.append(_reg_queue[-1])
|
||
|
|
||
|
|
||
|
def _enqueue():
|
||
|
global _reg_queue
|
||
|
|
||
|
_led_queue.put(_reg_queue)
|
||
|
|
||
|
_reg_queue = []
|
||
|
|
||
|
|
||
|
def _chunk(l, n):
|
||
|
for i in range(0, len(l)+1, n):
|
||
|
yield l[i:i + n]
|
||
|
|
||
|
|
||
|
def _write_byte(byte):
|
||
|
for x in range(8):
|
||
|
_next()
|
||
|
_set_bit(LED_CLOCK, 0)
|
||
|
_set_bit(LED_DATA, byte & 0b10000000)
|
||
|
_next()
|
||
|
_set_bit(LED_CLOCK, 1)
|
||
|
byte <<= 1
|
||
|
|
||
|
|
||
|
def on_hold(buttons, handler=None, hold_time=2):
|
||
|
"""Attach a hold handler to one or more buttons.
|
||
|
|
||
|
This handler is fired when you hold a button for hold_time seconds.
|
||
|
|
||
|
When fired it will run in its own Thread.
|
||
|
|
||
|
It will be passed one argument, the button index::
|
||
|
|
||
|
@buttonshim.on_hold(buttonshim.BUTTON_A)
|
||
|
def handler(button):
|
||
|
# Your code here
|
||
|
|
||
|
:param buttons: A single button, or a list of buttons
|
||
|
:param handler: Optional: a function to bind as the handler
|
||
|
:param hold_time: Optional: the hold time in seconds (default 2)
|
||
|
|
||
|
"""
|
||
|
setup()
|
||
|
|
||
|
if buttons is None:
|
||
|
buttons = [BUTTON_A, BUTTON_B, BUTTON_C, BUTTON_D, BUTTON_E]
|
||
|
|
||
|
if isinstance(buttons, int):
|
||
|
buttons = [buttons]
|
||
|
|
||
|
def attach_handler(handler):
|
||
|
for button in buttons:
|
||
|
_handlers[button].hold = handler
|
||
|
_handlers[button].hold_time = hold_time
|
||
|
|
||
|
if handler is not None:
|
||
|
attach_handler(handler)
|
||
|
else:
|
||
|
return attach_handler
|
||
|
|
||
|
|
||
|
def on_press(buttons, handler=None, repeat=False, repeat_time=0.5):
|
||
|
"""Attach a press handler to one or more buttons.
|
||
|
|
||
|
This handler is fired when you press a button.
|
||
|
|
||
|
When fired it will be run in its own Thread.
|
||
|
|
||
|
It will be passed two arguments, the button index and a
|
||
|
boolean indicating whether the button has been pressed/released::
|
||
|
|
||
|
@buttonshim.on_press(buttonshim.BUTTON_A)
|
||
|
def handler(button, pressed):
|
||
|
# Your code here
|
||
|
|
||
|
:param buttons: A single button, or a list of buttons
|
||
|
:param handler: Optional: a function to bind as the handler
|
||
|
:param repeat: Optional: Repeat the handler if the button is held
|
||
|
:param repeat_time: Optional: Time, in seconds, after which to repeat
|
||
|
|
||
|
"""
|
||
|
setup()
|
||
|
|
||
|
if buttons is None:
|
||
|
buttons = [BUTTON_A, BUTTON_B, BUTTON_C, BUTTON_D, BUTTON_E]
|
||
|
|
||
|
if isinstance(buttons, int):
|
||
|
buttons = [buttons]
|
||
|
|
||
|
def attach_handler(handler):
|
||
|
for button in buttons:
|
||
|
_handlers[button].press = handler
|
||
|
_handlers[button].repeat = repeat
|
||
|
_handlers[button].repeat_time = repeat_time
|
||
|
|
||
|
if handler is not None:
|
||
|
attach_handler(handler)
|
||
|
else:
|
||
|
return attach_handler
|
||
|
|
||
|
|
||
|
def on_release(buttons=None, handler=None):
|
||
|
"""Attach a release handler to one or more buttons.
|
||
|
|
||
|
This handler is fired when you let go of a button.
|
||
|
|
||
|
When fired it will be run in its own Thread.
|
||
|
|
||
|
It will be passed two arguments, the button index and a
|
||
|
boolean indicating whether the button has been pressed/released::
|
||
|
|
||
|
@buttonshim.on_release(buttonshim.BUTTON_A)
|
||
|
def handler(button, pressed):
|
||
|
# Your code here
|
||
|
|
||
|
:param buttons: A single button, or a list of buttons
|
||
|
:param handler: Optional: a function to bind as the handler
|
||
|
|
||
|
"""
|
||
|
setup()
|
||
|
|
||
|
if buttons is None:
|
||
|
buttons = [BUTTON_A, BUTTON_B, BUTTON_C, BUTTON_D, BUTTON_E]
|
||
|
|
||
|
if isinstance(buttons, int):
|
||
|
buttons = [buttons]
|
||
|
|
||
|
def attach_handler(handler):
|
||
|
for button in buttons:
|
||
|
_handlers[button].release = handler
|
||
|
|
||
|
if handler is not None:
|
||
|
attach_handler(handler)
|
||
|
else:
|
||
|
return attach_handler
|
||
|
|
||
|
|
||
|
def set_brightness(brightness):
|
||
|
global _brightness
|
||
|
|
||
|
setup()
|
||
|
|
||
|
if not isinstance(brightness, int) and not isinstance(brightness, float):
|
||
|
raise ValueError("Brightness should be an int or float")
|
||
|
|
||
|
if brightness < 0.0 or brightness > 1.0:
|
||
|
raise ValueError("Brightness should be between 0.0 and 1.0")
|
||
|
|
||
|
_brightness = brightness
|
||
|
|
||
|
|
||
|
def set_pixel(r, g, b):
|
||
|
"""Set the Button SHIM RGB pixel
|
||
|
|
||
|
Display an RGB colour on the Button SHIM pixel.
|
||
|
|
||
|
:param r: Amount of red, from 0 to 255
|
||
|
:param g: Amount of green, from 0 to 255
|
||
|
:param b: Amount of blue, from 0 to 255
|
||
|
|
||
|
You can use HTML colours directly with hexadecimal notation in Python. EG::
|
||
|
|
||
|
buttonshim.set_pixel(0xFF, 0x00, 0xFF)
|
||
|
|
||
|
"""
|
||
|
setup()
|
||
|
|
||
|
if not isinstance(r, int) or r < 0 or r > 255:
|
||
|
raise ValueError("Argument r should be an int from 0 to 255")
|
||
|
|
||
|
if not isinstance(g, int) or g < 0 or g > 255:
|
||
|
raise ValueError("Argument g should be an int from 0 to 255")
|
||
|
|
||
|
if not isinstance(b, int) or b < 0 or b > 255:
|
||
|
raise ValueError("Argument b should be an int from 0 to 255")
|
||
|
|
||
|
r, g, b = [int(x * _brightness) for x in (r, g, b)]
|
||
|
|
||
|
_write_byte(0)
|
||
|
_write_byte(0)
|
||
|
_write_byte(0b11101111)
|
||
|
_write_byte(LED_GAMMA[b & 0xff])
|
||
|
_write_byte(LED_GAMMA[g & 0xff])
|
||
|
_write_byte(LED_GAMMA[r & 0xff])
|
||
|
_write_byte(0)
|
||
|
_write_byte(0)
|
||
|
_enqueue()
|
||
|
|
||
|
def blink(r, g, b, ontime, offtime, blinktimes):
|
||
|
logging.info("[buttonshim] Blink")
|
||
|
for i in range(0, blinktimes):
|
||
|
set_pixel(r, g, b)
|
||
|
time.sleep(ontime)
|
||
|
set_pixel(0, 0, 0)
|
||
|
time.sleep(offtime)
|
||
|
|
||
|
def runCommand(button, pressed, plugin):
|
||
|
logging.info(f"[buttonshim] Button Pressed! Loading command from slot '{button}' for button '{NAMES[button]}'")
|
||
|
bCfg = plugin.options['buttons'][NAMES[button]]
|
||
|
blinkCfg = bCfg['blink']
|
||
|
logging.debug(blink)
|
||
|
if blinkCfg['enabled'] == True:
|
||
|
logging.debug(f"[buttonshim] Blinking led")
|
||
|
red = int(blinkCfg['red'])
|
||
|
green = int(blinkCfg['green'])
|
||
|
blue = int(blinkCfg['blue'])
|
||
|
on_time = float(blinkCfg['on_time'])
|
||
|
off_time = float(blinkCfg['off_time'])
|
||
|
blink_times = int(blinkCfg['blink_times'])
|
||
|
logging.debug(f"red {red} green {green} blue {blue} on_time {on_time} off_time {off_time} blink_times {blink_times}")
|
||
|
thread = Thread(target=blink, args=(red, green, blue, on_time, off_time, blink_times))
|
||
|
thread.start()
|
||
|
logging.debug(f"[buttonshim] Blink thread started")
|
||
|
command = bCfg['command']
|
||
|
if command == '':
|
||
|
logging.debug(f"[buttonshim] Command empty")
|
||
|
else:
|
||
|
logging.debug(f"[buttonshim] Process create: {command}")
|
||
|
process = subprocess.Popen(command, shell=True, stdin=None, stdout=open("/dev/null", "w"), stderr=None, executable="/bin/bash")
|
||
|
process.wait()
|
||
|
process = None
|
||
|
logging.debug(f"[buttonshim] Process end")
|
||
|
|
||
|
class Buttonshim(plugins.Plugin):
|
||
|
__author__ = 'gon@o2online.de'
|
||
|
__version__ = '0.0.1'
|
||
|
__license__ = 'GPL3'
|
||
|
__description__ = 'Pimoroni Button Shim GPIO Button and RGB LED support plugin based on the pimoroni-buttonshim-lib and the pwnagotchi-gpio-buttons-plugin'
|
||
|
|
||
|
def __init__(self):
|
||
|
self.running = False
|
||
|
self.options = dict()
|
||
|
global _handlers
|
||
|
_handlers = [Handler(self) for x in range(NUM_BUTTONS)]
|
||
|
on_press([BUTTON_A, BUTTON_B, BUTTON_C, BUTTON_D, BUTTON_E], runCommand)
|
||
|
|
||
|
def on_loaded(self):
|
||
|
logging.info("[buttonshim] GPIO Button plugin loaded.")
|
||
|
self.running = True
|