From 2f948306ebeabf10050bb0c07a3254b420c30751 Mon Sep 17 00:00:00 2001 From: Simone Margaritelli Date: Fri, 1 Nov 2019 13:51:45 +0100 Subject: [PATCH] misc: refactored plugin system to use classes --- pwnagotchi/plugins/__init__.py | 64 +++-- pwnagotchi/plugins/default/AircrackOnly.py | 81 +++--- pwnagotchi/plugins/default/auto-backup.py | 74 +++-- pwnagotchi/plugins/default/auto-update.py | 57 ++-- pwnagotchi/plugins/default/bt-tether.py | 132 ++++----- pwnagotchi/plugins/default/example.py | 258 ++++++++---------- pwnagotchi/plugins/default/gpio_buttons.py | 64 ++--- pwnagotchi/plugins/default/gps.py | 65 +++-- pwnagotchi/plugins/default/grid.py | 171 ++++++------ pwnagotchi/plugins/default/memtemp.py | 60 ++-- pwnagotchi/plugins/default/net-pos.py | 222 ++++++++------- pwnagotchi/plugins/default/onlinehashcrack.py | 138 +++++----- pwnagotchi/plugins/default/paw-gps.py | 30 +- pwnagotchi/plugins/default/quickdic.py | 77 +++--- pwnagotchi/plugins/default/screen_refresh.py | 37 ++- pwnagotchi/plugins/default/twitter.py | 77 +++--- .../plugins/default/unfiltered_example.py | 22 -- pwnagotchi/plugins/default/ups_lite.py | 32 +-- pwnagotchi/plugins/default/wigle.py | 192 +++++++------ pwnagotchi/plugins/default/wpa-sec.py | 139 +++++----- 20 files changed, 943 insertions(+), 1049 deletions(-) delete mode 100644 pwnagotchi/plugins/default/unfiltered_example.py diff --git a/pwnagotchi/plugins/__init__.py b/pwnagotchi/plugins/__init__.py index cb58323..a183294 100644 --- a/pwnagotchi/plugins/__init__.py +++ b/pwnagotchi/plugins/__init__.py @@ -7,20 +7,20 @@ default_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), "defaul loaded = {} -def dummy_callback(): - pass +class Plugin: + @classmethod + def __init_subclass__(cls, **kwargs): + super().__init_subclass__(**kwargs) + global loaded + plugin_name = cls.__module__.split('.')[0] + plugin_instance = cls() + logging.debug("loaded plugin %s as %s" % (plugin_name, plugin_instance)) + loaded[plugin_name] = plugin_instance def on(event_name, *args, **kwargs): - global loaded - cb_name = 'on_%s' % event_name for plugin_name, plugin in loaded.items(): - if cb_name in plugin.__dict__: - try: - plugin.__dict__[cb_name](*args, **kwargs) - except Exception as e: - logging.error("error while running %s.%s : %s" % (plugin_name, cb_name, e)) - logging.error(e, exc_info=True) + one(plugin_name, event_name, *args, **kwargs) def one(plugin_name, event_name, *args, **kwargs): @@ -28,15 +28,17 @@ def one(plugin_name, event_name, *args, **kwargs): if plugin_name in loaded: plugin = loaded[plugin_name] cb_name = 'on_%s' % event_name - if cb_name in plugin.__dict__: + callback = getattr(plugin, cb_name, None) + if callback is not None and callable(callback): try: - plugin.__dict__[cb_name](*args, **kwargs) + callback(*args, **kwargs) except Exception as e: logging.error("error while running %s.%s : %s" % (plugin_name, cb_name, e)) logging.error(e, exc_info=True) def load_from_file(filename): + logging.debug("loading %s" % filename) plugin_name = os.path.basename(filename.replace(".py", "")) spec = importlib.util.spec_from_file_location(plugin_name, filename) instance = importlib.util.module_from_spec(spec) @@ -46,19 +48,15 @@ def load_from_file(filename): def load_from_path(path, enabled=()): global loaded + logging.debug("loading plugins from %s - enabled: %s" % (path, enabled)) for filename in glob.glob(os.path.join(path, "*.py")): - try: - name, plugin = load_from_file(filename) - if name in loaded: - raise Exception("plugin %s already loaded from %s" % (name, plugin.__file__)) - elif name not in enabled: - # print("plugin %s is not enabled" % name) - pass - else: - loaded[name] = plugin - except Exception as e: - logging.warning("error while loading %s: %s" % (filename, e)) - logging.debug(e, exc_info=True) + plugin_name = os.path.basename(filename.replace(".py", "")) + if plugin_name in enabled: + try: + load_from_file(filename) + except Exception as e: + logging.warning("error while loading %s: %s" % (filename, e)) + logging.debug(e, exc_info=True) return loaded @@ -66,17 +64,17 @@ def load_from_path(path, enabled=()): def load(config): enabled = [name for name, options in config['main']['plugins'].items() if 'enabled' in options and options['enabled']] - custom_path = config['main']['custom_plugins'] if 'custom_plugins' in config['main'] else None + # load default plugins - loaded = load_from_path(default_path, enabled=enabled) - # set the options - for name, plugin in loaded.items(): - plugin.__dict__['OPTIONS'] = config['main']['plugins'][name] + load_from_path(default_path, enabled=enabled) + # load custom ones + custom_path = config['main']['custom_plugins'] if 'custom_plugins' in config['main'] else None if custom_path is not None: - loaded = load_from_path(custom_path, enabled=enabled) - # set the options - for name, plugin in loaded.items(): - plugin.__dict__['OPTIONS'] = config['main']['plugins'][name] + load_from_path(custom_path, enabled=enabled) + + # propagate options + for name, plugin in loaded.items(): + plugin.options = config['main']['plugins'][name] on('loaded') diff --git a/pwnagotchi/plugins/default/AircrackOnly.py b/pwnagotchi/plugins/default/AircrackOnly.py index b6d2028..4691e68 100644 --- a/pwnagotchi/plugins/default/AircrackOnly.py +++ b/pwnagotchi/plugins/default/AircrackOnly.py @@ -1,56 +1,57 @@ -__author__ = 'pwnagotchi [at] rossmarks [dot] uk' -__version__ = '1.0.1' -__name__ = 'AircrackOnly' -__license__ = 'GPL3' -__description__ = 'confirm pcap contains handshake/PMKID or delete it' - -''' -Aircrack-ng needed, to install: -> apt-get install aircrack-ng -''' +import pwnagotchi.plugins as plugins import logging import subprocess import string import os -OPTIONS = dict() +''' +Aircrack-ng needed, to install: +> apt-get install aircrack-ng +''' -def on_loaded(): - logging.info("aircrackonly plugin loaded") -def on_handshake(agent, filename, access_point, client_station): - display = agent._view - todelete = 0 +class AircrackOnly(plugins.Plugin): + __author__ = 'pwnagotchi [at] rossmarks [dot] uk' + __version__ = '1.0.1' + __license__ = 'GPL3' + __description__ = 'confirm pcap contains handshake/PMKID or delete it' - result = subprocess.run(('/usr/bin/aircrack-ng '+ filename +' | grep "1 handshake" | awk \'{print $2}\''),shell=True, stdout=subprocess.PIPE) - result = result.stdout.decode('utf-8').translate({ord(c) :None for c in string.whitespace}) - if result: - logging.info("[AircrackOnly] contains handshake") - else: - todelete = 1 + def __init__(self): + super().__init__(self) + self.text_to_set = "" - if todelete == 0: - result = subprocess.run(('/usr/bin/aircrack-ng '+ filename +' | grep "PMKID" | awk \'{print $2}\''),shell=True, stdout=subprocess.PIPE) - result = result.stdout.decode('utf-8').translate({ord(c) :None for c in string.whitespace}) + def on_loaded(self): + logging.info("aircrackonly plugin loaded") + + def on_handshake(self, agent, filename, access_point, client_station): + display = agent._view + todelete = 0 + + result = subprocess.run(('/usr/bin/aircrack-ng ' + filename + ' | grep "1 handshake" | awk \'{print $2}\''), + shell=True, stdout=subprocess.PIPE) + result = result.stdout.decode('utf-8').translate({ord(c): None for c in string.whitespace}) if result: - logging.info("[AircrackOnly] contains PMKID") + logging.info("[AircrackOnly] contains handshake") else: todelete = 1 - if todelete == 1: - os.remove(filename) - set_text("Removed an uncrackable pcap") - display.update(force=True) + if todelete == 0: + result = subprocess.run(('/usr/bin/aircrack-ng ' + filename + ' | grep "PMKID" | awk \'{print $2}\''), + shell=True, stdout=subprocess.PIPE) + result = result.stdout.decode('utf-8').translate({ord(c): None for c in string.whitespace}) + if result: + logging.info("[AircrackOnly] contains PMKID") + else: + todelete = 1 -text_to_set = ""; -def set_text(text): - global text_to_set - text_to_set = text + if todelete == 1: + os.remove(filename) + self.text_to_set = "Removed an uncrackable pcap" + display.update(force=True) -def on_ui_update(ui): - global text_to_set - if text_to_set: - ui.set('face', "(>.<)") - ui.set('status', text_to_set) - text_to_set = "" + def on_ui_update(self, ui): + if self.text_to_set: + ui.set('face', "(>.<)") + ui.set('status', self.text_to_set) + self.text_to_set = "" diff --git a/pwnagotchi/plugins/default/auto-backup.py b/pwnagotchi/plugins/default/auto-backup.py index c235f78..9b15efc 100644 --- a/pwnagotchi/plugins/default/auto-backup.py +++ b/pwnagotchi/plugins/default/auto-backup.py @@ -1,49 +1,47 @@ -__author__ = '33197631+dadav@users.noreply.github.com' -__version__ = '1.0.0' -__name__ = 'auto-backup' -__license__ = 'GPL3' -__description__ = 'This plugin backups files when internet is available.' - +import pwnagotchi.plugins as plugins from pwnagotchi.utils import StatusFile import logging import os import subprocess -OPTIONS = dict() -READY = False -STATUS = StatusFile('/root/.auto-backup') +class AutoBackup(plugins.Plugin): + __author__ = '33197631+dadav@users.noreply.github.com' + __version__ = '1.0.0' + __license__ = 'GPL3' + __description__ = 'This plugin backups files when internet is available.' -def on_loaded(): - global READY + def __init__(self): + self.ready = False + self.status = StatusFile('/root/.auto-backup') - if 'files' not in OPTIONS or ('files' in OPTIONS and OPTIONS['files'] is None): - logging.error("AUTO-BACKUP: No files to backup.") - return - - if 'interval' not in OPTIONS or ('interval' in OPTIONS and OPTIONS['interval'] is None): - logging.error("AUTO-BACKUP: Interval is not set.") - return - - if 'commands' not in OPTIONS or ('commands' in OPTIONS and OPTIONS['commands'] is None): - logging.error("AUTO-BACKUP: No commands given.") - return - - READY = True - logging.info("AUTO-BACKUP: Successfully loaded.") - - -def on_internet_available(agent): - global STATUS - - if READY: - if STATUS.newer_then_days(OPTIONS['interval']): + def on_loaded(self): + if 'files' not in self.options or ('files' in self.options and self.options['files'] is None): + logging.error("AUTO-BACKUP: No files to backup.") return - + + if 'interval' not in self.options or ('interval' in self.options and self.options['interval'] is None): + logging.error("AUTO-BACKUP: Interval is not set.") + return + + if 'commands' not in self.options or ('commands' in self.options and self.options['commands'] is None): + logging.error("AUTO-BACKUP: No commands given.") + return + + self.ready = True + logging.info("AUTO-BACKUP: Successfully loaded.") + + def on_internet_available(self, agent): + if not self.ready: + return + + if self.status.newer_then_days(self.options['interval']): + return + # Only backup existing files to prevent errors - existing_files = list(filter(lambda f: os.path.exists(f), OPTIONS['files'])) + existing_files = list(filter(lambda f: os.path.exists(f), self.options['files'])) files_to_backup = " ".join(existing_files) - + try: display = agent.view() @@ -51,10 +49,10 @@ def on_internet_available(agent): display.set('status', 'Backing up ...') display.update() - for cmd in OPTIONS['commands']: + for cmd in self.options['commands']: logging.info(f"AUTO-BACKUP: Running {cmd.format(files=files_to_backup)}") process = subprocess.Popen(cmd.format(files=files_to_backup), shell=True, stdin=None, - stdout=open("/dev/null", "w"), stderr=None, executable="/bin/bash") + stdout=open("/dev/null", "w"), stderr=None, executable="/bin/bash") process.wait() if process.returncode > 0: raise OSError(f"Command failed (rc: {process.returncode})") @@ -62,7 +60,7 @@ def on_internet_available(agent): logging.info("AUTO-BACKUP: backup done") display.set('status', 'Backup done!') display.update() - STATUS.update() + self.status.update() except OSError as os_e: logging.info(f"AUTO-BACKUP: Error: {os_e}") display.set('status', 'Backup failed!') diff --git a/pwnagotchi/plugins/default/auto-update.py b/pwnagotchi/plugins/default/auto-update.py index 3365490..0a4b0ff 100644 --- a/pwnagotchi/plugins/default/auto-update.py +++ b/pwnagotchi/plugins/default/auto-update.py @@ -1,9 +1,3 @@ -__author__ = 'evilsocket@gmail.com' -__version__ = '1.1.1' -__name__ = 'auto-update' -__license__ = 'GPL3' -__description__ = 'This plugin checks when updates are available and applies them when internet is available.' - import os import re import logging @@ -15,21 +9,9 @@ import glob import pkg_resources import pwnagotchi +import pwnagotchi.plugins as plugins from pwnagotchi.utils import StatusFile -OPTIONS = dict() -READY = False -STATUS = StatusFile('/root/.auto-update') - - -def on_loaded(): - global READY - if 'interval' not in OPTIONS or ('interval' in OPTIONS and OPTIONS['interval'] is None): - logging.error("[update] main.plugins.auto-update.interval is not set") - return - READY = True - logging.info("[update] plugin loaded.") - def check(version, repo, native=True): logging.debug("checking remote version for %s, local is %s" % (repo, version)) @@ -158,14 +140,32 @@ def parse_version(cmd): raise Exception('could not parse version from "%s": output=\n%s' % (cmd, out)) -def on_internet_available(agent): - global STATUS +class AutoUpdate(plugins.Plugin): + __author__ = 'evilsocket@gmail.com' + __version__ = '1.1.1' + __name__ = 'auto-update' + __license__ = 'GPL3' + __description__ = 'This plugin checks when updates are available and applies them when internet is available.' - logging.debug("[update] internet connectivity is available (ready %s)" % READY) + def __init__(self): + self.ready = False + self.status = StatusFile('/root/.auto-update') - if READY: - if STATUS.newer_then_hours(OPTIONS['interval']): - logging.debug("[update] last check happened less than %d hours ago" % OPTIONS['interval']) + def on_loaded(self): + if 'interval' not in self.options or ('interval' in self.options and self.options['interval'] is None): + logging.error("[update] main.plugins.auto-update.interval is not set") + return + self.ready = True + logging.info("[update] plugin loaded.") + + def on_internet_available(self, agent): + logging.debug("[update] internet connectivity is available (ready %s)" % self.ready) + + if not self.ready: + return + + if self.status.newer_then_hours(self.options['interval']): + logging.debug("[update] last check happened less than %d hours ago" % self.options['interval']) return logging.info("[update] checking for updates ...") @@ -187,7 +187,8 @@ def on_internet_available(agent): info = check(local_version, repo, is_native) if info['url'] is not None: logging.warning( - "update for %s available (local version is '%s'): %s" % (repo, info['current'], info['url'])) + "update for %s available (local version is '%s'): %s" % ( + repo, info['current'], info['url'])) info['service'] = svc_name to_install.append(info) @@ -195,7 +196,7 @@ def on_internet_available(agent): num_installed = 0 if num_updates > 0: - if OPTIONS['install']: + if self.options['install']: for update in to_install: if install(display, update): num_installed += 1 @@ -204,7 +205,7 @@ def on_internet_available(agent): logging.info("[update] done") - STATUS.update() + self.status.update() if num_installed > 0: display.update(force=True, new_data={'status': 'Rebooting ...'}) diff --git a/pwnagotchi/plugins/default/bt-tether.py b/pwnagotchi/plugins/default/bt-tether.py index 2da90cf..7c89387 100644 --- a/pwnagotchi/plugins/default/bt-tether.py +++ b/pwnagotchi/plugins/default/bt-tether.py @@ -1,9 +1,3 @@ -__author__ = '33197631+dadav@users.noreply.github.com' -__version__ = '1.0.0' -__name__ = 'bt-tether' -__license__ = 'GPL3' -__description__ = 'This makes the display reachable over bluetooth' - import os import time import re @@ -14,11 +8,8 @@ from pwnagotchi.ui.components import LabeledValue from pwnagotchi.ui.view import BLACK import pwnagotchi.ui.fonts as fonts from pwnagotchi.utils import StatusFile +import pwnagotchi.plugins as plugins -READY = False -INTERVAL = StatusFile('/root/.bt-tether') -OPTIONS = dict() -NETWORK = None class BTError(Exception): """ @@ -26,6 +17,7 @@ class BTError(Exception): """ pass + class BTNap: """ This class creates a bluetooth connection to the specified bt-mac @@ -41,7 +33,6 @@ class BTNap: def __init__(self, mac): self._mac = mac - @staticmethod def get_bus(): """ @@ -59,9 +50,9 @@ class BTNap: """ manager = getattr(BTNap.get_manager, 'cached_obj', None) if not manager: - manager = BTNap.get_manager.cached_obj = dbus.Interface( - BTNap.get_bus().get_object(BTNap.IFACE_BASE, '/'), - 'org.freedesktop.DBus.ObjectManager' ) + manager = BTNap.get_manager.cached_obj = dbus.Interface( + BTNap.get_bus().get_object(BTNap.IFACE_BASE, '/'), + 'org.freedesktop.DBus.ObjectManager') return manager @staticmethod @@ -82,7 +73,6 @@ class BTNap: iface = obj.dbus_interface return obj.Set(iface, k, v, dbus_interface=BTNap.IFACE_PROPS) - @staticmethod def find_adapter(pattern=None): """ @@ -98,14 +88,14 @@ class BTNap: """ bus, obj = BTNap.get_bus(), None for path, ifaces in objects.items(): - adapter = ifaces.get(BTNap.IFACE_ADAPTER) - if adapter is None: - continue - if not pattern or pattern == adapter['Address'] or path.endswith(pattern): - obj = bus.get_object(BTNap.IFACE_BASE, path) - yield dbus.Interface(obj, BTNap.IFACE_ADAPTER) + adapter = ifaces.get(BTNap.IFACE_ADAPTER) + if adapter is None: + continue + if not pattern or pattern == adapter['Address'] or path.endswith(pattern): + obj = bus.get_object(BTNap.IFACE_BASE, path) + yield dbus.Interface(obj, BTNap.IFACE_ADAPTER) if obj is None: - raise BTError('Bluetooth adapter not found') + raise BTError('Bluetooth adapter not found') @staticmethod def find_device(device_address, adapter_pattern=None): @@ -178,7 +168,6 @@ class BTNap: logging.debug("BT-TETHER: Device is not connected.") return None, False - def is_paired(self): """ Check if already connected @@ -198,7 +187,6 @@ class BTNap: logging.debug("BT-TETHER: Device is not paired.") return False - def wait_for_device(self, timeout=15): """ Wait for device @@ -227,7 +215,7 @@ class BTNap: try: dev_remote = BTNap.find_device(self._mac, bt_dev) logging.debug("BT-TETHER: Using remote device (addr: %s): %s", - BTNap.prop_get(dev_remote, 'Address'), dev_remote.object_path ) + BTNap.prop_get(dev_remote, 'Address'), dev_remote.object_path) break except BTError: logging.debug("BT-TETHER: Not found yet ...") @@ -259,7 +247,6 @@ class BTNap: pass return False - @staticmethod def nap(device): logging.debug('BT-TETHER: Trying to nap ...') @@ -267,7 +254,7 @@ class BTNap: try: logging.debug('BT-TETHER: Connecting to profile ...') device.ConnectProfile('nap') - except Exception: # raises exception, but still works + except Exception: # raises exception, but still works pass net = dbus.Interface(device, 'org.bluez.Network1') @@ -297,7 +284,7 @@ class SystemdUnitWrapper: @staticmethod def _action_on_unit(action, unit): process = subprocess.Popen(f"systemctl {action} {unit}", shell=True, stdin=None, - stdout=open("/dev/null", "w"), stderr=None, executable="/bin/bash") + stdout=open("/dev/null", "w"), stderr=None, executable="/bin/bash") process.wait() if process.returncode > 0: return False @@ -309,7 +296,7 @@ class SystemdUnitWrapper: Calls systemctl daemon-reload """ process = subprocess.Popen("systemctl daemon-reload", shell=True, stdin=None, - stdout=open("/dev/null", "w"), stderr=None, executable="/bin/bash") + stdout=open("/dev/null", "w"), stderr=None, executable="/bin/bash") process.wait() if process.returncode > 0: return False @@ -387,16 +374,15 @@ class IfaceWrapper: """ return open(f"{self.path}/operstate", 'r').read().rsplit('\n') == 'up' - def set_addr(self, addr): """ Set the netmask """ process = subprocess.Popen(f"ip addr add {addr} dev {self.iface}", shell=True, stdin=None, - stdout=open("/dev/null", "w"), stderr=None, executable="/bin/bash") + stdout=open("/dev/null", "w"), stderr=None, executable="/bin/bash") process.wait() - if process.returncode == 2 or process.returncode == 0: # 2 = already set + if process.returncode == 2 or process.returncode == 0: # 2 = already set return True return False @@ -404,7 +390,7 @@ class IfaceWrapper: @staticmethod def set_route(addr): process = subprocess.Popen(f"ip route replace default via {addr}", shell=True, stdin=None, - stdout=open("/dev/null", "w"), stderr=None, executable="/bin/bash") + stdout=open("/dev/null", "w"), stderr=None, executable="/bin/bash") process.wait() if process.returncode > 0: @@ -413,44 +399,47 @@ class IfaceWrapper: return True +class BTTether(plugins.Plugin): + __author__ = '33197631+dadav@users.noreply.github.com' + __version__ = '1.0.0' + __license__ = 'GPL3' + __description__ = 'This makes the display reachable over bluetooth' -def on_loaded(): - """ - Gets called when the plugin gets loaded - """ - global READY - global INTERVAL + def __init__(self): + self.ready = False + self.interval = StatusFile('/root/.bt-tether') + self.network = None - for opt in ['share_internet', 'mac', 'ip', 'netmask', 'interval']: - if opt not in OPTIONS or (opt in OPTIONS and OPTIONS[opt] is None): - logging.error("BT-TET: Please specify the %s in your config.yml.", opt) + def on_loaded(self): + for opt in ['share_internet', 'mac', 'ip', 'netmask', 'interval']: + if opt not in self.options or (opt in self.options and self.options[opt] is None): + logging.error("BT-TET: Please specify the %s in your config.yml.", opt) + return + + # ensure bluetooth is running + bt_unit = SystemdUnitWrapper('bluetooth.service') + if not bt_unit.is_active(): + if not bt_unit.start(): + logging.error("BT-TET: Can't start bluetooth.service") + return + + self.interval.update() + self.ready = True + + def on_ui_setup(self, ui): + ui.add_element('bluetooth', LabeledValue(color=BLACK, label='BT', value='-', position=(ui.width() / 2 - 15, 0), + label_font=fonts.Bold, text_font=fonts.Medium)) + + def on_ui_update(self, ui): + if not self.ready: return - # ensure bluetooth is running - bt_unit = SystemdUnitWrapper('bluetooth.service') - if not bt_unit.is_active(): - if not bt_unit.start(): - logging.error("BT-TET: Can't start bluetooth.service") + if self.interval.newer_then_minutes(self.options['interval']): return - INTERVAL.update() - READY = True + self.interval.update() - -def on_ui_update(ui): - """ - Try to connect to device - """ - - if READY: - global INTERVAL - global NETWORK - if INTERVAL.newer_then_minutes(OPTIONS['interval']): - return - - INTERVAL.update() - - bt = BTNap(OPTIONS['mac']) + bt = BTNap(self.options['mac']) logging.debug('BT-TETHER: Check if already connected and paired') dev_remote, connected = bt.is_connected() @@ -483,14 +472,13 @@ def on_ui_update(ui): else: logging.debug('BT-TETHER: Already paired.') - btnap_iface = IfaceWrapper('bnep0') logging.debug('BT-TETHER: Check interface') if not btnap_iface.exists(): # connected and paired but not napping logging.debug('BT-TETHER: Try to connect to nap ...') network, status = BTNap.nap(dev_remote) - NETWORK = network + self.network = network if status: logging.info('BT-TETHER: Napping!') ui.set('bluetooth', 'C') @@ -504,7 +492,7 @@ def on_ui_update(ui): logging.debug('BT-TETHER: Interface found') # check ip - addr = f"{OPTIONS['ip']}/{OPTIONS['netmask']}" + addr = f"{self.options['ip']}/{self.options['netmask']}" logging.debug('BT-TETHER: Try to set ADDR to interface') if not btnap_iface.set_addr(addr): @@ -515,9 +503,10 @@ def on_ui_update(ui): logging.debug('BT-TETHER: Set ADDR to interface') # change route if sharking - if OPTIONS['share_internet']: + if self.options['share_internet']: logging.debug('BT-TETHER: Set routing and change resolv.conf') - IfaceWrapper.set_route(".".join(OPTIONS['ip'].split('.')[:-1] + ['1'])) # im not proud about that + IfaceWrapper.set_route( + ".".join(self.options['ip'].split('.')[:-1] + ['1'])) # im not proud about that # fix resolv.conf; dns over https ftw! with open('/etc/resolv.conf', 'r+') as resolv: nameserver = resolv.read() @@ -530,8 +519,3 @@ def on_ui_update(ui): else: logging.error('BT-TETHER: bnep0 not found') ui.set('bluetooth', 'BE') - - -def on_ui_setup(ui): - ui.add_element('bluetooth', LabeledValue(color=BLACK, label='BT', value='-', position=(ui.width() / 2 - 15, 0), - label_font=fonts.Bold, text_font=fonts.Medium)) diff --git a/pwnagotchi/plugins/default/example.py b/pwnagotchi/plugins/default/example.py index 72087cf..3aa5c7d 100644 --- a/pwnagotchi/plugins/default/example.py +++ b/pwnagotchi/plugins/default/example.py @@ -1,182 +1,154 @@ -__author__ = 'evilsocket@gmail.com' -__version__ = '1.0.0' -__name__ = 'hello_world' -__license__ = 'GPL3' -__description__ = 'An example plugin for pwnagotchi that implements all the available callbacks.' - import logging +import pwnagotchi.plugins as plugins from pwnagotchi.ui.components import LabeledValue from pwnagotchi.ui.view import BLACK import pwnagotchi.ui.fonts as fonts -# Will be set with the options in config.yml config['main']['plugins'][__name__] -OPTIONS = dict() +class Example(plugins.Plugin): + __author__ = 'evilsocket@gmail.com' + __version__ = '1.0.0' + __license__ = 'GPL3' + __description__ = 'An example plugin for pwnagotchi that implements all the available callbacks.' -# called when :/plugins/ is opened -def on_webhook(response, path): - res = "Hook triggered" - response.send_response(200) - response.send_header('Content-type', 'text/html') - response.end_headers() + def __init__(self): + logging.debug("example plugin created") - try: - response.wfile.write(bytes(res, "utf-8")) - except Exception as ex: - logging.error(ex) + # called when the plugin is loaded + def on_loaded(self): + logging.warning("WARNING: this plugin should be disabled! options = " % self.options) -# called when the plugin is loaded -def on_loaded(): - logging.warning("WARNING: plugin %s should be disabled!" % __name__) + # called when :/plugins/ is opened + def on_webhook(self, response, path): + res = "Hook triggered" + response.send_response(200) + response.send_header('Content-type', 'text/html') + response.end_headers() + try: + response.wfile.write(bytes(res, "utf-8")) + except Exception as ex: + logging.error(ex) -# called in manual mode when there's internet connectivity -def on_internet_available(agent): - pass + # called in manual mode when there's internet connectivity + def on_internet_available(self, agent): + pass + # called to setup the ui elements + def on_ui_setup(self, ui): + # add custom UI elements + ui.add_element('ups', LabeledValue(color=BLACK, label='UPS', value='0%/0V', position=(ui.width() / 2 - 25, 0), + label_font=fonts.Bold, text_font=fonts.Medium)) -# called to setup the ui elements -def on_ui_setup(ui): - # add custom UI elements - ui.add_element('ups', LabeledValue(color=BLACK, label='UPS', value='0%/0V', position=(ui.width() / 2 - 25, 0), - label_font=fonts.Bold, text_font=fonts.Medium)) + # called when the ui is updated + def on_ui_update(self, ui): + # update those elements + some_voltage = 0.1 + some_capacity = 100.0 + ui.set('ups', "%4.2fV/%2i%%" % (some_voltage, some_capacity)) + # called when the hardware display setup is done, display is an hardware specific object + def on_display_setup(self, display): + pass -# called when the ui is updated -def on_ui_update(ui): - # update those elements - some_voltage = 0.1 - some_capacity = 100.0 + # called when everything is ready and the main loop is about to start + def on_ready(self, agent): + logging.info("unit is ready") + # you can run custom bettercap commands if you want + # agent.run('ble.recon on') + # or set a custom state + # agent.set_bored() - ui.set('ups', "%4.2fV/%2i%%" % (some_voltage, some_capacity)) + # called when the AI finished loading + def on_ai_ready(self, agent): + pass + # called when the AI finds a new set of parameters + def on_ai_policy(self, agent, policy): + pass -# called when the hardware display setup is done, display is an hardware specific object -def on_display_setup(display): - pass + # called when the AI starts training for a given number of epochs + def on_ai_training_start(self, agent, epochs): + pass + # called after the AI completed a training epoch + def on_ai_training_step(self, agent, _locals, _globals): + pass -# called when everything is ready and the main loop is about to start -def on_ready(agent): - logging.info("unit is ready") - # you can run custom bettercap commands if you want - # agent.run('ble.recon on') - # or set a custom state - # agent.set_bored() + # called when the AI has done training + def on_ai_training_end(self, agent): + pass + # called when the AI got the best reward so far + def on_ai_best_reward(self, agent, reward): + pass -# called when the AI finished loading -def on_ai_ready(agent): - pass + # called when the AI got the worst reward so far + def on_ai_worst_reward(self, agent, reward): + pass + # called when a non overlapping wifi channel is found to be free + def on_free_channel(self, agent, channel): + pass -# called when the AI finds a new set of parameters -def on_ai_policy(agent, policy): - pass + # called when the status is set to bored + def on_bored(self, agent): + pass + # called when the status is set to sad + def on_sad(self, agent): + pass -# called when the AI starts training for a given number of epochs -def on_ai_training_start(agent, epochs): - pass + # called when the status is set to excited + def on_excited(aself, gent): + pass + # called when the status is set to lonely + def on_lonely(self, agent): + pass -# called after the AI completed a training epoch -def on_ai_training_step(agent, _locals, _globals): - pass + # called when the agent is rebooting the board + def on_rebooting(self, agent): + pass + # called when the agent is waiting for t seconds + def on_wait(self, agent, t): + pass -# called when the AI has done training -def on_ai_training_end(agent): - pass + # called when the agent is sleeping for t seconds + def on_sleep(self, agent, t): + pass + # called when the agent refreshed its access points list + def on_wifi_update(self, agent, access_points): + pass -# called when the AI got the best reward so far -def on_ai_best_reward(agent, reward): - pass + # called when the agent is sending an association frame + def on_association(self, agent, access_point): + pass + # called when the agent is deauthenticating a client station from an AP + def on_deauthentication(self, agent, access_point, client_station): + pass -# called when the AI got the worst reward so far -def on_ai_worst_reward(agent, reward): - pass + # callend when the agent is tuning on a specific channel + def on_channel_hop(self, agent, channel): + pass + # called when a new handshake is captured, access_point and client_station are json objects + # if the agent could match the BSSIDs to the current list, otherwise they are just the strings of the BSSIDs + def on_handshake(self, agent, filename, access_point, client_station): + pass -# called when a non overlapping wifi channel is found to be free -def on_free_channel(agent, channel): - pass + # called when an epoch is over (where an epoch is a single loop of the main algorithm) + def on_epoch(self, agent, epoch, epoch_data): + pass + # called when a new peer is detected + def on_peer_detected(self, agent, peer): + pass -# called when the status is set to bored -def on_bored(agent): - pass - - -# called when the status is set to sad -def on_sad(agent): - pass - - -# called when the status is set to excited -def on_excited(agent): - pass - - -# called when the status is set to lonely -def on_lonely(agent): - pass - - -# called when the agent is rebooting the board -def on_rebooting(agent): - pass - - -# called when the agent is waiting for t seconds -def on_wait(agent, t): - pass - - -# called when the agent is sleeping for t seconds -def on_sleep(agent, t): - pass - - -# called when the agent refreshed its access points list -def on_wifi_update(agent, access_points): - pass - - -# called when the agent is sending an association frame -def on_association(agent, access_point): - pass - - -# callend when the agent is deauthenticating a client station from an AP -def on_deauthentication(agent, access_point, client_station): - pass - - -# callend when the agent is tuning on a specific channel -def on_channel_hop(agent, channel): - pass - - -# called when a new handshake is captured, access_point and client_station are json objects -# if the agent could match the BSSIDs to the current list, otherwise they are just the strings of the BSSIDs -def on_handshake(agent, filename, access_point, client_station): - pass - - -# called when an epoch is over (where an epoch is a single loop of the main algorithm) -def on_epoch(agent, epoch, epoch_data): - pass - - -# called when a new peer is detected -def on_peer_detected(agent, peer): - pass - - -# called when a known peer is lost -def on_peer_lost(agent, peer): - pass + # called when a known peer is lost + def on_peer_lost(self, agent, peer): + pass diff --git a/pwnagotchi/plugins/default/gpio_buttons.py b/pwnagotchi/plugins/default/gpio_buttons.py index c0f6ceb..7d9adb8 100644 --- a/pwnagotchi/plugins/default/gpio_buttons.py +++ b/pwnagotchi/plugins/default/gpio_buttons.py @@ -1,38 +1,40 @@ -__author__ = 'ratmandu@gmail.com' -__version__ = '1.0.0' -__name__ = 'gpio_buttons' -__license__ = 'GPL3' -__description__ = 'GPIO Button support plugin' - import logging import RPi.GPIO as GPIO import subprocess - -running = False -OPTIONS = dict() -GPIOs = {} -COMMANDs = None - -def runCommand(channel): - command = GPIOs[channel] - logging.info(f"Button Pressed! Running command: {command}") - process = subprocess.Popen(command, shell=True, stdin=None, stdout=open("/dev/null", "w"), stderr=None, executable="/bin/bash") - process.wait() +import pwnagotchi.plugins as plugins -def on_loaded(): - logging.info("GPIO Button plugin loaded.") - - #get list of GPIOs - gpios = OPTIONS['gpios'] +class GPIOButtons(plugins.Plugin): + __author__ = 'ratmandu@gmail.com' + __version__ = '1.0.0' + __license__ = 'GPL3' + __description__ = 'GPIO Button support plugin' - #set gpio numbering - GPIO.setmode(GPIO.BCM) + def __init__(self): + self.running = False + self.ports = {} + self.commands = None - for i in gpios: - gpio = list(i)[0] - command = i[gpio] - GPIOs[gpio] = command - GPIO.setup(gpio, GPIO.IN, GPIO.PUD_UP) - GPIO.add_event_detect(gpio, GPIO.FALLING, callback=runCommand, bouncetime=250) - logging.info("Added command: %s to GPIO #%d", command, gpio) + def runCommand(self, channel): + command = self.ports[channel] + logging.info(f"Button Pressed! Running command: {command}") + process = subprocess.Popen(command, shell=True, stdin=None, stdout=open("/dev/null", "w"), stderr=None, + executable="/bin/bash") + process.wait() + + def on_loaded(self): + logging.info("GPIO Button plugin loaded.") + + # get list of GPIOs + gpios = self.options['gpios'] + + # set gpio numbering + GPIO.setmode(GPIO.BCM) + + for i in gpios: + gpio = list(i)[0] + command = i[gpio] + self.ports[gpio] = command + GPIO.setup(gpio, GPIO.IN, GPIO.PUD_UP) + GPIO.add_event_detect(gpio, GPIO.FALLING, callback=self.runCommand, bouncetime=250) + logging.info("Added command: %s to GPIO #%d", command, gpio) diff --git a/pwnagotchi/plugins/default/gps.py b/pwnagotchi/plugins/default/gps.py index 234df3a..0194559 100644 --- a/pwnagotchi/plugins/default/gps.py +++ b/pwnagotchi/plugins/default/gps.py @@ -1,45 +1,42 @@ -__author__ = 'evilsocket@gmail.com' -__version__ = '1.0.0' -__name__ = 'gps' -__license__ = 'GPL3' -__description__ = 'Save GPS coordinates whenever an handshake is captured.' - import logging import json import os - -running = False -OPTIONS = dict() +import pwnagotchi.plugins as plugins -def on_loaded(): - logging.info("gps plugin loaded for %s" % OPTIONS['device']) +class GPS(plugins.Plugin): + __author__ = 'evilsocket@gmail.com' + __version__ = '1.0.0' + __license__ = 'GPL3' + __description__ = 'Save GPS coordinates whenever an handshake is captured.' + def __init__(self): + self.running = False -def on_ready(agent): - global running + def on_loaded(self): + logging.info("gps plugin loaded for %s" % self.options['device']) - if os.path.exists(OPTIONS['device']): - logging.info("enabling gps bettercap's module for %s" % OPTIONS['device']) - try: - agent.run('gps off') - except: - pass + def on_ready(self, agent): + if os.path.exists(self.options['device']): + logging.info("enabling gps bettercap's module for %s" % self.options['device']) + try: + agent.run('gps off') + except: + pass - agent.run('set gps.device %s' % OPTIONS['device']) - agent.run('set gps.speed %d' % OPTIONS['speed']) - agent.run('gps on') - running = True - else: - logging.warning("no GPS detected") + agent.run('set gps.device %s' % self.options['device']) + agent.run('set gps.speed %d' % self.options['speed']) + agent.run('gps on') + running = True + else: + logging.warning("no GPS detected") + def on_handshake(self, agent, filename, access_point, client_station): + if self.running: + info = agent.session() + gps = info['gps'] + gps_filename = filename.replace('.pcap', '.gps.json') -def on_handshake(agent, filename, access_point, client_station): - if running: - info = agent.session() - gps = info['gps'] - gps_filename = filename.replace('.pcap', '.gps.json') - - logging.info("saving GPS to %s (%s)" % (gps_filename, gps)) - with open(gps_filename, 'w+t') as fp: - json.dump(gps, fp) + logging.info("saving GPS to %s (%s)" % (gps_filename, gps)) + with open(gps_filename, 'w+t') as fp: + json.dump(gps, fp) diff --git a/pwnagotchi/plugins/default/grid.py b/pwnagotchi/plugins/default/grid.py index 9ffd61f..5860939 100644 --- a/pwnagotchi/plugins/default/grid.py +++ b/pwnagotchi/plugins/default/grid.py @@ -1,10 +1,3 @@ -__author__ = 'evilsocket@gmail.com' -__version__ = '1.0.1' -__name__ = 'grid' -__license__ = 'GPL3' -__description__ = 'This plugin signals the unit cryptographic identity and list of pwned networks and list of pwned ' \ - 'networks to api.pwnagotchi.ai ' - import os import logging import time @@ -12,18 +5,9 @@ import glob import re import pwnagotchi.grid as grid +import pwnagotchi.plugins as plugins from pwnagotchi.utils import StatusFile, WifiInfo, extract_from_pcap -OPTIONS = dict() -REPORT = StatusFile('/root/.api-report.json', data_format='json') - -UNREAD_MESSAGES = 0 -TOTAL_MESSAGES = 0 - - -def on_loaded(): - logging.info("grid plugin loaded.") - def parse_pcap(filename): logging.info("grid: parsing %s ..." % filename) @@ -57,93 +41,100 @@ def parse_pcap(filename): return info[WifiInfo.ESSID], info[WifiInfo.BSSID] -def is_excluded(what): - for skip in OPTIONS['exclude']: - skip = skip.lower() - what = what.lower() - if skip in what or skip.replace(':', '') in what: - return True - return False +class Grid(plugins.Plugin): + __author__ = 'evilsocket@gmail.com' + __version__ = '1.0.1' + __license__ = 'GPL3' + __description__ = 'This plugin signals the unit cryptographic identity and list of pwned networks and list of pwned ' \ + 'networks to api.pwnagotchi.ai ' + def __init__(self): + self.options = dict() + self.report = StatusFile('/root/.api-report.json', data_format='json') -def set_reported(reported, net_id): - global REPORT - reported.append(net_id) - REPORT.update(data={'reported': reported}) + self.unread_messages = 0 + self.total_messages = 0 + def is_excluded(self, what): + for skip in self.options['exclude']: + skip = skip.lower() + what = what.lower() + if skip in what or skip.replace(':', '') in what: + return True + return False -def check_inbox(agent): - global REPORT, UNREAD_MESSAGES, TOTAL_MESSAGES + def on_loaded(self): + logging.info("grid plugin loaded.") - logging.debug("checking mailbox ...") + def set_reported(self, reported, net_id): + reported.append(net_id) + self.report.update(data={'reported': reported}) - messages = grid.inbox() - TOTAL_MESSAGES = len(messages) - UNREAD_MESSAGES = len([m for m in messages if m['seen_at'] is None]) + def check_inbox(self, agent): + logging.debug("checking mailbox ...") + messages = grid.inbox() + self.total_messages = len(messages) + self.unread_messages = len([m for m in messages if m['seen_at'] is None]) - if UNREAD_MESSAGES: - logging.debug("[grid] unread:%d total:%d" % (UNREAD_MESSAGES, TOTAL_MESSAGES)) - agent.view().on_unread_messages(UNREAD_MESSAGES, TOTAL_MESSAGES) + if self.unread_messages: + logging.debug("[grid] unread:%d total:%d" % (self.unread_messages, self.total_messages)) + agent.view().on_unread_messages(self.unread_messages, self.total_messages) + def check_handshakes(self, agent): + logging.debug("checking pcaps") -def check_handshakes(agent): - logging.debug("checking pcaps") + pcap_files = glob.glob(os.path.join(agent.config()['bettercap']['handshakes'], "*.pcap")) + num_networks = len(pcap_files) + reported = self.report.data_field_or('reported', default=[]) + num_reported = len(reported) + num_new = num_networks - num_reported - pcap_files = glob.glob(os.path.join(agent.config()['bettercap']['handshakes'], "*.pcap")) - num_networks = len(pcap_files) - reported = REPORT.data_field_or('reported', default=[]) - num_reported = len(reported) - num_new = num_networks - num_reported + if num_new > 0: + if self.options['report']: + logging.info("grid: %d new networks to report" % num_new) + logging.debug("self.options: %s" % self.options) + logging.debug(" exclude: %s" % self.options['exclude']) - if num_new > 0: - if OPTIONS['report']: - logging.info("grid: %d new networks to report" % num_new) - logging.debug("OPTIONS: %s" % OPTIONS) - logging.debug(" exclude: %s" % OPTIONS['exclude']) + for pcap_file in pcap_files: + net_id = os.path.basename(pcap_file).replace('.pcap', '') + if net_id not in reported: + if self.is_excluded(net_id): + logging.debug("skipping %s due to exclusion filter" % pcap_file) + self.set_reported(reported, net_id) + continue - for pcap_file in pcap_files: - net_id = os.path.basename(pcap_file).replace('.pcap', '') - if net_id not in reported: - if is_excluded(net_id): - logging.debug("skipping %s due to exclusion filter" % pcap_file) - set_reported(reported, net_id) - continue - - essid, bssid = parse_pcap(pcap_file) - if bssid: - if is_excluded(essid) or is_excluded(bssid): - logging.debug("not reporting %s due to exclusion filter" % pcap_file) - set_reported(reported, net_id) + essid, bssid = parse_pcap(pcap_file) + if bssid: + if self.is_excluded(essid) or self.is_excluded(bssid): + logging.debug("not reporting %s due to exclusion filter" % pcap_file) + self.set_reported(reported, net_id) + else: + if grid.report_ap(essid, bssid): + self.set_reported(reported, net_id) + time.sleep(1.5) else: - if grid.report_ap(essid, bssid): - set_reported(reported, net_id) - time.sleep(1.5) - else: - logging.warning("no bssid found?!") - else: - logging.debug("grid: reporting disabled") + logging.warning("no bssid found?!") + else: + logging.debug("grid: reporting disabled") + def on_internet_available(self, agent): + logging.debug("internet available") -def on_internet_available(agent): - global REPORT, UNREAD_MESSAGES, TOTAL_MESSAGES + try: + grid.update_data(agent.last_session) + except Exception as e: + logging.error("error connecting to the pwngrid-peer service: %s" % e) + logging.debug(e, exc_info=True) + return - logging.debug("internet available") + try: + self.check_inbox(agent) + except Exception as e: + logging.error("[grid] error while checking inbox: %s" % e) + logging.debug(e, exc_info=True) - try: - grid.update_data(agent.last_session) - except Exception as e: - logging.error("error connecting to the pwngrid-peer service: %s" % e) - logging.debug(e, exc_info=True) - return - - try: - check_inbox(agent) - except Exception as e: - logging.error("[grid] error while checking inbox: %s" % e) - logging.debug(e, exc_info=True) - - try: - check_handshakes(agent) - except Exception as e: - logging.error("[grid] error while checking pcaps: %s" % e) - logging.debug(e, exc_info=True) + try: + self.check_handshakes(agent) + except Exception as e: + logging.error("[grid] error while checking pcaps: %s" % e) + logging.debug(e, exc_info=True) diff --git a/pwnagotchi/plugins/default/memtemp.py b/pwnagotchi/plugins/default/memtemp.py index 3d6682b..521aa65 100644 --- a/pwnagotchi/plugins/default/memtemp.py +++ b/pwnagotchi/plugins/default/memtemp.py @@ -17,48 +17,44 @@ # - Added horizontal and vertical orientation # ############################################################### - -__author__ = 'https://github.com/xenDE' -__version__ = '1.0.1' -__name__ = 'memtemp' -__license__ = 'GPL3' -__description__ = 'A plugin that will display memory/cpu usage and temperature' - from pwnagotchi.ui.components import LabeledValue from pwnagotchi.ui.view import BLACK import pwnagotchi.ui.fonts as fonts +import pwnagotchi.plugins as plugins import pwnagotchi import logging -OPTIONS = dict() +class MemTemp(plugins.Plugin): + __author__ = 'https://github.com/xenDE' + __version__ = '1.0.1' + __license__ = 'GPL3' + __description__ = 'A plugin that will display memory/cpu usage and temperature' -def on_loaded(): - logging.info("memtemp plugin loaded.") + def on_loaded(self): + logging.info("memtemp plugin loaded.") + def mem_usage(self): + return int(pwnagotchi.mem_usage() * 100) -def mem_usage(): - return int(pwnagotchi.mem_usage() * 100) + def cpu_load(self): + return int(pwnagotchi.cpu_load() * 100) + def on_ui_setup(self, ui): + if self.options['orientation'] == "horizontal": + ui.add_element('memtemp', LabeledValue(color=BLACK, label='', value='mem cpu temp\n - - -', + position=(ui.width() / 2 + 30, ui.height() / 2 + 15), + label_font=fonts.Small, text_font=fonts.Small)) + elif self.options['orientation'] == "vertical": + ui.add_element('memtemp', LabeledValue(color=BLACK, label='', value=' mem:-\n cpu:-\ntemp:-', + position=(ui.width() / 2 + 55, ui.height() / 2), + label_font=fonts.Small, text_font=fonts.Small)) -def cpu_load(): - return int(pwnagotchi.cpu_load() * 100) + def on_ui_update(self, ui): + if self.options['orientation'] == "horizontal": + ui.set('memtemp', + " mem cpu temp\n %s%% %s%% %sc" % (self.mem_usage(), self.cpu_load(), pwnagotchi.temperature())) - -def on_ui_setup(ui): - if OPTIONS['orientation'] == "horizontal": - ui.add_element('memtemp', LabeledValue(color=BLACK, label='', value='mem cpu temp\n - - -', - position=(ui.width() / 2 + 30, ui.height() / 2 + 15), - label_font=fonts.Small, text_font=fonts.Small)) - elif OPTIONS['orientation'] == "vertical": - ui.add_element('memtemp', LabeledValue(color=BLACK, label='', value=' mem:-\n cpu:-\ntemp:-', - position=(ui.width() / 2 + 55, ui.height() / 2), - label_font=fonts.Small, text_font=fonts.Small)) - - -def on_ui_update(ui): - if OPTIONS['orientation'] == "horizontal": - ui.set('memtemp', " mem cpu temp\n %s%% %s%% %sc" % (mem_usage(), cpu_load(), pwnagotchi.temperature())) - - elif OPTIONS['orientation'] == "vertical": - ui.set('memtemp', " mem:%s%%\n cpu:%s%%\ntemp:%sc" % (mem_usage(), cpu_load(), pwnagotchi.temperature())) + elif self.options['orientation'] == "vertical": + ui.set('memtemp', + " mem:%s%%\n cpu:%s%%\ntemp:%sc" % (self.mem_usage(), self.cpu_load(), pwnagotchi.temperature())) diff --git a/pwnagotchi/plugins/default/net-pos.py b/pwnagotchi/plugins/default/net-pos.py index d0c54ae..3f1f4e5 100644 --- a/pwnagotchi/plugins/default/net-pos.py +++ b/pwnagotchi/plugins/default/net-pos.py @@ -1,140 +1,134 @@ -__author__ = 'zenzen san' -__version__ = '2.0.0' -__name__ = 'net-pos' -__license__ = 'GPL3' -__description__ = """Saves a json file with the access points with more signal - whenever a handshake is captured. - When internet is available the files are converted in geo locations - using Mozilla LocationService """ - import logging import json import os import requests +import pwnagotchi.plugins as plugins from pwnagotchi.utils import StatusFile MOZILLA_API_URL = 'https://location.services.mozilla.com/v1/geolocate?key={api}' -REPORT = StatusFile('/root/.net_pos_saved', data_format='json') -SKIP = list() -READY = False -OPTIONS = dict() -def on_loaded(): - global READY +class NetPos(plugins.Plugin): + __author__ = 'zenzen san' + __version__ = '2.0.0' + __license__ = 'GPL3' + __description__ = """Saves a json file with the access points with more signal + whenever a handshake is captured. + When internet is available the files are converted in geo locations + using Mozilla LocationService """ - if 'api_key' not in OPTIONS or ('api_key' in OPTIONS and OPTIONS['api_key'] is None): - logging.error("NET-POS: api_key isn't set. Can't use mozilla's api.") - return + def __init__(self): + self.report = StatusFile('/root/.net_pos_saved', data_format='json') + self.skip = list() + self.ready = False - READY = True + def on_loaded(self): + if 'api_key' not in self.options or ('api_key' in self.options and self.options['api_key'] is None): + logging.error("NET-POS: api_key isn't set. Can't use mozilla's api.") + return - logging.info("net-pos plugin loaded.") + self.ready = True + logging.info("net-pos plugin loaded.") -def _append_saved(path): - to_save = list() - if isinstance(path, str): - to_save.append(path) - elif isinstance(path, list): - to_save += path - else: - raise TypeError("Expected list or str, got %s" % type(path)) + def _append_saved(self, path): + to_save = list() + if isinstance(path, str): + to_save.append(path) + elif isinstance(path, list): + to_save += path + else: + raise TypeError("Expected list or str, got %s" % type(path)) - with open('/root/.net_pos_saved', 'a') as saved_file: - for x in to_save: - saved_file.write(x + "\n") + with open('/root/.net_pos_saved', 'a') as saved_file: + for x in to_save: + saved_file.write(x + "\n") -def on_internet_available(agent): - global SKIP - global REPORT + def on_internet_available(self, agent): + if self.ready: + config = agent.config() + display = agent.view() + reported = self.report.data_field_or('reported', default=list()) + handshake_dir = config['bettercap']['handshakes'] - if READY: - config = agent.config() - display = agent.view() - reported = REPORT.data_field_or('reported', default=list()) - handshake_dir = config['bettercap']['handshakes'] + all_files = os.listdir(handshake_dir) + all_np_files = [os.path.join(handshake_dir, filename) + for filename in all_files + if filename.endswith('.net-pos.json')] + new_np_files = set(all_np_files) - set(reported) - set(self.skip) - all_files = os.listdir(handshake_dir) - all_np_files = [os.path.join(handshake_dir, filename) - for filename in all_files - if filename.endswith('.net-pos.json')] - new_np_files = set(all_np_files) - set(reported) - set(SKIP) - - if new_np_files: - logging.info("NET-POS: Found %d new net-pos files. Fetching positions ...", len(new_np_files)) - display.set('status', f"Found {len(new_np_files)} new net-pos files. Fetching positions ...") - display.update(force=True) - for idx, np_file in enumerate(new_np_files): - - geo_file = np_file.replace('.net-pos.json', '.geo.json') - if os.path.exists(geo_file): - # got already the position - reported.append(np_file) - REPORT.update(data={'reported': reported}) - continue - - try: - geo_data = _get_geo_data(np_file) # returns json obj - except requests.exceptions.RequestException as req_e: - logging.error("NET-POS: %s", req_e) - SKIP += np_file - continue - except json.JSONDecodeError as js_e: - logging.error("NET-POS: %s", js_e) - SKIP += np_file - continue - except OSError as os_e: - logging.error("NET-POS: %s", os_e) - SKIP += np_file - continue - - with open(geo_file, 'w+t') as sf: - json.dump(geo_data, sf) - - reported.append(np_file) - REPORT.update(data={'reported': reported}) - - display.set('status', f"Fetching positions ({idx+1}/{len(new_np_files)})") + if new_np_files: + logging.info("NET-POS: Found %d new net-pos files. Fetching positions ...", len(new_np_files)) + display.set('status', f"Found {len(new_np_files)} new net-pos files. Fetching positions ...") display.update(force=True) + for idx, np_file in enumerate(new_np_files): + geo_file = np_file.replace('.net-pos.json', '.geo.json') + if os.path.exists(geo_file): + # got already the position + reported.append(np_file) + self.report.update(data={'reported': reported}) + continue -def on_handshake(agent, filename, access_point, client_station): - netpos = _get_netpos(agent) - netpos_filename = filename.replace('.pcap', '.net-pos.json') - logging.info("NET-POS: Saving net-location to %s", netpos_filename) + try: + geo_data = self._get_geo_data(np_file) # returns json obj + except requests.exceptions.RequestException as req_e: + logging.error("NET-POS: %s", req_e) + self.skip += np_file + continue + except json.JSONDecodeError as js_e: + logging.error("NET-POS: %s", js_e) + self.skip += np_file + continue + except OSError as os_e: + logging.error("NET-POS: %s", os_e) + self.skip += np_file + continue - try: - with open(netpos_filename, 'w+t') as net_pos_file: - json.dump(netpos, net_pos_file) - except OSError as os_e: - logging.error("NET-POS: %s", os_e) + with open(geo_file, 'w+t') as sf: + json.dump(geo_data, sf) + reported.append(np_file) + self.report.update(data={'reported': reported}) -def _get_netpos(agent): - aps = agent.get_access_points() - netpos = dict() - netpos['wifiAccessPoints'] = list() - # 6 seems a good number to save a wifi networks location - for access_point in sorted(aps, key=lambda i: i['rssi'], reverse=True)[:6]: - netpos['wifiAccessPoints'].append({'macAddress': access_point['mac'], - 'signalStrength': access_point['rssi']}) - return netpos + display.set('status', f"Fetching positions ({idx + 1}/{len(new_np_files)})") + display.update(force=True) -def _get_geo_data(path, timeout=30): - geourl = MOZILLA_API_URL.format(api=OPTIONS['api_key']) + def on_handshake(self, agent, filename, access_point, client_station): + netpos = self._get_netpos(agent) + netpos_filename = filename.replace('.pcap', '.net-pos.json') + logging.info("NET-POS: Saving net-location to %s", netpos_filename) - try: - with open(path, "r") as json_file: - data = json.load(json_file) - except json.JSONDecodeError as js_e: - raise js_e - except OSError as os_e: - raise os_e + try: + with open(netpos_filename, 'w+t') as net_pos_file: + json.dump(netpos, net_pos_file) + except OSError as os_e: + logging.error("NET-POS: %s", os_e) - try: - result = requests.post(geourl, - json=data, - timeout=timeout) - return result.json() - except requests.exceptions.RequestException as req_e: - raise req_e + def _get_netpos(self, agent): + aps = agent.get_access_points() + netpos = dict() + netpos['wifiAccessPoints'] = list() + # 6 seems a good number to save a wifi networks location + for access_point in sorted(aps, key=lambda i: i['rssi'], reverse=True)[:6]: + netpos['wifiAccessPoints'].append({'macAddress': access_point['mac'], + 'signalStrength': access_point['rssi']}) + return netpos + + def _get_geo_data(self, path, timeout=30): + geourl = MOZILLA_API_URL.format(api=self.options['api_key']) + + try: + with open(path, "r") as json_file: + data = json.load(json_file) + except json.JSONDecodeError as js_e: + raise js_e + except OSError as os_e: + raise os_e + + try: + result = requests.post(geourl, + json=data, + timeout=timeout) + return result.json() + except requests.exceptions.RequestException as req_e: + raise req_e diff --git a/pwnagotchi/plugins/default/onlinehashcrack.py b/pwnagotchi/plugins/default/onlinehashcrack.py index 47f881a..7a02a30 100644 --- a/pwnagotchi/plugins/default/onlinehashcrack.py +++ b/pwnagotchi/plugins/default/onlinehashcrack.py @@ -1,86 +1,82 @@ -__author__ = '33197631+dadav@users.noreply.github.com' -__version__ = '2.0.0' -__name__ = 'onlinehashcrack' -__license__ = 'GPL3' -__description__ = 'This plugin automatically uploads handshakes to https://onlinehashcrack.com' - import os import logging import requests from pwnagotchi.utils import StatusFile - -READY = False -REPORT = StatusFile('/root/.ohc_uploads', data_format='json') -SKIP = list() -OPTIONS = dict() +import pwnagotchi.plugins as plugins -def on_loaded(): - """ - Gets called when the plugin gets loaded - """ - global READY +class OnlineHashCrack(plugins.Plugin): + __author__ = '33197631+dadav@users.noreply.github.com' + __version__ = '2.0.0' + __license__ = 'GPL3' + __description__ = 'This plugin automatically uploads handshakes to https://onlinehashcrack.com' - if 'email' not in OPTIONS or ('email' in OPTIONS and OPTIONS['email'] is None): - logging.error("OHC: Email isn't set. Can't upload to onlinehashcrack.com") - return + def __init__(self): + self.ready = False + self.report = StatusFile('/root/.ohc_uploads', data_format='json') + self.skip = list() - READY = True + def on_loaded(self): + """ + Gets called when the plugin gets loaded + """ + if 'email' not in self.options or ('email' in self.options and self.options['email'] is None): + logging.error("OHC: Email isn't set. Can't upload to onlinehashcrack.com") + return + self.ready = True -def _upload_to_ohc(path, timeout=30): - """ - Uploads the file to onlinehashcrack.com - """ - with open(path, 'rb') as file_to_upload: - data = {'email': OPTIONS['email']} - payload = {'file': file_to_upload} + def _upload_to_ohc(self, path, timeout=30): + """ + Uploads the file to onlinehashcrack.com + """ + with open(path, 'rb') as file_to_upload: + data = {'email': self.options['email']} + payload = {'file': file_to_upload} - try: - result = requests.post('https://api.onlinehashcrack.com', - data=data, - files=payload, - timeout=timeout) - if 'already been sent' in result.text: - logging.warning(f"{path} was already uploaded.") - except requests.exceptions.RequestException as e: - logging.error(f"OHC: Got an exception while uploading {path} -> {e}") - raise e + try: + result = requests.post('https://api.onlinehashcrack.com', + data=data, + files=payload, + timeout=timeout) + if 'already been sent' in result.text: + logging.warning(f"{path} was already uploaded.") + except requests.exceptions.RequestException as e: + logging.error(f"OHC: Got an exception while uploading {path} -> {e}") + raise e + def on_internet_available(self, agent): + """ + Called in manual mode when there's internet connectivity + """ + if self.ready: + display = agent.view() + config = agent.config() + reported = self.report.data_field_or('reported', default=list()) -def on_internet_available(agent): - """ - Called in manual mode when there's internet connectivity - """ - global REPORT - global SKIP - if READY: - display = agent.view() - config = agent.config() - reported = REPORT.data_field_or('reported', default=list()) + handshake_dir = config['bettercap']['handshakes'] + handshake_filenames = os.listdir(handshake_dir) + handshake_paths = [os.path.join(handshake_dir, filename) for filename in handshake_filenames if + filename.endswith('.pcap')] + handshake_new = set(handshake_paths) - set(reported) - set(self.skip) - handshake_dir = config['bettercap']['handshakes'] - handshake_filenames = os.listdir(handshake_dir) - handshake_paths = [os.path.join(handshake_dir, filename) for filename in handshake_filenames if filename.endswith('.pcap')] - handshake_new = set(handshake_paths) - set(reported) - set(SKIP) - - if handshake_new: - logging.info("OHC: Internet connectivity detected. Uploading new handshakes to onelinehashcrack.com") - - for idx, handshake in enumerate(handshake_new): - display.set('status', f"Uploading handshake to onlinehashcrack.com ({idx + 1}/{len(handshake_new)})") - display.update(force=True) - try: - _upload_to_ohc(handshake) - reported.append(handshake) - REPORT.update(data={'reported': reported}) - logging.info(f"OHC: Successfully uploaded {handshake}") - except requests.exceptions.RequestException as req_e: - SKIP.append(handshake) - logging.error("OHC: %s", req_e) - continue - except OSError as os_e: - SKIP.append(handshake) - logging.error("OHC: %s", os_e) - continue + if handshake_new: + logging.info("OHC: Internet connectivity detected. Uploading new handshakes to onelinehashcrack.com") + for idx, handshake in enumerate(handshake_new): + display.set('status', + f"Uploading handshake to onlinehashcrack.com ({idx + 1}/{len(handshake_new)})") + display.update(force=True) + try: + self._upload_to_ohc(handshake) + reported.append(handshake) + self.report.update(data={'reported': reported}) + logging.info(f"OHC: Successfully uploaded {handshake}") + except requests.exceptions.RequestException as req_e: + self.skip.append(handshake) + logging.error("OHC: %s", req_e) + continue + except OSError as os_e: + self.skip.append(handshake) + logging.error("OHC: %s", os_e) + continue diff --git a/pwnagotchi/plugins/default/paw-gps.py b/pwnagotchi/plugins/default/paw-gps.py index a5268c4..a681033 100644 --- a/pwnagotchi/plugins/default/paw-gps.py +++ b/pwnagotchi/plugins/default/paw-gps.py @@ -1,27 +1,27 @@ -__author__ = 'leont' -__version__ = '1.0.0' -__name__ = 'pawgps' -__license__ = 'GPL3' -__description__ = 'Saves GPS coordinates whenever an handshake is captured. The GPS data is get from PAW on android ' +import logging +import requests +import pwnagotchi.plugins as plugins ''' You need an bluetooth connection to your android phone which is running PAW server with the GPS "hack" from Systemic: https://raw.githubusercontent.com/systemik/pwnagotchi-bt-tether/master/GPS-via-PAW ''' -import logging -import requests -OPTIONS = dict() +class PawGPS(plugins.Plugin): + __author__ = 'leont' + __version__ = '1.0.0' + __name__ = 'pawgps' + __license__ = 'GPL3' + __description__ = 'Saves GPS coordinates whenever an handshake is captured. The GPS data is get from PAW on android ' + def on_loaded(self): + logging.info("PAW-GPS loaded") + if 'ip' not in self.options or ('ip' in self.options and self.options['ip'] is None): + logging.info("PAW-GPS: No IP Address in the config file is defined, it uses the default (192.168.44.1)") -def on_loaded(): - logging.info("PAW-GPS loaded") - if 'ip' not in OPTIONS or ('ip' in OPTIONS and OPTIONS['ip'] is None): - logging.info("PAW-GPS: No IP Address in the config file is defined, it uses the default (192.168.44.1)") - -def on_handshake(agent, filename, access_point, client_station): - if 'ip' not in OPTIONS or ('ip' in OPTIONS and OPTIONS['ip'] is None): + def on_handshake(self, agent, filename, access_point, client_station): + if 'ip' not in self.options or ('ip' in self.options and self.options['ip'] is None): ip = "192.168.44.1" gps = requests.get('http://' + ip + '/gps.xhtml') diff --git a/pwnagotchi/plugins/default/quickdic.py b/pwnagotchi/plugins/default/quickdic.py index b898b21..7bdd647 100644 --- a/pwnagotchi/plugins/default/quickdic.py +++ b/pwnagotchi/plugins/default/quickdic.py @@ -1,8 +1,8 @@ -__author__ = 'pwnagotchi [at] rossmarks [dot] uk' -__version__ = '1.0.0' -__name__ = 'quickdic' -__license__ = 'GPL3' -__description__ = 'Run a quick dictionary scan against captured handshakes' +import logging +import subprocess +import string +import re +import pwnagotchi.plugins as plugins ''' Aircrack-ng needed, to install: @@ -11,42 +11,41 @@ Upload wordlist files in .txt format to folder in config file (Default: /opt/wor Cracked handshakes stored in handshake folder as [essid].pcap.cracked ''' -import logging -import subprocess -import string -import re -OPTIONS = dict() +class QuickDic(plugins.Plugin): + __author__ = 'pwnagotchi [at] rossmarks [dot] uk' + __version__ = '1.0.0' + __license__ = 'GPL3' + __description__ = 'Run a quick dictionary scan against captured handshakes' -def on_loaded(): - logging.info("Quick dictionary check plugin loaded") + def __init__(self): + self.text_to_set = "" -def on_handshake(agent, filename, access_point, client_station): - display = agent._view + def on_loaded(self): + logging.info("Quick dictionary check plugin loaded") - result = subprocess.run(('/usr/bin/aircrack-ng '+ filename +' | grep "1 handshake" | awk \'{print $2}\''),shell=True, stdout=subprocess.PIPE) - result = result.stdout.decode('utf-8').translate({ord(c) :None for c in string.whitespace}) - if not result: - logging.info("[quickdic] No handshake") - else: - logging.info("[quickdic] Handshake confirmed") - result2 = subprocess.run(('aircrack-ng -w `echo '+OPTIONS['wordlist_folder']+'*.txt | sed \'s/\ /,/g\'` -l '+filename+'.cracked -q -b '+result+' '+filename+' | grep KEY'),shell=True,stdout=subprocess.PIPE) - result2 = result2.stdout.decode('utf-8').strip() - logging.info("[quickdic] "+result2) - if result2 != "KEY NOT FOUND": - key = re.search('\[(.*)\]', result2) - pwd = str(key.group(1)) - set_text("Cracked password: "+pwd) - display.update(force=True) + def on_handshake(self, agent, filename, access_point, client_station): + display = agent.view() + result = subprocess.run(('/usr/bin/aircrack-ng ' + filename + ' | grep "1 handshake" | awk \'{print $2}\''), + shell=True, stdout=subprocess.PIPE) + result = result.stdout.decode('utf-8').translate({ord(c): None for c in string.whitespace}) + if not result: + logging.info("[quickdic] No handshake") + else: + logging.info("[quickdic] Handshake confirmed") + result2 = subprocess.run(('aircrack-ng -w `echo ' + self.options[ + 'wordlist_folder'] + '*.txt | sed \'s/\ /,/g\'` -l ' + filename + '.cracked -q -b ' + result + ' ' + filename + ' | grep KEY'), + shell=True, stdout=subprocess.PIPE) + result2 = result2.stdout.decode('utf-8').strip() + logging.info("[quickdic] " + result2) + if result2 != "KEY NOT FOUND": + key = re.search('\[(.*)\]', result2) + pwd = str(key.group(1)) + self.text_to_set = "Cracked password: " + pwd + display.update(force=True) -text_to_set = ""; -def set_text(text): - global text_to_set - text_to_set = text - -def on_ui_update(ui): - global text_to_set - if text_to_set: - ui.set('face', "(·ω·)") - ui.set('status', text_to_set) - text_to_set = "" + def on_ui_update(self, ui): + if self.text_to_set: + ui.set('face', "(·ω·)") + ui.set('status', self.text_to_set) + self.text_to_set = "" diff --git a/pwnagotchi/plugins/default/screen_refresh.py b/pwnagotchi/plugins/default/screen_refresh.py index 3c7047f..25ea325 100644 --- a/pwnagotchi/plugins/default/screen_refresh.py +++ b/pwnagotchi/plugins/default/screen_refresh.py @@ -1,24 +1,23 @@ -__author__ = 'pwnagotchi [at] rossmarks [dot] uk' -__version__ = '1.0.0' -__name__ = 'screen_refresh' -__license__ = 'GPL3' -__description__ = 'Refresh he e-ink display after X amount of updates' - import logging - -OPTIONS = dict() -update_count = 0; +import pwnagotchi.plugins as plugins -def on_loaded(): - logging.info("Screen refresh plugin loaded") +class ScreenRefresh(plugins.Plugin): + __author__ = 'pwnagotchi [at] rossmarks [dot] uk' + __version__ = '1.0.0' + __license__ = 'GPL3' + __description__ = 'Refresh he e-ink display after X amount of updates' + def __init__(self): + self.update_count = 0; -def on_ui_update(ui): - global update_count - update_count += 1 - if update_count == OPTIONS['refresh_interval']: - ui.init_display() - ui.set('status', "Screen cleaned") - logging.info("Screen refreshing") - update_count = 0 + def on_loaded(self): + logging.info("Screen refresh plugin loaded") + + def on_ui_update(self, ui): + self.update_count += 1 + if self.update_count == self.options['refresh_interval']: + ui.init_display() + ui.set('status', "Screen cleaned") + logging.info("Screen refreshing") + self.update_count = 0 diff --git a/pwnagotchi/plugins/default/twitter.py b/pwnagotchi/plugins/default/twitter.py index fd247ca..80bb2ba 100644 --- a/pwnagotchi/plugins/default/twitter.py +++ b/pwnagotchi/plugins/default/twitter.py @@ -1,50 +1,49 @@ -__author__ = '33197631+dadav@users.noreply.github.com' -__version__ = '1.0.0' -__name__ = 'twitter' -__license__ = 'GPL3' -__description__ = 'This plugin creates tweets about the recent activity of pwnagotchi' - import logging from pwnagotchi.voice import Voice - -OPTIONS = dict() - -def on_loaded(): - logging.info("twitter plugin loaded.") +import pwnagotchi.plugins as plugins -# called in manual mode when there's internet connectivity -def on_internet_available(agent): - config = agent.config() - display = agent.view() - last_session = agent.last_session +class Twitter(plugins.Plugin): + __author__ = 'evilsocket@gmail.com' + __version__ = '1.0.0' + __license__ = 'GPL3' + __description__ = 'This plugin creates tweets about the recent activity of pwnagotchi' - if last_session.is_new() and last_session.handshakes > 0: - try: - import tweepy - except ImportError: - logging.error("Couldn't import tweepy") - return + def on_loaded(self): + logging.info("twitter plugin loaded.") - logging.info("detected a new session and internet connectivity!") + # called in manual mode when there's internet connectivity + def on_internet_available(self, agent): + config = agent.config() + display = agent.view() + last_session = agent.last_session - picture = '/dev/shm/pwnagotchi.png' + if last_session.is_new() and last_session.handshakes > 0: + try: + import tweepy + except ImportError: + logging.error("Couldn't import tweepy") + return - display.on_manual_mode(last_session) - display.update(force=True) - display.image().save(picture, 'png') - display.set('status', 'Tweeting...') - display.update(force=True) + logging.info("detected a new session and internet connectivity!") - try: - auth = tweepy.OAuthHandler(OPTIONS['consumer_key'], OPTIONS['consumer_secret']) - auth.set_access_token(OPTIONS['access_token_key'], OPTIONS['access_token_secret']) - api = tweepy.API(auth) + picture = '/root/pwnagotchi.png' - tweet = Voice(lang=config['main']['lang']).on_last_session_tweet(last_session) - api.update_with_media(filename=picture, status=tweet) - last_session.save_session_id() + display.on_manual_mode(last_session) + display.update(force=True) + display.image().save(picture, 'png') + display.set('status', 'Tweeting...') + display.update(force=True) - logging.info("tweeted: %s" % tweet) - except Exception as e: - logging.exception("error while tweeting") + try: + auth = tweepy.OAuthHandler(self.options['consumer_key'], self.options['consumer_secret']) + auth.set_access_token(self.options['access_token_key'], self.options['access_token_secret']) + api = tweepy.API(auth) + + tweet = Voice(lang=config['main']['lang']).on_last_session_tweet(last_session) + api.update_with_media(filename=picture, status=tweet) + last_session.save_session_id() + + logging.info("tweeted: %s" % tweet) + except Exception as e: + logging.exception("error while tweeting") diff --git a/pwnagotchi/plugins/default/unfiltered_example.py b/pwnagotchi/plugins/default/unfiltered_example.py deleted file mode 100644 index 9fe1e49..0000000 --- a/pwnagotchi/plugins/default/unfiltered_example.py +++ /dev/null @@ -1,22 +0,0 @@ -__author__ = 'diemelcw@gmail.com' -__version__ = '1.0.0' -__name__ = 'unfiltered_example' -__license__ = 'GPL3' -__description__ = 'An example plugin for pwnagotchi that implements on_unfiltered_ap_list(agent,aps)' - -import logging - -# Will be set with the options in config.yml config['main']['plugins'][__name__] -OPTIONS = dict() - -# called when the plugin is loaded -def on_loaded(): - logging.warning("%s plugin loaded" % __name__) - -# called when AP list is ready, before whitelist filtering has occurred -def on_unfiltered_ap_list(agent,aps): - logging.info("Unfiltered AP list to follow") - for ap in aps: - logging.info(ap['hostname']) - - ## Additional logic here ## diff --git a/pwnagotchi/plugins/default/ups_lite.py b/pwnagotchi/plugins/default/ups_lite.py index 5c0d9d7..74e0b62 100644 --- a/pwnagotchi/plugins/default/ups_lite.py +++ b/pwnagotchi/plugins/default/ups_lite.py @@ -7,17 +7,12 @@ # For Raspberry Pi Zero Ups Power Expansion Board with Integrated Serial Port S3U4 # https://www.ebay.de/itm/For-Raspberry-Pi-Zero-Ups-Power-Expansion-Board-with-Integrated-Serial-Port-S3U4/323873804310 # https://www.aliexpress.com/item/32888533624.html -__author__ = 'evilsocket@gmail.com' -__version__ = '1.0.0' -__name__ = 'ups_lite' -__license__ = 'GPL3' -__description__ = 'A plugin that will add a voltage indicator for the UPS Lite v1.1' - import struct from pwnagotchi.ui.components import LabeledValue from pwnagotchi.ui.view import BLACK import pwnagotchi.ui.fonts as fonts +import pwnagotchi.plugins as plugins # TODO: add enable switch in config.yml an cleanup all to the best place @@ -47,18 +42,21 @@ class UPS: return 0.0 -ups = None +class UPSLite(plugins.Plugin): + __author__ = 'evilsocket@gmail.com' + __version__ = '1.0.0' + __license__ = 'GPL3' + __description__ = 'A plugin that will add a voltage indicator for the UPS Lite v1.1' + def __init__(self): + self.ups = None -def on_loaded(): - global ups - ups = UPS() + def on_loaded(self): + self.ups = UPS() + def on_ui_setup(self, ui): + ui.add_element('ups', LabeledValue(color=BLACK, label='UPS', value='0%/0V', position=(ui.width() / 2 - 25, 0), + label_font=fonts.Bold, text_font=fonts.Medium)) -def on_ui_setup(ui): - ui.add_element('ups', LabeledValue(color=BLACK, label='UPS', value='0%/0V', position=(ui.width() / 2 - 25, 0), - label_font=fonts.Bold, text_font=fonts.Medium)) - - -def on_ui_update(ui): - ui.set('ups', "%4.2fV/%2i%%" % (ups.voltage(), ups.capacity())) + def on_ui_update(self, ui): + ui.set('ups', "%4.2fV/%2i%%" % (self.ups.voltage(), self.ups.capacity())) diff --git a/pwnagotchi/plugins/default/wigle.py b/pwnagotchi/plugins/default/wigle.py index 5437132..d0d748f 100644 --- a/pwnagotchi/plugins/default/wigle.py +++ b/pwnagotchi/plugins/default/wigle.py @@ -1,9 +1,3 @@ -__author__ = '33197631+dadav@users.noreply.github.com' -__version__ = '2.0.0' -__name__ = 'wigle' -__license__ = 'GPL3' -__description__ = 'This plugin automatically uploads collected wifis to wigle.net' - import os import logging import json @@ -12,24 +6,7 @@ import csv from datetime import datetime import requests from pwnagotchi.utils import WifiInfo, FieldNotFoundError, extract_from_pcap, StatusFile - -READY = False -REPORT = StatusFile('/root/.wigle_uploads', data_format='json') -SKIP = list() -OPTIONS = dict() - - -def on_loaded(): - """ - Gets called when the plugin gets loaded - """ - global READY - - if 'api_key' not in OPTIONS or ('api_key' in OPTIONS and OPTIONS['api_key'] is None): - logging.error("WIGLE: api_key isn't set. Can't upload to wigle.net") - return - - READY = True +import pwnagotchi.plugins as plugins def _extract_gps_data(path): @@ -54,14 +31,17 @@ def _format_auth(data): out = f"{out}[{auth}]" return out + def _transform_wigle_entry(gps_data, pcap_data): """ Transform to wigle entry in file """ dummy = StringIO() # write kismet header - dummy.write("WigleWifi-1.4,appRelease=20190201,model=Kismet,release=2019.02.01.{},device=kismet,display=kismet,board=kismet,brand=kismet\n") - dummy.write("MAC,SSID,AuthMode,FirstSeen,Channel,RSSI,CurrentLatitude,CurrentLongitude,AltitudeMeters,AccuracyMeters,Type") + dummy.write( + "WigleWifi-1.4,appRelease=20190201,model=Kismet,release=2019.02.01.{},device=kismet,display=kismet,board=kismet,brand=kismet\n") + dummy.write( + "MAC,SSID,AuthMode,FirstSeen,Channel,RSSI,CurrentLatitude,CurrentLongitude,AltitudeMeters,AccuracyMeters,Type") writer = csv.writer(dummy, delimiter=",", quoting=csv.QUOTE_NONE, escapechar="\\") writer.writerow([ @@ -75,10 +55,11 @@ def _transform_wigle_entry(gps_data, pcap_data): gps_data['Latitude'], gps_data['Longitude'], gps_data['Altitude'], - 0, # accuracy? + 0, # accuracy? 'WIFI']) return dummy.getvalue() + def _send_to_wigle(lines, api_key, timeout=30): """ Uploads the file to wigle-net @@ -109,87 +90,100 @@ def _send_to_wigle(lines, api_key, timeout=30): raise re_e -def on_internet_available(agent): - from scapy.all import Scapy_Exception - """ - Called in manual mode when there's internet connectivity - """ - global REPORT - global SKIP +class Wigle(plugins.Plugin): + __author__ = '33197631+dadav@users.noreply.github.com' + __version__ = '2.0.0' + __license__ = 'GPL3' + __description__ = 'This plugin automatically uploads collected wifis to wigle.net' - if READY: - config = agent.config() - display = agent.view() - reported = REPORT.data_field_or('reported', default=list()) + def __init__(self): + self.ready = False + self.report = StatusFile('/root/.wigle_uploads', data_format='json') + self.skip = list() - handshake_dir = config['bettercap']['handshakes'] - all_files = os.listdir(handshake_dir) - all_gps_files = [os.path.join(handshake_dir, filename) - for filename in all_files - if filename.endswith('.gps.json')] - new_gps_files = set(all_gps_files) - set(reported) - set(SKIP) + def on_loaded(self): + if 'api_key' not in self.options or ('api_key' in self.options and self.options['api_key'] is None): + logging.error("WIGLE: api_key isn't set. Can't upload to wigle.net") + return + self.ready = True - if new_gps_files: - logging.info("WIGLE: Internet connectivity detected. Uploading new handshakes to wigle.net") + def on_internet_available(self, agent): + from scapy.all import Scapy_Exception + """ + Called in manual mode when there's internet connectivity + """ + if self.ready: + config = agent.config() + display = agent.view() + reported = self.report.data_field_or('reported', default=list()) - csv_entries = list() - no_err_entries = list() + handshake_dir = config['bettercap']['handshakes'] + all_files = os.listdir(handshake_dir) + all_gps_files = [os.path.join(handshake_dir, filename) + for filename in all_files + if filename.endswith('.gps.json')] + new_gps_files = set(all_gps_files) - set(reported) - set(self.skip) - for gps_file in new_gps_files: - pcap_filename = gps_file.replace('.gps.json', '.pcap') + if new_gps_files: + logging.info("WIGLE: Internet connectivity detected. Uploading new handshakes to wigle.net") - if not os.path.exists(pcap_filename): - logging.error("WIGLE: Can't find pcap for %s", gps_file) - SKIP.append(gps_file) - continue + csv_entries = list() + no_err_entries = list() - try: - gps_data = _extract_gps_data(gps_file) - except OSError as os_err: - logging.error("WIGLE: %s", os_err) - SKIP.append(gps_file) - continue - except json.JSONDecodeError as json_err: - logging.error("WIGLE: %s", json_err) - SKIP.append(gps_file) - continue + for gps_file in new_gps_files: + pcap_filename = gps_file.replace('.gps.json', '.pcap') - if gps_data['Latitude'] == 0 and gps_data['Longitude'] == 0: - logging.warning("WIGLE: Not enough gps-information for %s. Trying again next time.", gps_file) - SKIP.append(gps_file) - continue + if not os.path.exists(pcap_filename): + logging.error("WIGLE: Can't find pcap for %s", gps_file) + self.skip.append(gps_file) + continue + try: + gps_data = _extract_gps_data(gps_file) + except OSError as os_err: + logging.error("WIGLE: %s", os_err) + self.skip.append(gps_file) + continue + except json.JSONDecodeError as json_err: + logging.error("WIGLE: %s", json_err) + self.skip.append(gps_file) + continue - try: - pcap_data = extract_from_pcap(pcap_filename, [WifiInfo.BSSID, - WifiInfo.ESSID, - WifiInfo.ENCRYPTION, - WifiInfo.CHANNEL, - WifiInfo.RSSI]) - except FieldNotFoundError: - logging.error("WIGLE: Could not extract all information. Skip %s", gps_file) - SKIP.append(gps_file) - continue - except Scapy_Exception as sc_e: - logging.error("WIGLE: %s", sc_e) - SKIP.append(gps_file) - continue + if gps_data['Latitude'] == 0 and gps_data['Longitude'] == 0: + logging.warning("WIGLE: Not enough gps-information for %s. Trying again next time.", gps_file) + self.skip.append(gps_file) + continue - new_entry = _transform_wigle_entry(gps_data, pcap_data) - csv_entries.append(new_entry) - no_err_entries.append(gps_file) + try: + pcap_data = extract_from_pcap(pcap_filename, [WifiInfo.BSSID, + WifiInfo.ESSID, + WifiInfo.ENCRYPTION, + WifiInfo.CHANNEL, + WifiInfo.RSSI]) + except FieldNotFoundError: + logging.error("WIGLE: Could not extract all information. Skip %s", gps_file) + self.skip.append(gps_file) + continue + except Scapy_Exception as sc_e: + logging.error("WIGLE: %s", sc_e) + self.skip.append(gps_file) + continue - if csv_entries: - display.set('status', "Uploading gps-data to wigle.net ...") - display.update(force=True) - try: - _send_to_wigle(csv_entries, OPTIONS['api_key']) - reported += no_err_entries - REPORT.update(data={'reported': reported}) - logging.info("WIGLE: Successfully uploaded %d files", len(no_err_entries)) - except requests.exceptions.RequestException as re_e: - SKIP += no_err_entries - logging.error("WIGLE: Got an exception while uploading %s", re_e) - except OSError as os_e: - SKIP += no_err_entries - logging.error("WIGLE: Got the following error: %s", os_e) + new_entry = _transform_wigle_entry(gps_data, pcap_data) + csv_entries.append(new_entry) + no_err_entries.append(gps_file) + + if csv_entries: + display.set('status', "Uploading gps-data to wigle.net ...") + display.update(force=True) + try: + _send_to_wigle(csv_entries, self.options['api_key']) + reported += no_err_entries + self.report.update(data={'reported': reported}) + logging.info("WIGLE: Successfully uploaded %d files", len(no_err_entries)) + except requests.exceptions.RequestException as re_e: + self.skip += no_err_entries + logging.error("WIGLE: Got an exception while uploading %s", re_e) + except OSError as os_e: + self.skip += no_err_entries + logging.error("WIGLE: Got the following error: %s", os_e) diff --git a/pwnagotchi/plugins/default/wpa-sec.py b/pwnagotchi/plugins/default/wpa-sec.py index 5b13f31..ee99f10 100644 --- a/pwnagotchi/plugins/default/wpa-sec.py +++ b/pwnagotchi/plugins/default/wpa-sec.py @@ -1,87 +1,84 @@ -__author__ = '33197631+dadav@users.noreply.github.com' -__version__ = '2.0.1' -__name__ = 'wpa-sec' -__license__ = 'GPL3' -__description__ = 'This plugin automatically uploads handshakes to https://wpa-sec.stanev.org' - import os import logging import requests from pwnagotchi.utils import StatusFile - -READY = False -REPORT = StatusFile('/root/.wpa_sec_uploads', data_format='json') -OPTIONS = dict() -SKIP = list() +import pwnagotchi.plugins as plugins -def on_loaded(): - """ - Gets called when the plugin gets loaded - """ - global READY +class WpaSec(plugins.Plugin): + __author__ = '33197631+dadav@users.noreply.github.com' + __version__ = '2.0.1' + __license__ = 'GPL3' + __description__ = 'This plugin automatically uploads handshakes to https://wpa-sec.stanev.org' - if 'api_key' not in OPTIONS or ('api_key' in OPTIONS and OPTIONS['api_key'] is None): - logging.error("WPA_SEC: API-KEY isn't set. Can't upload to wpa-sec.stanev.org") - return + def __init__(self): + self.ready = False + self.report = StatusFile('/root/.wpa_sec_uploads', data_format='json') + self.options = dict() + self.skip = list() - if 'api_url' not in OPTIONS or ('api_url' in OPTIONS and OPTIONS['api_url'] is None): - logging.error("WPA_SEC: API-URL isn't set. Can't upload, no endpoint configured.") - return - - READY = True + def _upload_to_wpasec(self, path, timeout=30): + """ + Uploads the file to https://wpa-sec.stanev.org, or another endpoint. + """ + with open(path, 'rb') as file_to_upload: + cookie = {'key': self.options['api_key']} + payload = {'file': file_to_upload} + try: + result = requests.post(self.options['api_url'], + cookies=cookie, + files=payload, + timeout=timeout) + if ' already submitted' in result.text: + logging.warning("%s was already submitted.", path) + except requests.exceptions.RequestException as req_e: + raise req_e -def _upload_to_wpasec(path, timeout=30): - """ - Uploads the file to https://wpa-sec.stanev.org, or another endpoint. - """ - with open(path, 'rb') as file_to_upload: - cookie = {'key': OPTIONS['api_key']} - payload = {'file': file_to_upload} + def on_loaded(self): + """ + Gets called when the plugin gets loaded + """ + if 'api_key' not in self.options or ('api_key' in self.options and self.options['api_key'] is None): + logging.error("WPA_SEC: API-KEY isn't set. Can't upload to wpa-sec.stanev.org") + return - try: - result = requests.post(OPTIONS['api_url'], - cookies=cookie, - files=payload, - timeout=timeout) - if ' already submitted' in result.text: - logging.warning("%s was already submitted.", path) - except requests.exceptions.RequestException as req_e: - raise req_e + if 'api_url' not in self.options or ('api_url' in self.options and self.options['api_url'] is None): + logging.error("WPA_SEC: API-URL isn't set. Can't upload, no endpoint configured.") + return + self.ready = True -def on_internet_available(agent): - """ - Called in manual mode when there's internet connectivity - """ - global REPORT - global SKIP - if READY: - config = agent.config() - display = agent.view() - reported = REPORT.data_field_or('reported', default=list()) + def on_internet_available(self, agent): + """ + Called in manual mode when there's internet connectivity + """ + if self.ready: + config = agent.config() + display = agent.view() + reported = self.report.data_field_or('reported', default=list()) - handshake_dir = config['bettercap']['handshakes'] - handshake_filenames = os.listdir(handshake_dir) - handshake_paths = [os.path.join(handshake_dir, filename) for filename in handshake_filenames if filename.endswith('.pcap')] - handshake_new = set(handshake_paths) - set(reported) - set(SKIP) + handshake_dir = config['bettercap']['handshakes'] + handshake_filenames = os.listdir(handshake_dir) + handshake_paths = [os.path.join(handshake_dir, filename) for filename in handshake_filenames if + filename.endswith('.pcap')] + handshake_new = set(handshake_paths) - set(reported) - set(self.skip) - if handshake_new: - logging.info("WPA_SEC: Internet connectivity detected. Uploading new handshakes to wpa-sec.stanev.org") + if handshake_new: + logging.info("WPA_SEC: Internet connectivity detected. Uploading new handshakes to wpa-sec.stanev.org") - for idx, handshake in enumerate(handshake_new): - display.set('status', f"Uploading handshake to wpa-sec.stanev.org ({idx + 1}/{len(handshake_new)})") - display.update(force=True) - try: - _upload_to_wpasec(handshake) - reported.append(handshake) - REPORT.update(data={'reported': reported}) - logging.info("WPA_SEC: Successfully uploaded %s", handshake) - except requests.exceptions.RequestException as req_e: - SKIP.append(handshake) - logging.error("WPA_SEC: %s", req_e) - continue - except OSError as os_e: - logging.error("WPA_SEC: %s", os_e) - continue + for idx, handshake in enumerate(handshake_new): + display.set('status', f"Uploading handshake to wpa-sec.stanev.org ({idx + 1}/{len(handshake_new)})") + display.update(force=True) + try: + self._upload_to_wpasec(handshake) + reported.append(handshake) + self.report.update(data={'reported': reported}) + logging.info("WPA_SEC: Successfully uploaded %s", handshake) + except requests.exceptions.RequestException as req_e: + self.skip.append(handshake) + logging.error("WPA_SEC: %s", req_e) + continue + except OSError as os_e: + logging.error("WPA_SEC: %s", os_e) + continue