new: implemented plugin system (closes #54)
This commit is contained in:
parent
00d78248d8
commit
47042f0946
@ -148,6 +148,12 @@ Now you can use the `preview.py`-script to preview the changes:
|
||||
# Now open http://localhost:8080 and http://localhost:8081
|
||||
```
|
||||
|
||||
### Plugins
|
||||
|
||||
Pwnagotchi has a simple plugins system that you can use to customize your unit and its behaviour. You can place your plugins anywhere
|
||||
as python files and then edit the `config.yml` file (`main.plugins` value) to point to their containing folder. Check the [plugins folder](https://github.com/evilsocket/pwnagotchi/tree/master/sdcard/rootfs/root/pwnagotchi/scripts/pwnagotchi/plugins) for a list of default
|
||||
plugins and all the callbacks that you can define for your own customizations.
|
||||
|
||||
### Random Info
|
||||
|
||||
- `hostname` sets the unit name.
|
||||
|
@ -2,6 +2,8 @@
|
||||
main:
|
||||
# currently implemented: en (default), de, nl, it
|
||||
lang: en
|
||||
# custom plugins path, if null only default plugins with be loaded
|
||||
plugins: null
|
||||
# monitor interface to use
|
||||
iface: mon0
|
||||
# command to run to bring the mon interface up in case it's not up already
|
||||
@ -21,7 +23,7 @@ main:
|
||||
|
||||
ai:
|
||||
# if false, only the default 'personality' will be used
|
||||
enabled: true
|
||||
enabled: false
|
||||
path: /root/brain.nn
|
||||
# 1.0 - laziness = probability of start training
|
||||
laziness: 0.1
|
||||
|
@ -5,7 +5,7 @@ import time
|
||||
import traceback
|
||||
|
||||
import core
|
||||
import pwnagotchi
|
||||
import pwnagotchi, pwnagotchi.plugins as plugins
|
||||
|
||||
from pwnagotchi.log import SessionParser
|
||||
from pwnagotchi.voice import Voice
|
||||
@ -25,39 +25,49 @@ args = parser.parse_args()
|
||||
if args.do_clear:
|
||||
print("clearing the display ...")
|
||||
with open(args.config, 'rt') as fp:
|
||||
config = yaml.safe_load(fp)
|
||||
cleardisplay=config['ui']['display']['type']
|
||||
if cleardisplay in ('inkyphat', 'inky'):
|
||||
print("inky display")
|
||||
from inky import InkyPHAT
|
||||
epd = InkyPHAT(config['ui']['display']['color'])
|
||||
epd.set_border(InkyPHAT.BLACK)
|
||||
self._render_cb = self._inky_render
|
||||
elif cleardisplay in ('papirus', 'papi'):
|
||||
print("papirus display")
|
||||
from pwnagotchi.ui.papirus.epd import EPD
|
||||
os.environ['EPD_SIZE'] = '2.0'
|
||||
epd = EPD()
|
||||
epd.clear()
|
||||
elif cleardisplay in ('waveshare_1', 'ws_1', 'waveshare1', 'ws1'):
|
||||
print("waveshare v1 display")
|
||||
from pwnagotchi.ui.waveshare.v1.epd2in13 import EPD
|
||||
epd = EPD()
|
||||
epd.init(epd.lut_full_update)
|
||||
epd.Clear(0xFF)
|
||||
elif cleardisplay in ('waveshare_2', 'ws_2', 'waveshare2', 'ws2'):
|
||||
print("waveshare v2 display")
|
||||
from pwnagotchi.ui.waveshare.v2.waveshare import EPD
|
||||
epd = EPD()
|
||||
epd.init(epd.FULL_UPDATE)
|
||||
epd.Clear(0xff)
|
||||
else:
|
||||
print("unknown display type %s" % cleardisplay)
|
||||
quit()
|
||||
config = yaml.safe_load(fp)
|
||||
cleardisplay = config['ui']['display']['type']
|
||||
if cleardisplay in ('inkyphat', 'inky'):
|
||||
print("inky display")
|
||||
from inky import InkyPHAT
|
||||
|
||||
epd = InkyPHAT(config['ui']['display']['color'])
|
||||
epd.set_border(InkyPHAT.BLACK)
|
||||
self._render_cb = self._inky_render
|
||||
elif cleardisplay in ('papirus', 'papi'):
|
||||
print("papirus display")
|
||||
from pwnagotchi.ui.papirus.epd import EPD
|
||||
|
||||
os.environ['EPD_SIZE'] = '2.0'
|
||||
epd = EPD()
|
||||
epd.clear()
|
||||
elif cleardisplay in ('waveshare_1', 'ws_1', 'waveshare1', 'ws1'):
|
||||
print("waveshare v1 display")
|
||||
from pwnagotchi.ui.waveshare.v1.epd2in13 import EPD
|
||||
|
||||
epd = EPD()
|
||||
epd.init(epd.lut_full_update)
|
||||
epd.Clear(0xFF)
|
||||
elif cleardisplay in ('waveshare_2', 'ws_2', 'waveshare2', 'ws2'):
|
||||
print("waveshare v2 display")
|
||||
from pwnagotchi.ui.waveshare.v2.waveshare import EPD
|
||||
|
||||
epd = EPD()
|
||||
epd.init(epd.FULL_UPDATE)
|
||||
epd.Clear(0xff)
|
||||
else:
|
||||
print("unknown display type %s" % cleardisplay)
|
||||
quit()
|
||||
|
||||
with open(args.config, 'rt') as fp:
|
||||
config = yaml.safe_load(fp)
|
||||
|
||||
plugins.load_from_path(plugins.default_path)
|
||||
if 'plugins' in config['main'] and config['main']['plugins'] is not None:
|
||||
plugins.load_from_path(config['main']['plugins'])
|
||||
|
||||
plugins.on('loaded')
|
||||
|
||||
display = Display(config=config, state={'name': '%s>' % pwnagotchi.name()})
|
||||
agent = Agent(view=display, config=config)
|
||||
|
||||
@ -65,6 +75,9 @@ core.log("%s@%s (v%s)" % (pwnagotchi.name(), agent._identity, pwnagotchi.version
|
||||
# for key, value in config['personality'].items():
|
||||
# core.log(" %s: %s" % (key, value))
|
||||
|
||||
for _, plugin in plugins.loaded.items():
|
||||
core.log("plugin '%s' v%s loaded from %s" % (plugin.__name__, plugin.__version__, plugin.__file__))
|
||||
|
||||
if args.do_manual:
|
||||
core.log("entering manual mode ...")
|
||||
|
||||
@ -112,13 +125,15 @@ core.logfile = config['main']['log']
|
||||
|
||||
agent.start_ai()
|
||||
agent.setup_events()
|
||||
agent.set_ready()
|
||||
agent.set_starting()
|
||||
agent.start_monitor_mode()
|
||||
agent.start_event_polling()
|
||||
|
||||
# print initial stats
|
||||
agent.next_epoch()
|
||||
|
||||
agent.set_ready()
|
||||
|
||||
while True:
|
||||
try:
|
||||
# recon on all channels
|
||||
|
@ -8,6 +8,7 @@ import _thread
|
||||
|
||||
import core
|
||||
|
||||
import pwnagotchi.plugins as plugins
|
||||
from bettercap.client import Client
|
||||
from pwnagotchi.mesh.utils import AsyncAdvertiser
|
||||
from pwnagotchi.ai.train import AsyncTrainer
|
||||
@ -47,29 +48,35 @@ class Agent(Client, AsyncAdvertiser, AsyncTrainer):
|
||||
def supported_channels(self):
|
||||
return self._supported_channels
|
||||
|
||||
def on_ai_ready(self):
|
||||
self._view.on_ai_ready()
|
||||
def set_starting(self):
|
||||
self._view.on_starting()
|
||||
|
||||
def set_ready(self):
|
||||
self._view.on_starting()
|
||||
plugins.on('ready', self)
|
||||
|
||||
def set_free_channel(self, channel):
|
||||
self._view.on_free_channel(channel)
|
||||
plugins.on('free_channel', self, channel)
|
||||
|
||||
def set_bored(self):
|
||||
self._view.on_bored()
|
||||
plugins.on('bored', self)
|
||||
|
||||
def set_sad(self):
|
||||
self._view.on_sad()
|
||||
plugins.on('sad', self)
|
||||
|
||||
def set_excited(self):
|
||||
self._view.on_excited()
|
||||
plugins.on('excited', self)
|
||||
|
||||
def set_lonely(self):
|
||||
self._view.on_lonely()
|
||||
plugins.on('lonely', self)
|
||||
|
||||
def set_rebooting(self):
|
||||
self._view.on_rebooting()
|
||||
plugins.on('rebooting', self)
|
||||
|
||||
def setup_events(self):
|
||||
core.log("connecting to %s ..." % self.url)
|
||||
@ -128,6 +135,7 @@ class Agent(Client, AsyncAdvertiser, AsyncTrainer):
|
||||
self.start_advertising()
|
||||
|
||||
def wait_for(self, t, sleeping=True):
|
||||
plugins.on('sleep' if sleeping else 'wait', self, t)
|
||||
self._view.wait(t, sleeping)
|
||||
self._epoch.track(sleep=True, inc=t)
|
||||
|
||||
@ -179,6 +187,7 @@ class Agent(Client, AsyncAdvertiser, AsyncTrainer):
|
||||
|
||||
def set_access_points(self, aps):
|
||||
self._access_points = aps
|
||||
plugins.on('wifi_update', self, aps)
|
||||
self._epoch.observe(aps, self._advertiser.peers() if self._advertiser is not None else ())
|
||||
return self._access_points
|
||||
|
||||
@ -338,6 +347,7 @@ class Agent(Client, AsyncAdvertiser, AsyncTrainer):
|
||||
if apsta is None:
|
||||
core.log("!!! captured new handshake: %s !!!" % key)
|
||||
self._last_pwnd = ap_mac
|
||||
plugins.on('handshake', self, ap_mac, sta_mac)
|
||||
else:
|
||||
(ap, sta) = apsta
|
||||
self._last_pwnd = ap['hostname'] if ap['hostname'] != '' and ap[
|
||||
@ -346,6 +356,7 @@ class Agent(Client, AsyncAdvertiser, AsyncTrainer):
|
||||
ap['channel'],
|
||||
sta['mac'], sta['vendor'],
|
||||
ap['hostname'], ap['mac'], ap['vendor']))
|
||||
plugins.on('handshake', self, ap, sta)
|
||||
|
||||
except Exception as e:
|
||||
core.log("error: %s" % e)
|
||||
@ -419,6 +430,7 @@ class Agent(Client, AsyncAdvertiser, AsyncTrainer):
|
||||
except Exception as e:
|
||||
self._on_error(ap['mac'], e)
|
||||
|
||||
plugins.on('association', self, ap)
|
||||
if throttle > 0:
|
||||
time.sleep(throttle)
|
||||
self._view.on_normal()
|
||||
@ -439,6 +451,7 @@ class Agent(Client, AsyncAdvertiser, AsyncTrainer):
|
||||
except Exception as e:
|
||||
self._on_error(sta['mac'], e)
|
||||
|
||||
plugins.on('deauthentication', self, ap, sta)
|
||||
if throttle > 0:
|
||||
time.sleep(throttle)
|
||||
self._view.on_normal()
|
||||
@ -470,6 +483,9 @@ class Agent(Client, AsyncAdvertiser, AsyncTrainer):
|
||||
self._current_channel = channel
|
||||
self._epoch.track(hop=True)
|
||||
self._view.set('channel', '%d' % channel)
|
||||
|
||||
plugins.on('channel_hop', self, channel)
|
||||
|
||||
except Exception as e:
|
||||
core.log("error: %s" % e)
|
||||
|
||||
@ -509,6 +525,8 @@ class Agent(Client, AsyncAdvertiser, AsyncTrainer):
|
||||
core.log("%d epochs with activity -> excited" % self._epoch.active_for)
|
||||
self.set_excited()
|
||||
|
||||
plugins.on('epoch', self, self._epoch.epoch - 1, self._epoch.data())
|
||||
|
||||
if self._epoch.blind_for >= self._config['main']['mon_max_blind_epochs']:
|
||||
core.log("%d epochs without visible access points -> rebooting ..." % self._epoch.blind_for)
|
||||
self._reboot()
|
||||
|
@ -7,6 +7,7 @@ import pwnagotchi.mesh.wifi as wifi
|
||||
|
||||
from pwnagotchi.ai.reward import RewardFunction
|
||||
|
||||
|
||||
class Epoch(object):
|
||||
def __init__(self, config):
|
||||
self.epoch = 0
|
||||
@ -92,7 +93,8 @@ class Epoch(object):
|
||||
try:
|
||||
peers_per_chan[peer.last_channel - 1] += 1.0
|
||||
except IndexError as e:
|
||||
core.log("got peer data on channel %d, we can store %d channels" % (peer.last_channel, wifi.NumChannels))
|
||||
core.log(
|
||||
"got peer data on channel %d, we can store %d channels" % (peer.last_channel, wifi.NumChannels))
|
||||
|
||||
# normalize
|
||||
aps_per_chan = [e / num_aps for e in aps_per_chan]
|
||||
|
@ -7,6 +7,7 @@ import json
|
||||
|
||||
import core
|
||||
|
||||
import pwnagotchi.plugins as plugins
|
||||
import pwnagotchi.ai as ai
|
||||
from pwnagotchi.ai.epoch import Epoch
|
||||
|
||||
@ -68,14 +69,14 @@ class Stats(object):
|
||||
core.log("[ai] saving %s" % self.path)
|
||||
|
||||
data = json.dumps({
|
||||
'born_at': self.born_at,
|
||||
'epochs_lived': self.epochs_lived,
|
||||
'epochs_trained': self.epochs_trained,
|
||||
'rewards': {
|
||||
'best': self.best_reward,
|
||||
'worst': self.worst_reward
|
||||
}
|
||||
})
|
||||
'born_at': self.born_at,
|
||||
'epochs_lived': self.epochs_lived,
|
||||
'epochs_trained': self.epochs_trained,
|
||||
'rewards': {
|
||||
'best': self.best_reward,
|
||||
'worst': self.worst_reward
|
||||
}
|
||||
})
|
||||
|
||||
temp = "%s.tmp" % self.path
|
||||
with open(temp, 'wt') as fp:
|
||||
@ -98,6 +99,11 @@ class AsyncTrainer(object):
|
||||
self._is_training = training
|
||||
self._training_epochs = for_epochs
|
||||
|
||||
if training:
|
||||
plugins.on('ai_training_start', self, for_epochs)
|
||||
else:
|
||||
plugins.on('ai_training_end', self)
|
||||
|
||||
def is_training(self):
|
||||
return self._is_training
|
||||
|
||||
@ -123,8 +129,10 @@ class AsyncTrainer(object):
|
||||
|
||||
def on_ai_training_step(self, _locals, _globals):
|
||||
self._model.env.render()
|
||||
plugins.on('ai_training_step', self, _locals, _globals)
|
||||
|
||||
def on_ai_policy(self, new_params):
|
||||
plugins.on('ai_policy', self, new_params)
|
||||
core.log("[ai] setting new policy:")
|
||||
for name, value in new_params.items():
|
||||
if name in self._config['personality']:
|
||||
@ -139,13 +147,19 @@ class AsyncTrainer(object):
|
||||
self.run('set wifi.sta.ttl %d' % self._config['personality']['sta_ttl'])
|
||||
self.run('set wifi.rssi.min %d' % self._config['personality']['min_rssi'])
|
||||
|
||||
def on_ai_ready(self):
|
||||
self._view.on_ai_ready()
|
||||
plugins.on('ai_ready', self)
|
||||
|
||||
def on_ai_best_reward(self, r):
|
||||
core.log("[ai] best reward so far: %s" % r)
|
||||
self._view.on_motivated(r)
|
||||
plugins.on('ai_best_reward', self, r)
|
||||
|
||||
def on_ai_worst_reward(self, r):
|
||||
core.log("[ai] worst reward so far: %s" % r)
|
||||
self._view.on_demotivated(r)
|
||||
plugins.on('ai_worst_reward', self, r)
|
||||
|
||||
def _ai_worker(self):
|
||||
self._model = ai.load(self._config, self, self._epoch)
|
||||
|
@ -1,7 +1,7 @@
|
||||
import _thread
|
||||
|
||||
import core
|
||||
import pwnagotchi
|
||||
import pwnagotchi, pwnagotchi.plugins as plugins
|
||||
from pwnagotchi.mesh import get_identity
|
||||
|
||||
|
||||
@ -37,6 +37,8 @@ class AsyncAdvertiser(object):
|
||||
|
||||
def _on_new_unit(self, peer):
|
||||
self._view.on_new_peer(peer)
|
||||
plugins.on('peer_detected', self, peer)
|
||||
|
||||
def _on_lost_unit(self, peer):
|
||||
self._view.on_lost_peer(peer)
|
||||
plugins.on('peer_lost', self, peer)
|
||||
|
@ -0,0 +1,45 @@
|
||||
import os
|
||||
import glob
|
||||
import importlib, importlib.util
|
||||
|
||||
# import core
|
||||
|
||||
default_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), "default")
|
||||
loaded = {}
|
||||
|
||||
|
||||
def dummy_callback():
|
||||
pass
|
||||
|
||||
|
||||
def on(event_name, *args, **kwargs):
|
||||
global loaded
|
||||
cb_name = 'on_%s' % event_name
|
||||
for _, plugin in loaded.items():
|
||||
if cb_name in plugin.__dict__:
|
||||
# print("calling %s %s(%s)" %(cb_name, args, kwargs))
|
||||
plugin.__dict__[cb_name](*args, **kwargs)
|
||||
|
||||
|
||||
def load_from_file(filename):
|
||||
plugin_name = os.path.basename(filename.replace(".py", ""))
|
||||
spec = importlib.util.spec_from_file_location(plugin_name, filename)
|
||||
instance = importlib.util.module_from_spec(spec)
|
||||
spec.loader.exec_module(instance)
|
||||
return plugin_name, instance
|
||||
|
||||
|
||||
def load_from_path(path):
|
||||
global loaded
|
||||
|
||||
for filename in glob.glob(os.path.join(path, "*.py")):
|
||||
name, plugin = load_from_file(filename)
|
||||
if name in loaded:
|
||||
raise Exception("plugin %s already loaded from %s" % (name, plugin.__file__))
|
||||
elif not plugin.__enabled__:
|
||||
# print("plugin %s is not enabled" % name)
|
||||
pass
|
||||
else:
|
||||
loaded[name] = plugin
|
||||
|
||||
return loaded
|
@ -0,0 +1,162 @@
|
||||
__author__ = 'evilsocket@gmail.com'
|
||||
__version__ = '1.0.0'
|
||||
__name__ = 'hello_world'
|
||||
__license__ = 'GPL3'
|
||||
__description__ = 'An example plugin for pwnagotchi that implements all the available callbacks.'
|
||||
__enabled__ = False # IMPORTANT: set this to True to enable your plugin.
|
||||
|
||||
from pwnagotchi.ui.components import LabeledValue
|
||||
from pwnagotchi.ui.view import BLACK
|
||||
import pwnagotchi.ui.fonts as fonts
|
||||
import core
|
||||
|
||||
|
||||
# called when the plugin is loaded
|
||||
def on_loaded():
|
||||
core.log("WARNING: plugin %s should be disabled!" % __name__)
|
||||
|
||||
|
||||
# called to setup the ui elements
|
||||
def on_ui_setup(ui):
|
||||
# add custom UI elements
|
||||
ui.add_element('ups', LabeledValue(color=BLACK, label='UPS', value='0%/0V', position=(ui.width() / 2 - 25, 0),
|
||||
label_font=fonts.Bold, text_font=fonts.Medium))
|
||||
|
||||
|
||||
# called when the ui is updated
|
||||
def on_ui_update(ui):
|
||||
# update those elements
|
||||
some_voltage = 0.1
|
||||
some_capacity = 100.0
|
||||
|
||||
ui.set('ups', "%4.2fV/%2i%%" % (some_voltage, some_capacity))
|
||||
|
||||
|
||||
# called when the hardware display setup is done, display is an hardware specific object
|
||||
def on_display_setup(display):
|
||||
pass
|
||||
|
||||
|
||||
# called when everything is ready and the main loop is about to start
|
||||
def on_ready(agent):
|
||||
core.log("unit is ready")
|
||||
# you can run custom bettercap commands if you want
|
||||
# agent.run('ble.recon on')
|
||||
# or set a custom state
|
||||
# agent.set_bored()
|
||||
|
||||
|
||||
# called when the AI finished loading
|
||||
def on_ai_ready(agent):
|
||||
pass
|
||||
|
||||
|
||||
# called when the AI finds a new set of parameters
|
||||
def on_ai_policy(agent, policy):
|
||||
pass
|
||||
|
||||
|
||||
# called when the AI starts training for a given number of epochs
|
||||
def on_ai_training_start(agent, epochs):
|
||||
pass
|
||||
|
||||
|
||||
# called after the AI completed a training epoch
|
||||
def on_ai_training_step(agent, _locals, _globals):
|
||||
pass
|
||||
|
||||
|
||||
# called when the AI has done training
|
||||
def on_ai_training_end(agent):
|
||||
pass
|
||||
|
||||
|
||||
# called when the AI got the best reward so far
|
||||
def on_ai_best_reward(agent, reward):
|
||||
pass
|
||||
|
||||
|
||||
# called when the AI got the best reward so far
|
||||
def on_ai_worst_reward(agent, reward):
|
||||
pass
|
||||
|
||||
|
||||
# called when a non overlapping wifi channel is found to be free
|
||||
def on_free_channel(agent, channel):
|
||||
pass
|
||||
|
||||
|
||||
# called when the status is set to bored
|
||||
def on_bored(agent):
|
||||
pass
|
||||
|
||||
|
||||
# called when the status is set to sad
|
||||
def on_sad(agent):
|
||||
pass
|
||||
|
||||
|
||||
# called when the status is set to excited
|
||||
def on_excited(agent):
|
||||
pass
|
||||
|
||||
|
||||
# called when the status is set to lonely
|
||||
def on_lonely(agent):
|
||||
pass
|
||||
|
||||
|
||||
# called when the agent is rebooting the board
|
||||
def on_rebooting(agent):
|
||||
pass
|
||||
|
||||
|
||||
# called when the agent is waiting for t seconds
|
||||
def on_wait(agent, t):
|
||||
pass
|
||||
|
||||
|
||||
# called when the agent is sleeping for t seconds
|
||||
def on_sleep(agent, t):
|
||||
pass
|
||||
|
||||
|
||||
# called when the agent refreshed its access points list
|
||||
def on_wifi_update(agent, access_points):
|
||||
pass
|
||||
|
||||
|
||||
# called when the agent is sending an association frame
|
||||
def on_association(agent, access_point):
|
||||
pass
|
||||
|
||||
|
||||
# callend when the agent is deauthenticating a client station from an AP
|
||||
def on_deauthentication(agent, access_point, client_station):
|
||||
pass
|
||||
|
||||
|
||||
# callend when the agent is tuning on a specific channel
|
||||
def on_channel_hop(agent, channel):
|
||||
pass
|
||||
|
||||
|
||||
# called when a new handshake is captured, access_point and client_station are json objects
|
||||
# if the agent could match the BSSIDs to the current list, otherwise they are just the strings of the BSSIDs
|
||||
def on_handshake(agent, access_point, client_station):
|
||||
pass
|
||||
|
||||
|
||||
# called when an epoch is over (where an epoch is a single loop of the main algorithm)
|
||||
def on_epoch(agent, epoch, epoch_data):
|
||||
pass
|
||||
|
||||
|
||||
# called when a new peer is detected
|
||||
def on_peer_detected(agent, peer):
|
||||
pass
|
||||
|
||||
|
||||
# called when a known peer is lost
|
||||
def on_peer_lost(agent, peer):
|
||||
pass
|
@ -0,0 +1,65 @@
|
||||
# Based on UPS Lite v1.1 from https://github.com/xenDE
|
||||
#
|
||||
# funtions for get UPS status - needs enable "i2c" in raspi-config
|
||||
#
|
||||
# https://github.com/linshuqin329/UPS-Lite
|
||||
#
|
||||
# For Raspberry Pi Zero Ups Power Expansion Board with Integrated Serial Port S3U4
|
||||
# https://www.ebay.de/itm/For-Raspberry-Pi-Zero-Ups-Power-Expansion-Board-with-Integrated-Serial-Port-S3U4/323873804310
|
||||
# https://www.aliexpress.com/item/32888533624.html
|
||||
__author__ = 'evilsocket@gmail.com'
|
||||
__version__ = '1.0.0'
|
||||
__name__ = 'ups_lite'
|
||||
__license__ = 'GPL3'
|
||||
__description__ = 'A plugin that will add a voltage indicator for the UPS Lite v1.1'
|
||||
__enabled__ = False
|
||||
|
||||
import struct
|
||||
|
||||
from pwnagotchi.ui.components import LabeledValue
|
||||
from pwnagotchi.ui.view import BLACK
|
||||
import pwnagotchi.ui.fonts as fonts
|
||||
|
||||
|
||||
# TODO: add enable switch in config.yml an cleanup all to the best place
|
||||
class UPS:
|
||||
def __init__(self):
|
||||
# only import when the module is loaded and enabled
|
||||
import smbus
|
||||
# 0 = /dev/i2c-0 (port I2C0), 1 = /dev/i2c-1 (port I2C1)
|
||||
self._bus = smbus.SMBus(1)
|
||||
|
||||
def voltage(self):
|
||||
try:
|
||||
address = 0x36
|
||||
read = self._bus.read_word_data(address, 2)
|
||||
swapped = struct.unpack("<H", struct.pack(">H", read))[0]
|
||||
return swapped * 1.25 / 1000 / 16
|
||||
except:
|
||||
return 0.0
|
||||
|
||||
def capacity(self):
|
||||
try:
|
||||
address = 0x36
|
||||
read = self._bus.read_word_data(address, 4)
|
||||
swapped = struct.unpack("<H", struct.pack(">H", read))[0]
|
||||
return swapped / 256
|
||||
except:
|
||||
return 0.0
|
||||
|
||||
|
||||
ups = None
|
||||
|
||||
|
||||
def on_loaded():
|
||||
global ups
|
||||
ups = UPS()
|
||||
|
||||
|
||||
def on_ui_setup(ui):
|
||||
ui.add_element('ups', LabeledValue(color=BLACK, label='UPS', value='0%/0V', position=(ui.width() / 2 - 25, 0),
|
||||
label_font=fonts.Bold, text_font=fonts.Medium))
|
||||
|
||||
|
||||
def on_ui_update(ui):
|
||||
ui.set('ups', "%4.2fV/%2i%%" % (ups.voltage(), ups.capacity()))
|
@ -4,7 +4,7 @@ from threading import Lock
|
||||
import shutil
|
||||
import core
|
||||
import os
|
||||
import pwnagotchi
|
||||
import pwnagotchi, pwnagotchi.plugins as plugins
|
||||
|
||||
from pwnagotchi.ui.view import WHITE, View
|
||||
|
||||
@ -146,6 +146,8 @@ class Display(View):
|
||||
else:
|
||||
core.log("unknown display type %s" % self._display_type)
|
||||
|
||||
plugins.on('display_setup', self._display)
|
||||
|
||||
self.on_render(self._on_view_rendered)
|
||||
|
||||
def image(self):
|
||||
|
@ -7,6 +7,9 @@ class State(object):
|
||||
self._lock = Lock()
|
||||
self._listeners = {}
|
||||
|
||||
def add_element(self, key, elem):
|
||||
self._state[key] = elem
|
||||
|
||||
def add_listener(self, key, cb):
|
||||
with self._lock:
|
||||
self._listeners[key] = cb
|
||||
|
@ -4,7 +4,7 @@ import time
|
||||
from PIL import Image, ImageDraw
|
||||
|
||||
import core
|
||||
import pwnagotchi
|
||||
import pwnagotchi.plugins as plugins
|
||||
from pwnagotchi.voice import Voice
|
||||
|
||||
import pwnagotchi.ui.fonts as fonts
|
||||
@ -41,7 +41,7 @@ def setup_display_specifics(config):
|
||||
name_pos = (int(width / 2) - 15, int(height * .15))
|
||||
status_pos = (int(width / 2) - 15, int(height * .30))
|
||||
|
||||
elif config['ui']['display']['type'] in ('ws_1', 'ws1', 'waveshare_1', 'waveshare1',
|
||||
elif config['ui']['display']['type'] in ('ws_1', 'ws1', 'waveshare_1', 'waveshare1',
|
||||
'ws_2', 'ws2', 'waveshare_2', 'waveshare2'):
|
||||
fonts.setup(10, 9, 10, 35)
|
||||
|
||||
@ -105,8 +105,19 @@ class View(object):
|
||||
for key, value in state.items():
|
||||
self._state.set(key, value)
|
||||
|
||||
plugins.on('ui_setup', self)
|
||||
|
||||
_thread.start_new_thread(self._refresh_handler, ())
|
||||
|
||||
def add_element(self, key, elem):
|
||||
self._state.add_element(key, elem)
|
||||
|
||||
def width(self):
|
||||
return self._width
|
||||
|
||||
def height(self):
|
||||
return self._height
|
||||
|
||||
def on_state_change(self, key, cb):
|
||||
self._state.add_listener(key, cb)
|
||||
|
||||
@ -294,6 +305,8 @@ class View(object):
|
||||
self._canvas = Image.new('1', (self._width, self._height), WHITE)
|
||||
drawer = ImageDraw.Draw(self._canvas)
|
||||
|
||||
plugins.on('ui_update', self)
|
||||
|
||||
for key, lv in self._state.items():
|
||||
lv.draw(self._canvas, drawer)
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user