340 lines
11 KiB
Python
340 lines
11 KiB
Python
import _thread
|
|
from threading import Lock
|
|
import time
|
|
import logging
|
|
from PIL import ImageDraw
|
|
|
|
import pwnagotchi.utils as utils
|
|
import pwnagotchi.plugins as plugins
|
|
from pwnagotchi.voice import Voice
|
|
|
|
import pwnagotchi.ui.fonts as fonts
|
|
import pwnagotchi.ui.faces as faces
|
|
from pwnagotchi.ui.components import *
|
|
from pwnagotchi.ui.state import State
|
|
|
|
# Fill values used when drawing on the display canvas.
WHITE = 0xff
BLACK = 0x00

# Module-level handle to the most recently constructed View (assigned at the
# end of View.__init__); None until a View has been created.
ROOT = None
|
|
|
|
|
|
class View(object):
    """Holds the UI state of the display and renders it onto a canvas.

    The view owns a ``State`` made of drawable elements (labels, lines and
    text), exposes ``on_*`` helpers that change the face/status pair for a
    given event, and redraws the canvas in ``update()`` whenever the state
    reports changes.
    """

    def __init__(self, config, impl, state=None):
        """Build the default UI elements and optionally start the refresher.

        :param config: configuration mapping; this method reads
            ``config['ui']['faces']``, ``config['ui']['fps']`` and
            ``config['main']['lang']``.
        :param impl: display implementation; its ``layout()`` result provides
            the size and the position of every element.
        :param state: optional mapping of element key -> initial value that
            overrides the defaults set here.
        """
        global ROOT

        # setup faces from the configuration in case the user customized them
        faces.load_from_config(config['ui']['faces'])

        self._render_cbs = []
        self._config = config
        self._canvas = None
        self._frozen = False
        self._lock = Lock()
        self._voice = Voice(lang=config['main']['lang'])
        self._implementation = impl
        self._layout = impl.layout()
        self._width = self._layout['width']
        self._height = self._layout['height']
        self._state = State(state={
            'channel': LabeledValue(color=BLACK, label='CH', value='00', position=self._layout['channel'],
                                    label_font=fonts.Bold,
                                    text_font=fonts.Medium),
            'aps': LabeledValue(color=BLACK, label='APS', value='0 (00)', position=self._layout['aps'],
                                label_font=fonts.Bold,
                                text_font=fonts.Medium),

            'uptime': LabeledValue(color=BLACK, label='UP', value='00:00:00', position=self._layout['uptime'],
                                   label_font=fonts.Bold,
                                   text_font=fonts.Medium),

            'line1': Line(self._layout['line1'], color=BLACK),
            'line2': Line(self._layout['line2'], color=BLACK),

            'face': Text(value=faces.SLEEP, position=self._layout['face'], color=BLACK, font=fonts.Huge),

            'friend_face': Text(value=None, position=self._layout['friend_face'], font=fonts.Bold, color=BLACK),
            'friend_name': Text(value=None, position=self._layout['friend_name'], font=fonts.BoldSmall,
                                color=BLACK),

            'name': Text(value='%s>' % 'pwnagotchi', position=self._layout['name'], color=BLACK, font=fonts.Bold),

            'status': Text(value=self._voice.default(),
                           position=self._layout['status']['pos'],
                           color=BLACK,
                           font=self._layout['status']['font'],
                           wrap=True,
                           # the current maximum number of characters per line, assuming each character is 6 pixels wide
                           max_length=self._layout['status']['max']),

            'shakes': LabeledValue(label='PWND ', value='0 (00)', color=BLACK,
                                   position=self._layout['shakes'], label_font=fonts.Bold,
                                   text_font=fonts.Medium),
            'mode': Text(value='AUTO', position=self._layout['mode'],
                         font=fonts.Bold, color=BLACK),
        })

        if state:
            for key, value in state.items():
                self._state.set(key, value)

        plugins.on('ui_setup', self)

        if config['ui']['fps'] > 0.0:
            # _ignore_changes must exist before the refresh thread starts:
            # the thread calls update(), which reads it.
            self._ignore_changes = ()
            _thread.start_new_thread(self._refresh_handler, ())
        else:
            logging.warning("ui.fps is 0, the display will only update for major changes")
            self._ignore_changes = ('uptime', 'name')

        ROOT = self

    @staticmethod
    def _rssi_to_bars(rssi):
        """Map an RSSI reading to a number of signal bars between 1 and 4."""
        # ref. https://www.metageek.com/training/resources/understanding-rssi-2.html
        if rssi >= -67:
            return 4
        if rssi >= -70:
            return 3
        if rssi >= -80:
            return 2
        return 1

    def has_element(self, key):
        """Return whatever the state reports for ``has_element(key)``."""
        return self._state.has_element(key)

    def add_element(self, key, elem):
        """Register the element ``elem`` in the state under ``key``."""
        self._state.add_element(key, elem)

    def remove_element(self, key):
        """Remove the element stored in the state under ``key``."""
        self._state.remove_element(key)

    def width(self):
        """Return the layout width."""
        return self._width

    def height(self):
        """Return the layout height."""
        return self._height

    def on_state_change(self, key, cb):
        """Register ``cb`` as a state listener for ``key``."""
        self._state.add_listener(key, cb)

    def on_render(self, cb):
        """Register ``cb`` to receive the canvas after each redraw (once only)."""
        if cb not in self._render_cbs:
            self._render_cbs.append(cb)

    def _refresh_handler(self):
        """Loop forever: toggle the cursor after the name and refresh the view.

        Runs in the thread started by ``__init__`` when ``ui.fps`` is positive;
        one iteration every ``1 / fps`` seconds.
        """
        delay = 1.0 / self._config['ui']['fps']

        while True:
            name = self._state.get('name')
            # blink: drop the block cursor if present, otherwise append it
            self.set('name', name.rstrip('█').strip() if '█' in name else (name + ' █'))
            self.update()
            time.sleep(delay)

    def set(self, key, value):
        """Set the value of the state element ``key``."""
        self._state.set(key, value)

    def get(self, key):
        """Return the value of the state element ``key``."""
        return self._state.get(key)

    def on_starting(self):
        """Show the starting status with the awake face (no explicit redraw)."""
        self.set('status', self._voice.on_starting())
        self.set('face', faces.AWAKE)

    def on_ai_ready(self):
        """Switch the mode label to AI and show the happy face."""
        self.set('mode', ' AI')
        self.set('face', faces.HAPPY)
        self.set('status', self._voice.on_ai_ready())
        self.update()

    def on_manual_mode(self, last_session):
        """Fill the view with the data of ``last_session`` for manual mode.

        The face is sad when the session ran more than 3 epochs without any
        handshake, happy otherwise.
        """
        self.set('mode', 'MANU')
        self.set('face', faces.SAD if (last_session.epochs > 3 and last_session.handshakes == 0) else faces.HAPPY)
        self.set('status', self._voice.on_last_session_data(last_session))
        self.set('epoch', "%04d" % last_session.epochs)
        self.set('uptime', last_session.duration)
        self.set('channel', '-')
        self.set('aps', "%d" % last_session.associated)
        self.set('shakes', '%d (%s)' % (last_session.handshakes,
                                        utils.total_unique_handshakes(self._config['bettercap']['handshakes'])))
        self.set_closest_peer(last_session.last_peer, last_session.peers)

    def is_normal(self):
        """Return True when the current face is none of the event faces below."""
        return self._state.get('face') not in (
            faces.INTENSE,
            faces.COOL,
            faces.BORED,
            faces.HAPPY,
            faces.EXCITED,
            faces.MOTIVATED,
            faces.DEMOTIVATED,
            faces.SMART,
            faces.SAD,
            faces.LONELY)

    def on_keys_generation(self):
        """Show the key-generation status with the awake face."""
        self.set('face', faces.AWAKE)
        self.set('status', self._voice.on_keys_generation())
        self.update()

    def on_normal(self):
        """Show the normal status with the awake face."""
        self.set('face', faces.AWAKE)
        self.set('status', self._voice.on_normal())
        self.update()

    def set_closest_peer(self, peer, num_total):
        """Show (or clear, when ``peer`` is None) the closest peer line.

        The line is made of four signal glyphs derived from ``peer.rssi``, the
        peer name and its pwnd counters, plus the total number of peers when
        ``num_total`` is greater than 1.
        """
        if peer is None:
            self.set('friend_face', None)
            self.set('friend_name', None)
        else:
            num_bars = self._rssi_to_bars(peer.rssi)

            name = '▌' * num_bars
            name += '│' * (4 - num_bars)
            name += ' %s %d (%d)' % (peer.name(), peer.pwnd_run(), peer.pwnd_total())

            if num_total > 1:
                if num_total > 9000:
                    name += ' of over 9000'
                else:
                    name += ' of %d' % num_total

            self.set('friend_face', peer.face())
            self.set('friend_name', name)
        self.update()

    def on_new_peer(self, peer):
        """Show the friend face and the new-peer status."""
        self.set('face', faces.FRIEND)
        self.set('status', self._voice.on_new_peer(peer))
        self.update()

    def on_lost_peer(self, peer):
        """Show the lonely face and the lost-peer status."""
        self.set('face', faces.LONELY)
        self.set('status', self._voice.on_lost_peer(peer))
        self.update()

    def on_free_channel(self, channel):
        """Show the smart face and the free-channel status."""
        self.set('face', faces.SMART)
        self.set('status', self._voice.on_free_channel(channel))
        self.update()

    def wait(self, secs, sleeping=True):
        """Block for ``secs`` seconds, split in 10 steps, animating the view.

        :param secs: total time to wait, in seconds.
        :param sleeping: when True show the sleep faces, otherwise alternate
            the look-right / look-left faces with the waiting status.
        """
        was_normal = self.is_normal()
        part = secs / 10.0

        for step in range(0, 10):
            # if we weren't in a normal state before going
            # to sleep, keep that face and status on for
            # a while, otherwise the sleep animation will
            # always override any minor state change before it
            if was_normal or step > 5:
                if sleeping:
                    if secs > 1:
                        self.set('face', faces.SLEEP)
                        self.set('status', self._voice.on_napping(int(secs)))
                    else:
                        self.set('face', faces.SLEEP2)
                        self.set('status', self._voice.on_awakening())
                else:
                    self.set('status', self._voice.on_waiting(int(secs)))
                    if step % 2 == 0:
                        self.set('face', faces.LOOK_R)
                    else:
                        self.set('face', faces.LOOK_L)

            time.sleep(part)
            secs -= part

        self.on_normal()

    def on_shutdown(self):
        """Draw the shutdown screen, then freeze the view.

        Once ``_frozen`` is set, ``update()`` returns without redrawing.
        """
        self.set('face', faces.SLEEP)
        self.set('status', self._voice.on_shutdown())
        self.update(force=True)
        self._frozen = True

    def on_bored(self):
        """Show the bored face and status."""
        self.set('face', faces.BORED)
        self.set('status', self._voice.on_bored())
        self.update()

    def on_sad(self):
        """Show the sad face and status."""
        self.set('face', faces.SAD)
        self.set('status', self._voice.on_sad())
        self.update()

    def on_motivated(self, reward):
        """Show the motivated face and the status built from ``reward``."""
        self.set('face', faces.MOTIVATED)
        self.set('status', self._voice.on_motivated(reward))
        self.update()

    def on_demotivated(self, reward):
        """Show the demotivated face and the status built from ``reward``."""
        self.set('face', faces.DEMOTIVATED)
        self.set('status', self._voice.on_demotivated(reward))
        self.update()

    def on_excited(self):
        """Show the excited face and status."""
        self.set('face', faces.EXCITED)
        self.set('status', self._voice.on_excited())
        self.update()

    def on_assoc(self, ap):
        """Show the intense face and the association status for ``ap``."""
        self.set('face', faces.INTENSE)
        self.set('status', self._voice.on_assoc(ap))
        self.update()

    def on_deauth(self, sta):
        """Show the cool face and the deauth status for ``sta``."""
        self.set('face', faces.COOL)
        self.set('status', self._voice.on_deauth(sta))
        self.update()

    def on_miss(self, who):
        """Show the sad face and the miss status for ``who``."""
        self.set('face', faces.SAD)
        self.set('status', self._voice.on_miss(who))
        self.update()

    def on_lonely(self):
        """Show the lonely face and status."""
        self.set('face', faces.LONELY)
        self.set('status', self._voice.on_lonely())
        self.update()

    def on_handshakes(self, new_shakes):
        """Show the happy face and the handshakes status for ``new_shakes``."""
        self.set('face', faces.HAPPY)
        self.set('status', self._voice.on_handshakes(new_shakes))
        self.update()

    def on_unread_messages(self, count, total):
        """Show the excited face and the unread-messages status."""
        self.set('face', faces.EXCITED)
        self.set('status', self._voice.on_unread_messages(count, total))
        self.update()

    def on_rebooting(self):
        """Show the broken face and the rebooting status."""
        self.set('face', faces.BROKEN)
        self.set('status', self._voice.on_rebooting())
        self.update()

    def on_custom(self, text):
        """Show the debug face with the custom status ``text``."""
        self.set('face', faces.DEBUG)
        self.set('status', self._voice.custom(text))
        self.update()

    def update(self, force=False, new_data=None):
        """Apply ``new_data`` to the state and redraw the canvas if needed.

        :param force: redraw even when the state reports no changes.
        :param new_data: optional mapping of element key -> value applied
            through ``set()`` before the redraw check; defaults to nothing.

        The redraw is skipped entirely once the view is frozen. Otherwise,
        when ``force`` is set or the state reports changes (ignoring the keys
        in ``_ignore_changes``), a new canvas is created, the ``ui_update``
        plugin event is fired, every state element is drawn, the render
        callbacks receive the canvas and the state is reset.
        """
        # None instead of a shared mutable {} default
        if new_data is not None:
            for key, val in new_data.items():
                self.set(key, val)

        with self._lock:
            if self._frozen:
                return

            changes = self._state.changes(ignore=self._ignore_changes)
            if force or len(changes):
                self._canvas = Image.new('1', (self._width, self._height), WHITE)
                drawer = ImageDraw.Draw(self._canvas)

                plugins.on('ui_update', self)

                for _key, lv in self._state.items():
                    lv.draw(self._canvas, drawer)

                for cb in self._render_cbs:
                    cb(self._canvas)

                self._state.reset()
|