Merge pull request #275 from dadav/feature/bluetooth-plugin
Add bluetooth plugin
This commit is contained in:
commit
3a14d1d87f
@ -66,6 +66,12 @@ main:
|
|||||||
wordlist_folder: /opt/wordlists/
|
wordlist_folder: /opt/wordlists/
|
||||||
AircrackOnly:
|
AircrackOnly:
|
||||||
enabled: false
|
enabled: false
|
||||||
|
bt-tether:
|
||||||
|
enabled: false # if you want to use this, set ui.display.video.address to 0.0.0.0
|
||||||
|
mac: ~ # mac of your bluetooth device
|
||||||
|
ip: '192.168.44.44' # ip from which your pwnagotchi should be reachable
|
||||||
|
netmask: 24
|
||||||
|
interval: 1 # check every x minutes for device
|
||||||
# monitor interface to use
|
# monitor interface to use
|
||||||
iface: mon0
|
iface: mon0
|
||||||
# command to run to bring the mon interface up in case it's not up already
|
# command to run to bring the mon interface up in case it's not up already
|
||||||
|
408
pwnagotchi/plugins/default/bt-tether.py
Normal file
408
pwnagotchi/plugins/default/bt-tether.py
Normal file
@ -0,0 +1,408 @@
|
|||||||
|
__author__ = '33197631+dadav@users.noreply.github.com'
|
||||||
|
__version__ = '1.0.0'
|
||||||
|
__name__ = 'bt-tether'
|
||||||
|
__license__ = 'GPL3'
|
||||||
|
__description__ = 'This makes the display reachable over bluetooth'
|
||||||
|
|
||||||
|
import os
|
||||||
|
import time
|
||||||
|
import re
|
||||||
|
import logging
|
||||||
|
import subprocess
|
||||||
|
import dbus
|
||||||
|
from pwnagotchi.ui.components import LabeledValue
|
||||||
|
from pwnagotchi.ui.view import BLACK
|
||||||
|
import pwnagotchi.ui.fonts as fonts
|
||||||
|
from pwnagotchi.utils import StatusFile
|
||||||
|
|
||||||
|
READY = False
|
||||||
|
INTERVAL = StatusFile('/root/.bt-tether')
|
||||||
|
OPTIONS = dict()
|
||||||
|
|
||||||
|
|
||||||
|
class BTError(Exception):
|
||||||
|
"""
|
||||||
|
Custom bluetooth exception
|
||||||
|
"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
class BTNap:
|
||||||
|
"""
|
||||||
|
This class creates a bluetooth connection to the specified bt-mac
|
||||||
|
|
||||||
|
see https://github.com/bablokb/pi-btnap/blob/master/files/usr/local/sbin/btnap.service.py
|
||||||
|
"""
|
||||||
|
|
||||||
|
IFACE_BASE = 'org.bluez'
|
||||||
|
IFACE_DEV = 'org.bluez.Device1'
|
||||||
|
IFACE_ADAPTER = 'org.bluez.Adapter1'
|
||||||
|
IFACE_PROPS = 'org.freedesktop.DBus.Properties'
|
||||||
|
|
||||||
|
def __init__(self, mac):
|
||||||
|
self._mac = mac
|
||||||
|
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def get_bus():
|
||||||
|
"""
|
||||||
|
Get systembus obj
|
||||||
|
"""
|
||||||
|
bus = getattr(BTNap.get_bus, 'cached_obj', None)
|
||||||
|
if not bus:
|
||||||
|
bus = BTNap.get_bus.cached_obj = dbus.SystemBus()
|
||||||
|
return bus
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def get_manager():
|
||||||
|
"""
|
||||||
|
Get manager obj
|
||||||
|
"""
|
||||||
|
manager = getattr(BTNap.get_manager, 'cached_obj', None)
|
||||||
|
if not manager:
|
||||||
|
manager = BTNap.get_manager.cached_obj = dbus.Interface(
|
||||||
|
BTNap.get_bus().get_object(BTNap.IFACE_BASE, '/'),
|
||||||
|
'org.freedesktop.DBus.ObjectManager' )
|
||||||
|
return manager
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def prop_get(obj, k, iface=None):
|
||||||
|
"""
|
||||||
|
Get a property of the obj
|
||||||
|
"""
|
||||||
|
if iface is None:
|
||||||
|
iface = obj.dbus_interface
|
||||||
|
return obj.Get(iface, k, dbus_interface=BTNap.IFACE_PROPS)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def prop_set(obj, k, v, iface=None):
|
||||||
|
"""
|
||||||
|
Set a property of the obj
|
||||||
|
"""
|
||||||
|
if iface is None:
|
||||||
|
iface = obj.dbus_interface
|
||||||
|
return obj.Set(iface, k, v, dbus_interface=BTNap.IFACE_PROPS)
|
||||||
|
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def find_adapter(pattern=None):
|
||||||
|
"""
|
||||||
|
Find the bt adapter
|
||||||
|
"""
|
||||||
|
|
||||||
|
return BTNap.find_adapter_in_objects(BTNap.get_manager().GetManagedObjects(), pattern)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def find_adapter_in_objects(objects, pattern=None):
|
||||||
|
"""
|
||||||
|
Finds the obj with a pattern
|
||||||
|
"""
|
||||||
|
bus, obj = BTNap.get_bus(), None
|
||||||
|
for path, ifaces in objects.items():
|
||||||
|
adapter = ifaces.get(BTNap.IFACE_ADAPTER)
|
||||||
|
if adapter is None:
|
||||||
|
continue
|
||||||
|
if not pattern or pattern == adapter['Address'] or path.endswith(pattern):
|
||||||
|
obj = bus.get_object(BTNap.IFACE_BASE, path)
|
||||||
|
yield dbus.Interface(obj, BTNap.IFACE_ADAPTER)
|
||||||
|
if obj is None:
|
||||||
|
raise BTError('Bluetooth adapter not found')
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def find_device(device_address, adapter_pattern=None):
|
||||||
|
"""
|
||||||
|
Finds the device
|
||||||
|
"""
|
||||||
|
return BTNap.find_device_in_objects(BTNap.get_manager().GetManagedObjects(),
|
||||||
|
device_address, adapter_pattern)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def find_device_in_objects(objects, device_address, adapter_pattern=None):
|
||||||
|
"""
|
||||||
|
Finds the device in objects
|
||||||
|
"""
|
||||||
|
bus = BTNap.get_bus()
|
||||||
|
path_prefix = ''
|
||||||
|
if adapter_pattern:
|
||||||
|
if not isinstance(adapter_pattern, str):
|
||||||
|
adapter = adapter_pattern
|
||||||
|
else:
|
||||||
|
adapter = BTNap.find_adapter_in_objects(objects, adapter_pattern)
|
||||||
|
path_prefix = adapter.object_path
|
||||||
|
for path, ifaces in objects.items():
|
||||||
|
device = ifaces.get(BTNap.IFACE_DEV)
|
||||||
|
if device is None:
|
||||||
|
continue
|
||||||
|
if device['Address'] == device_address and path.startswith(path_prefix):
|
||||||
|
obj = bus.get_object(BTNap.IFACE_BASE, path)
|
||||||
|
return dbus.Interface(obj, BTNap.IFACE_DEV)
|
||||||
|
raise BTError('Bluetooth device not found')
|
||||||
|
|
||||||
|
def power(self, on=True):
|
||||||
|
"""
|
||||||
|
Set power of devices to on/off
|
||||||
|
"""
|
||||||
|
|
||||||
|
devs = list(BTNap.find_adapter())
|
||||||
|
devs = dict((BTNap.prop_get(dev, 'Address'), dev) for dev in devs)
|
||||||
|
|
||||||
|
for dev_addr, dev in devs.items():
|
||||||
|
BTNap.prop_set(dev, 'Powered', on)
|
||||||
|
logging.debug('Set power of %s (addr %s) to %s', dev.object_path, dev_addr, str(on))
|
||||||
|
|
||||||
|
if devs:
|
||||||
|
return list(devs.values())[0]
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
|
def wait_for_device(self, timeout=30):
|
||||||
|
"""
|
||||||
|
Wait for device
|
||||||
|
|
||||||
|
returns device if found None if not
|
||||||
|
"""
|
||||||
|
bt_dev = self.power(True)
|
||||||
|
|
||||||
|
if not bt_dev:
|
||||||
|
return None
|
||||||
|
|
||||||
|
# could be set to 0, so check if > -1
|
||||||
|
while timeout > -1:
|
||||||
|
try:
|
||||||
|
dev_remote = BTNap.find_device(self._mac, bt_dev)
|
||||||
|
logging.debug('Using remote device (addr: %s): %s',
|
||||||
|
BTNap.prop_get(dev_remote, 'Address'), dev_remote.object_path )
|
||||||
|
return dev_remote
|
||||||
|
except BTError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
time.sleep(1)
|
||||||
|
timeout -= 1
|
||||||
|
|
||||||
|
# Device not found :(
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def connect(self, reconnect=False):
|
||||||
|
"""
|
||||||
|
Connect to device
|
||||||
|
|
||||||
|
return True if connected; False if failed
|
||||||
|
"""
|
||||||
|
|
||||||
|
# power up devices
|
||||||
|
bt_dev = self.power(True)
|
||||||
|
if not bt_dev:
|
||||||
|
return False
|
||||||
|
|
||||||
|
# check if device is close
|
||||||
|
dev_remote = self.wait_for_device()
|
||||||
|
|
||||||
|
if not dev_remote:
|
||||||
|
return False
|
||||||
|
|
||||||
|
#wait_iter = lambda: time.sleep(3600)
|
||||||
|
# signal.signal(signal.SIGTERM, lambda sig,frm: sys.exit(0))
|
||||||
|
|
||||||
|
try:
|
||||||
|
dev_remote.ConnectProfile('nap')
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
net = dbus.Interface(dev_remote, 'org.bluez.Network1')
|
||||||
|
|
||||||
|
try:
|
||||||
|
net.Connect('nap')
|
||||||
|
except dbus.exceptions.DBusException as err:
|
||||||
|
if err.get_dbus_name() != 'org.bluez.Error.Failed':
|
||||||
|
raise
|
||||||
|
|
||||||
|
connected = BTNap.prop_get(net, 'Connected')
|
||||||
|
|
||||||
|
if not connected:
|
||||||
|
return False
|
||||||
|
|
||||||
|
if reconnect:
|
||||||
|
net.Disconnect()
|
||||||
|
return self.connect(reconnect=False)
|
||||||
|
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
#################################################
|
||||||
|
#################################################
|
||||||
|
#################################################
|
||||||
|
|
||||||
|
class SystemdUnitWrapper:
|
||||||
|
"""
|
||||||
|
systemd wrapper
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, unit):
|
||||||
|
self.unit = unit
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _action_on_unit(action, unit):
|
||||||
|
process = subprocess.Popen(f"systemctl {action} {unit}", shell=True, stdin=None,
|
||||||
|
stdout=open("/dev/null", "w"), stderr=None, executable="/bin/bash")
|
||||||
|
process.wait()
|
||||||
|
if process.returncode > 0:
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def daemon_reload():
|
||||||
|
"""
|
||||||
|
Calls systemctl daemon-reload
|
||||||
|
"""
|
||||||
|
process = subprocess.Popen("systemctl daemon-reload", shell=True, stdin=None,
|
||||||
|
stdout=open("/dev/null", "w"), stderr=None, executable="/bin/bash")
|
||||||
|
process.wait()
|
||||||
|
if process.returncode > 0:
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
|
def is_active(self):
|
||||||
|
"""
|
||||||
|
Checks if unit is active
|
||||||
|
"""
|
||||||
|
return SystemdUnitWrapper._action_on_unit('is-active', self.unit)
|
||||||
|
|
||||||
|
def is_enabled(self):
|
||||||
|
"""
|
||||||
|
Checks if unit is enabled
|
||||||
|
"""
|
||||||
|
return SystemdUnitWrapper._action_on_unit('is-enabled', self.unit)
|
||||||
|
|
||||||
|
def is_failed(self):
|
||||||
|
"""
|
||||||
|
Checks if unit is failed
|
||||||
|
"""
|
||||||
|
return SystemdUnitWrapper._action_on_unit('is-failed', self.unit)
|
||||||
|
|
||||||
|
def enable(self):
|
||||||
|
"""
|
||||||
|
Enables the unit
|
||||||
|
"""
|
||||||
|
return SystemdUnitWrapper._action_on_unit('enable', self.unit)
|
||||||
|
|
||||||
|
def disable(self):
|
||||||
|
"""
|
||||||
|
Disables the unit
|
||||||
|
"""
|
||||||
|
return SystemdUnitWrapper._action_on_unit('disable', self.unit)
|
||||||
|
|
||||||
|
def start(self):
|
||||||
|
"""
|
||||||
|
Starts the unit
|
||||||
|
"""
|
||||||
|
return SystemdUnitWrapper._action_on_unit('start', self.unit)
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
"""
|
||||||
|
Stops the unit
|
||||||
|
"""
|
||||||
|
return SystemdUnitWrapper._action_on_unit('stop', self.unit)
|
||||||
|
|
||||||
|
def restart(self):
|
||||||
|
"""
|
||||||
|
Restarts the unit
|
||||||
|
"""
|
||||||
|
return SystemdUnitWrapper._action_on_unit('restart', self.unit)
|
||||||
|
|
||||||
|
|
||||||
|
class IfaceWrapper:
|
||||||
|
"""
|
||||||
|
Small wrapper to check and manage ifaces
|
||||||
|
|
||||||
|
see: https://github.com/rlisagor/pynetlinux/blob/master/pynetlinux/ifconfig.py
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, iface):
|
||||||
|
self.iface = iface
|
||||||
|
self.path = f"/sys/class/net/{iface}"
|
||||||
|
|
||||||
|
def exists(self):
|
||||||
|
"""
|
||||||
|
Checks if iface exists
|
||||||
|
"""
|
||||||
|
return os.path.exists(self.path)
|
||||||
|
|
||||||
|
def is_up(self):
|
||||||
|
"""
|
||||||
|
Checks if iface is ip
|
||||||
|
"""
|
||||||
|
return open(f"{self.path}/operstate", 'r').read().rsplit('\n') == 'up'
|
||||||
|
|
||||||
|
|
||||||
|
def set_addr(self, addr):
|
||||||
|
"""
|
||||||
|
Set the netmask
|
||||||
|
"""
|
||||||
|
process = subprocess.Popen(f"ip addr add {addr} dev {self.iface}", shell=True, stdin=None,
|
||||||
|
stdout=open("/dev/null", "w"), stderr=None, executable="/bin/bash")
|
||||||
|
process.wait()
|
||||||
|
|
||||||
|
if process.returncode == 2 or process.returncode == 0: # 2 = already set
|
||||||
|
return True
|
||||||
|
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def on_loaded():
|
||||||
|
"""
|
||||||
|
Gets called when the plugin gets loaded
|
||||||
|
"""
|
||||||
|
global READY
|
||||||
|
global INTERVAL
|
||||||
|
|
||||||
|
for opt in ['mac', 'ip', 'netmask', 'interval']:
|
||||||
|
if opt not in OPTIONS or (opt in OPTIONS and OPTIONS[opt] is None):
|
||||||
|
logging.error("BT-TET: Pleace specify the %s in your config.yml.", opt)
|
||||||
|
return
|
||||||
|
|
||||||
|
# ensure bluetooth is running
|
||||||
|
bt_unit = SystemdUnitWrapper('bluetooth.service')
|
||||||
|
if not bt_unit.is_active():
|
||||||
|
if not bt_unit.start():
|
||||||
|
logging.error("BT-TET: Can't start bluetooth.service")
|
||||||
|
return
|
||||||
|
|
||||||
|
INTERVAL.update()
|
||||||
|
READY = True
|
||||||
|
|
||||||
|
|
||||||
|
def on_ui_update(ui):
|
||||||
|
"""
|
||||||
|
Try to connect to device
|
||||||
|
"""
|
||||||
|
|
||||||
|
if READY:
|
||||||
|
global INTERVAL
|
||||||
|
if INTERVAL.newer_then_minutes(OPTIONS['interval']):
|
||||||
|
return
|
||||||
|
|
||||||
|
INTERVAL.update()
|
||||||
|
|
||||||
|
bt = BTNap(OPTIONS['mac'])
|
||||||
|
if bt.connect():
|
||||||
|
btnap_iface = IfaceWrapper('bnep0')
|
||||||
|
|
||||||
|
if btnap_iface.exists():
|
||||||
|
# check ip
|
||||||
|
addr = f"{OPTIONS['ip']}/{OPTIONS['netmask']}"
|
||||||
|
|
||||||
|
if not btnap_iface.set_addr(addr):
|
||||||
|
ui.set('bluetooth', 'ERR1')
|
||||||
|
logging.error("Could not set ip of bnep0 to %s", addr)
|
||||||
|
return
|
||||||
|
|
||||||
|
ui.set('bluetooth', 'CON')
|
||||||
|
else:
|
||||||
|
ui.set('bluetooth', 'ERR2')
|
||||||
|
else:
|
||||||
|
ui.set('bluetooth', 'NF')
|
||||||
|
|
||||||
|
|
||||||
|
def on_ui_setup(ui):
|
||||||
|
ui.add_element('bluetooth', LabeledValue(color=BLACK, label='BT', value='-', position=(ui.width() / 2 - 30, 0),
|
||||||
|
label_font=fonts.Bold, text_font=fonts.Medium))
|
Loading…
x
Reference in New Issue
Block a user