This commit is contained in:
Simone Margaritelli 2019-10-13 17:25:31 +02:00
commit ef31366078
2 changed files with 414 additions and 0 deletions

View File

@ -66,6 +66,12 @@ main:
wordlist_folder: /opt/wordlists/
enabled: false
enabled: false # if you want to use this, set to
mac: ~ # mac of your bluetooth device
ip: '' # ip from which your pwnagotchi should be reachable
netmask: 24
interval: 1 # check every x minutes for device
# monitor interface to use
iface: mon0
# command to run to bring the mon interface up in case it's not up already

View File

@ -0,0 +1,408 @@
__author__ = ''
__version__ = '1.0.0'
__name__ = 'bt-tether'
__license__ = 'GPL3'
__description__ = 'This makes the display reachable over bluetooth'
import os
import time
import re
import logging
import subprocess
import dbus
from pwnagotchi.ui.components import LabeledValue
from pwnagotchi.ui.view import BLACK
import pwnagotchi.ui.fonts as fonts
from pwnagotchi.utils import StatusFile
READY = False
INTERVAL = StatusFile('/root/.bt-tether')
OPTIONS = dict()
class BTError(Exception):
Custom bluetooth exception
class BTNap:
This class creates a bluetooth connection to the specified bt-mac
IFACE_BASE = 'org.bluez'
IFACE_DEV = 'org.bluez.Device1'
IFACE_ADAPTER = 'org.bluez.Adapter1'
IFACE_PROPS = 'org.freedesktop.DBus.Properties'
def __init__(self, mac):
self._mac = mac
def get_bus():
Get systembus obj
bus = getattr(BTNap.get_bus, 'cached_obj', None)
if not bus:
bus = BTNap.get_bus.cached_obj = dbus.SystemBus()
return bus
def get_manager():
Get manager obj
manager = getattr(BTNap.get_manager, 'cached_obj', None)
if not manager:
manager = BTNap.get_manager.cached_obj = dbus.Interface(
BTNap.get_bus().get_object(BTNap.IFACE_BASE, '/'),
'org.freedesktop.DBus.ObjectManager' )
return manager
def prop_get(obj, k, iface=None):
Get a property of the obj
if iface is None:
iface = obj.dbus_interface
return obj.Get(iface, k, dbus_interface=BTNap.IFACE_PROPS)
def prop_set(obj, k, v, iface=None):
Set a property of the obj
if iface is None:
iface = obj.dbus_interface
return obj.Set(iface, k, v, dbus_interface=BTNap.IFACE_PROPS)
def find_adapter(pattern=None):
Find the bt adapter
return BTNap.find_adapter_in_objects(BTNap.get_manager().GetManagedObjects(), pattern)
def find_adapter_in_objects(objects, pattern=None):
Finds the obj with a pattern
bus, obj = BTNap.get_bus(), None
for path, ifaces in objects.items():
adapter = ifaces.get(BTNap.IFACE_ADAPTER)
if adapter is None:
if not pattern or pattern == adapter['Address'] or path.endswith(pattern):
obj = bus.get_object(BTNap.IFACE_BASE, path)
yield dbus.Interface(obj, BTNap.IFACE_ADAPTER)
if obj is None:
raise BTError('Bluetooth adapter not found')
def find_device(device_address, adapter_pattern=None):
Finds the device
return BTNap.find_device_in_objects(BTNap.get_manager().GetManagedObjects(),
device_address, adapter_pattern)
def find_device_in_objects(objects, device_address, adapter_pattern=None):
Finds the device in objects
bus = BTNap.get_bus()
path_prefix = ''
if adapter_pattern:
if not isinstance(adapter_pattern, str):
adapter = adapter_pattern
adapter = BTNap.find_adapter_in_objects(objects, adapter_pattern)
path_prefix = adapter.object_path
for path, ifaces in objects.items():
device = ifaces.get(BTNap.IFACE_DEV)
if device is None:
if device['Address'] == device_address and path.startswith(path_prefix):
obj = bus.get_object(BTNap.IFACE_BASE, path)
return dbus.Interface(obj, BTNap.IFACE_DEV)
raise BTError('Bluetooth device not found')
def power(self, on=True):
Set power of devices to on/off
devs = list(BTNap.find_adapter())
devs = dict((BTNap.prop_get(dev, 'Address'), dev) for dev in devs)
for dev_addr, dev in devs.items():
BTNap.prop_set(dev, 'Powered', on)
logging.debug('Set power of %s (addr %s) to %s', dev.object_path, dev_addr, str(on))
if devs:
return list(devs.values())[0]
return None
def wait_for_device(self, timeout=30):
Wait for device
returns device if found None if not
bt_dev = self.power(True)
if not bt_dev:
return None
# could be set to 0, so check if > -1
while timeout > -1:
dev_remote = BTNap.find_device(self._mac, bt_dev)
logging.debug('Using remote device (addr: %s): %s',
BTNap.prop_get(dev_remote, 'Address'), dev_remote.object_path )
return dev_remote
except BTError:
timeout -= 1
# Device not found :(
return None
def connect(self, reconnect=False):
Connect to device
return True if connected; False if failed
# power up devices
bt_dev = self.power(True)
if not bt_dev:
return False
# check if device is close
dev_remote = self.wait_for_device()
if not dev_remote:
return False
#wait_iter = lambda: time.sleep(3600)
# signal.signal(signal.SIGTERM, lambda sig,frm: sys.exit(0))
except Exception:
net = dbus.Interface(dev_remote, 'org.bluez.Network1')
except dbus.exceptions.DBusException as err:
if err.get_dbus_name() != 'org.bluez.Error.Failed':
connected = BTNap.prop_get(net, 'Connected')
if not connected:
return False
if reconnect:
return self.connect(reconnect=False)
return True
class SystemdUnitWrapper:
systemd wrapper
def __init__(self, unit):
self.unit = unit
def _action_on_unit(action, unit):
process = subprocess.Popen(f"systemctl {action} {unit}", shell=True, stdin=None,
stdout=open("/dev/null", "w"), stderr=None, executable="/bin/bash")
if process.returncode > 0:
return False
return True
def daemon_reload():
Calls systemctl daemon-reload
process = subprocess.Popen("systemctl daemon-reload", shell=True, stdin=None,
stdout=open("/dev/null", "w"), stderr=None, executable="/bin/bash")
if process.returncode > 0:
return False
return True
def is_active(self):
Checks if unit is active
return SystemdUnitWrapper._action_on_unit('is-active', self.unit)
def is_enabled(self):
Checks if unit is enabled
return SystemdUnitWrapper._action_on_unit('is-enabled', self.unit)
def is_failed(self):
Checks if unit is failed
return SystemdUnitWrapper._action_on_unit('is-failed', self.unit)
def enable(self):
Enables the unit
return SystemdUnitWrapper._action_on_unit('enable', self.unit)
def disable(self):
Disables the unit
return SystemdUnitWrapper._action_on_unit('disable', self.unit)
def start(self):
Starts the unit
return SystemdUnitWrapper._action_on_unit('start', self.unit)
def stop(self):
Stops the unit
return SystemdUnitWrapper._action_on_unit('stop', self.unit)
def restart(self):
Restarts the unit
return SystemdUnitWrapper._action_on_unit('restart', self.unit)
class IfaceWrapper:
Small wrapper to check and manage ifaces
def __init__(self, iface):
self.iface = iface
self.path = f"/sys/class/net/{iface}"
def exists(self):
Checks if iface exists
return os.path.exists(self.path)
def is_up(self):
Checks if iface is ip
return open(f"{self.path}/operstate", 'r').read().rsplit('\n') == 'up'
def set_addr(self, addr):
Set the netmask
process = subprocess.Popen(f"ip addr add {addr} dev {self.iface}", shell=True, stdin=None,
stdout=open("/dev/null", "w"), stderr=None, executable="/bin/bash")
if process.returncode == 2 or process.returncode == 0: # 2 = already set
return True
return False
def on_loaded():
Gets called when the plugin gets loaded
global READY
for opt in ['mac', 'ip', 'netmask', 'interval']:
if opt not in OPTIONS or (opt in OPTIONS and OPTIONS[opt] is None):
logging.error("BT-TET: Pleace specify the %s in your config.yml.", opt)
# ensure bluetooth is running
bt_unit = SystemdUnitWrapper('bluetooth.service')
if not bt_unit.is_active():
if not bt_unit.start():
logging.error("BT-TET: Can't start bluetooth.service")
READY = True
def on_ui_update(ui):
Try to connect to device
if INTERVAL.newer_then_minutes(OPTIONS['interval']):
bt = BTNap(OPTIONS['mac'])
if bt.connect():
btnap_iface = IfaceWrapper('bnep0')
if btnap_iface.exists():
# check ip
addr = f"{OPTIONS['ip']}/{OPTIONS['netmask']}"
if not btnap_iface.set_addr(addr):
ui.set('bluetooth', 'ERR1')
logging.error("Could not set ip of bnep0 to %s", addr)
ui.set('bluetooth', 'CON')
ui.set('bluetooth', 'ERR2')
ui.set('bluetooth', 'NF')
def on_ui_setup(ui):
ui.add_element('bluetooth', LabeledValue(color=BLACK, label='BT', value='-', position=(ui.width() / 2 - 30, 0),
label_font=fonts.Bold, text_font=fonts.Medium))