refactored main.py
This commit is contained in:
parent
780664b1c0
commit
56b1d21bb0
sdcard/rootfs/root/pwnagotchi/scripts
@ -12,7 +12,6 @@ import pwnagotchi.version as version
|
||||
import pwnagotchi.plugins as plugins
|
||||
|
||||
from pwnagotchi.log import SessionParser
|
||||
from pwnagotchi.voice import Voice
|
||||
from pwnagotchi.agent import Agent
|
||||
from pwnagotchi.ui.display import Display
|
||||
|
||||
@ -34,41 +33,6 @@ args = parser.parse_args()
|
||||
config = utils.load_config(args)
|
||||
utils.setup_logging(args, config)
|
||||
|
||||
if args.do_clear:
|
||||
print("clearing the display ...")
|
||||
cleardisplay = config['ui']['display']['type']
|
||||
if cleardisplay in ('inkyphat', 'inky'):
|
||||
print("inky display")
|
||||
from inky import InkyPHAT
|
||||
|
||||
epd = InkyPHAT(config['ui']['display']['color'])
|
||||
epd.set_border(InkyPHAT.BLACK)
|
||||
self._render_cb = self._inky_render
|
||||
elif cleardisplay in ('papirus', 'papi'):
|
||||
print("papirus display")
|
||||
from pwnagotchi.ui.papirus.epd import EPD
|
||||
|
||||
os.environ['EPD_SIZE'] = '2.0'
|
||||
epd = EPD()
|
||||
epd.clear()
|
||||
elif cleardisplay in ('waveshare_1', 'ws_1', 'waveshare1', 'ws1'):
|
||||
print("waveshare v1 display")
|
||||
from pwnagotchi.ui.waveshare.v1.epd2in13 import EPD
|
||||
|
||||
epd = EPD()
|
||||
epd.init(epd.lut_full_update)
|
||||
epd.Clear(0xFF)
|
||||
elif cleardisplay in ('waveshare_2', 'ws_2', 'waveshare2', 'ws2'):
|
||||
print("waveshare v2 display")
|
||||
from pwnagotchi.ui.waveshare.v2.waveshare import EPD
|
||||
|
||||
epd = EPD()
|
||||
epd.init(epd.FULL_UPDATE)
|
||||
epd.Clear(0xff)
|
||||
else:
|
||||
print("unknown display type %s" % cleardisplay)
|
||||
quit()
|
||||
|
||||
plugins.load_from_path(plugins.default_path)
|
||||
if 'plugins' in config['main'] and config['main']['plugins'] is not None:
|
||||
plugins.load_from_path(config['main']['plugins'])
|
||||
@ -79,74 +43,78 @@ display = Display(config=config, state={'name': '%s>' % pwnagotchi.name()})
|
||||
agent = Agent(view=display, config=config)
|
||||
|
||||
logging.info("%s@%s (v%s)" % (pwnagotchi.name(), agent._identity, version.version))
|
||||
# for key, value in config['personality'].items():
|
||||
# logging.info(" %s: %s" % (key, value))
|
||||
|
||||
for _, plugin in plugins.loaded.items():
|
||||
logging.info("plugin '%s' v%s loaded from %s" % (plugin.__name__, plugin.__version__, plugin.__file__))
|
||||
logging.debug("plugin '%s' v%s loaded from %s" % (plugin.__name__, plugin.__version__, plugin.__file__))
|
||||
|
||||
if args.do_manual:
|
||||
if args.do_clear:
|
||||
logging.info("clearing the display ...")
|
||||
display.clear()
|
||||
|
||||
elif args.do_manual:
|
||||
logging.info("entering manual mode ...")
|
||||
|
||||
log = SessionParser(config['main']['log'])
|
||||
logging.info("the last session lasted %s (%d completed epochs, trained for %d), average reward:%s (min:%s max:%s)" % (
|
||||
log.duration_human,
|
||||
log.epochs,
|
||||
log.train_epochs,
|
||||
log.avg_reward,
|
||||
log.min_reward,
|
||||
log.max_reward))
|
||||
logging.info(
|
||||
"the last session lasted %s (%d completed epochs, trained for %d), average reward:%s (min:%s max:%s)" % (
|
||||
log.duration_human,
|
||||
log.epochs,
|
||||
log.train_epochs,
|
||||
log.avg_reward,
|
||||
log.min_reward,
|
||||
log.max_reward))
|
||||
|
||||
while True:
|
||||
display.on_manual_mode(log)
|
||||
time.sleep(1)
|
||||
|
||||
if Agent.is_connected():
|
||||
plugins.on('internet_available', config, log)
|
||||
plugins.on('internet_available', display, config, log)
|
||||
|
||||
quit()
|
||||
else:
|
||||
logging.info("entering auto mode ...")
|
||||
|
||||
agent.start_ai()
|
||||
agent.setup_events()
|
||||
agent.set_starting()
|
||||
agent.start_monitor_mode()
|
||||
agent.start_event_polling()
|
||||
agent.start_ai()
|
||||
agent.setup_events()
|
||||
agent.set_starting()
|
||||
agent.start_monitor_mode()
|
||||
agent.start_event_polling()
|
||||
|
||||
# print initial stats
|
||||
agent.next_epoch()
|
||||
# print initial stats
|
||||
agent.next_epoch()
|
||||
|
||||
agent.set_ready()
|
||||
agent.set_ready()
|
||||
|
||||
while True:
|
||||
try:
|
||||
# recon on all channels
|
||||
agent.recon()
|
||||
# get nearby access points grouped by channel
|
||||
channels = agent.get_access_points_by_channel()
|
||||
# check for free channels to use
|
||||
agent.check_channels(channels)
|
||||
# for each channel
|
||||
for ch, aps in channels:
|
||||
agent.set_channel(ch)
|
||||
while True:
|
||||
try:
|
||||
# recon on all channels
|
||||
agent.recon()
|
||||
# get nearby access points grouped by channel
|
||||
channels = agent.get_access_points_by_channel()
|
||||
# check for free channels to use
|
||||
agent.check_channels(channels)
|
||||
# for each channel
|
||||
for ch, aps in channels:
|
||||
agent.set_channel(ch)
|
||||
|
||||
if not agent.is_stale() and agent.any_activity():
|
||||
logging.info("%d access points on channel %d" % (len(aps), ch))
|
||||
if not agent.is_stale() and agent.any_activity():
|
||||
logging.info("%d access points on channel %d" % (len(aps), ch))
|
||||
|
||||
# for each ap on this channel
|
||||
for ap in aps:
|
||||
# send an association frame in order to get for a PMKID
|
||||
agent.associate(ap)
|
||||
# deauth all client stations in order to get a full handshake
|
||||
for sta in ap['clients']:
|
||||
agent.deauth(ap, sta)
|
||||
# for each ap on this channel
|
||||
for ap in aps:
|
||||
# send an association frame in order to get for a PMKID
|
||||
agent.associate(ap)
|
||||
# deauth all client stations in order to get a full handshake
|
||||
for sta in ap['clients']:
|
||||
agent.deauth(ap, sta)
|
||||
|
||||
# An interesting effect of this:
|
||||
#
|
||||
# From Pwnagotchi's perspective, the more new access points
|
||||
# and / or client stations nearby, the longer one epoch of
|
||||
# its relative time will take ... basically, in Pwnagotchi's universe,
|
||||
# WiFi electromagnetic fields affect time like gravitational fields
|
||||
# affect ours ... neat ^_^
|
||||
agent.next_epoch()
|
||||
except Exception as e:
|
||||
logging.exception("main loop exception")
|
||||
# An interesting effect of this:
|
||||
#
|
||||
# From Pwnagotchi's perspective, the more new access points
|
||||
# and / or client stations nearby, the longer one epoch of
|
||||
# its relative time will take ... basically, in Pwnagotchi's universe,
|
||||
# WiFi electromagnetic fields affect time like gravitational fields
|
||||
# affect ours ... neat ^_^
|
||||
agent.next_epoch()
|
||||
except Exception as e:
|
||||
logging.exception("main loop exception")
|
||||
|
@ -18,7 +18,7 @@ def on_loaded():
|
||||
|
||||
|
||||
# called in manual mode when there's internet connectivity
|
||||
def on_internet_available(config, log):
|
||||
def on_internet_available(ui, config, log):
|
||||
pass
|
||||
|
||||
|
||||
|
@ -15,14 +15,14 @@ running = False
|
||||
|
||||
|
||||
def on_loaded():
|
||||
logging.info("GPS plugin loaded for %s" % device)
|
||||
logging.info("gps plugin loaded for %s" % device)
|
||||
|
||||
|
||||
def on_ready(agent):
|
||||
global running
|
||||
|
||||
if os.path.exists(device):
|
||||
logging.info("enabling GPS bettercap's module for %s" % device)
|
||||
logging.info("enabling gps bettercap's module for %s" % device)
|
||||
try:
|
||||
agent.run('gps off')
|
||||
except:
|
||||
|
@ -8,16 +8,14 @@ __enabled__ = True
|
||||
import logging
|
||||
from pwnagotchi.voice import Voice
|
||||
|
||||
UI = None
|
||||
|
||||
|
||||
def on_loaded():
|
||||
logging.info("Twitter plugin loaded.")
|
||||
logging.info("twitter plugin loaded.")
|
||||
|
||||
|
||||
# called in manual mode when there's internet connectivity
|
||||
def on_internet_available(config, log):
|
||||
if config['twitter']['enabled'] and log.is_new() and log.handshakes > 0 and UI:
|
||||
def on_internet_available(ui, config, log):
|
||||
if config['twitter']['enabled'] and log.is_new() and log.handshakes > 0:
|
||||
try:
|
||||
import tweepy
|
||||
except ImportError:
|
||||
@ -28,11 +26,11 @@ def on_internet_available(config, log):
|
||||
|
||||
picture = '/dev/shm/pwnagotchi.png'
|
||||
|
||||
UI.on_manual_mode(log)
|
||||
UI.update(force=True)
|
||||
UI.image().save(picture, 'png')
|
||||
UI.set('status', 'Tweeting...')
|
||||
UI.update(force=True)
|
||||
ui.on_manual_mode(log)
|
||||
ui.update(force=True)
|
||||
ui.image().save(picture, 'png')
|
||||
ui.set('status', 'Tweeting...')
|
||||
ui.update(force=True)
|
||||
|
||||
try:
|
||||
auth = tweepy.OAuthHandler(config['twitter']['consumer_key'], config['twitter']['consumer_secret'])
|
||||
@ -46,9 +44,3 @@ def on_internet_available(config, log):
|
||||
logging.info("tweeted: %s" % tweet)
|
||||
except Exception as e:
|
||||
logging.exception("error while tweeting")
|
||||
|
||||
|
||||
def on_ui_setup(ui):
|
||||
# need that object
|
||||
global UI
|
||||
UI = ui
|
||||
|
@ -102,12 +102,15 @@ class Display(View):
|
||||
def _is_papirus(self):
|
||||
return self._display_type in ('papirus', 'papi')
|
||||
|
||||
def _is_waveshare1(self):
|
||||
def _is_waveshare_v1(self):
|
||||
return self._display_type in ('waveshare_1', 'ws_1', 'waveshare1', 'ws1')
|
||||
|
||||
def _is_waveshare2(self):
|
||||
def _is_waveshare_v2(self):
|
||||
return self._display_type in ('waveshare_2', 'ws_2', 'waveshare2', 'ws2')
|
||||
|
||||
def _is_waveshare(self):
|
||||
return self._is_waveshare_v1() or self._is_waveshare_v2()
|
||||
|
||||
def _init_display(self):
|
||||
if self._is_inky():
|
||||
logging.info("initializing inky display")
|
||||
@ -124,7 +127,7 @@ class Display(View):
|
||||
self._display.clear()
|
||||
self._render_cb = self._papirus_render
|
||||
|
||||
elif self._is_waveshare1():
|
||||
elif self._is_waveshare_v1():
|
||||
logging.info("initializing waveshare v1 display")
|
||||
from pwnagotchi.ui.waveshare.v1.epd2in13 import EPD
|
||||
self._display = EPD()
|
||||
@ -133,7 +136,7 @@ class Display(View):
|
||||
self._display.init(self._display.lut_partial_update)
|
||||
self._render_cb = self._waveshare_render
|
||||
|
||||
elif self._is_waveshare2():
|
||||
elif self._is_waveshare_v2():
|
||||
logging.info("initializing waveshare v2 display")
|
||||
from pwnagotchi.ui.waveshare.v2.waveshare import EPD
|
||||
self._display = EPD()
|
||||
@ -149,6 +152,18 @@ class Display(View):
|
||||
|
||||
self.on_render(self._on_view_rendered)
|
||||
|
||||
def clear(self):
|
||||
if self._display is None:
|
||||
logging.error("no display object created")
|
||||
elif self._is_inky():
|
||||
self._display.Clear()
|
||||
elif self._is_papirus():
|
||||
self._display.clear()
|
||||
elif self._is_waveshare():
|
||||
self._display.Clear(WHITE)
|
||||
else:
|
||||
logging.critical("unknown display type %s" % self._display_type)
|
||||
|
||||
def _inky_render(self):
|
||||
if self._display_color != 'mono':
|
||||
display_colors = 3
|
||||
@ -183,9 +198,9 @@ class Display(View):
|
||||
|
||||
def _waveshare_render(self):
|
||||
buf = self._display.getbuffer(self._canvas)
|
||||
if self._is_waveshare1():
|
||||
if self._is_waveshare_v1():
|
||||
self._display.display(buf)
|
||||
elif self._is_waveshare2():
|
||||
elif self._is_waveshare_v2():
|
||||
self._display.displayPartial(buf)
|
||||
|
||||
def image(self):
|
||||
|
Loading…
x
Reference in New Issue
Block a user