misc: refactored web ui code

This commit is contained in:
Simone Margaritelli 2019-10-20 15:41:06 +02:00
parent 892fda775d
commit bd7c64b2af
2 changed files with 152 additions and 100 deletions

View File

@ -1,113 +1,22 @@
import _thread
from threading import Lock
import shutil
import logging
import pwnagotchi, pwnagotchi.plugins as plugins
import pwnagotchi.plugins as plugins
import pwnagotchi.ui.hw as hw
import pwnagotchi.ui.web as web
from pwnagotchi.ui.view import View
from http.server import BaseHTTPRequestHandler, HTTPServer
class VideoHandler(BaseHTTPRequestHandler):
_lock = Lock()
_index = """<html>
<head>
<title>%s</title>
<style>
.block {
-webkit-appearance: button;
-moz-appearance: button;
appearance: button;
display: block;
cursor: pointer;
text-align: center;
}
</style>
</head>
<body>
<div style="position: absolute; top:0; left:0; width:100%%;">
<img src="/ui" id="ui" style="width:100%%"/>
<br/>
<hr/>
<form action="/shutdown" onsubmit="return confirm('This will halt the unit, continue?');">
<input type="submit" class="block" value="Shutdown"/>
</form>
</div>
<script type="text/javascript">
window.onload = function() {
var image = document.getElementById("ui");
function updateImage() {
image.src = image.src.split("?")[0] + "?" + new Date().getTime();
}
setInterval(updateImage, %d);
}
</script>
</body>
</html>"""
@staticmethod
def render(img):
with VideoHandler._lock:
img.save("/root/pwnagotchi.png", format='PNG')
def log_message(self, format, *args):
return
def do_GET(self):
if self.path == '/':
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.end_headers()
try:
self.wfile.write(bytes(self._index % (pwnagotchi.name(), 1000), "utf8"))
except:
pass
elif self.path.startswith('/shutdown'):
pwnagotchi.shutdown()
elif self.path.startswith('/ui'):
with self._lock:
self.send_response(200)
self.send_header('Content-type', 'image/png')
self.end_headers()
try:
with open("/root/pwnagotchi.png", 'rb') as fp:
shutil.copyfileobj(fp, self.wfile)
except:
pass
else:
self.send_response(404)
class Display(View):
def __init__(self, config, state={}):
super(Display, self).__init__(config, hw.display_for(config), state)
self._enabled = config['ui']['display']['enabled']
self._rotation = config['ui']['display']['rotation']
self._video_enabled = config['ui']['display']['video']['enabled']
self._video_port = config['ui']['display']['video']['port']
self._video_address = config['ui']['display']['video']['address']
self._httpd = None
config = config['ui']['display']
self._enabled = config['enabled']
self._rotation = config['rotation']
self._webui = web.Server(config)
self.init_display()
if self._video_enabled:
_thread.start_new_thread(self._http_serve, ())
def _http_serve(self):
if self._video_address is not None:
self._httpd = HTTPServer((self._video_address, self._video_port), VideoHandler)
logging.info("ui available at http://%s:%d/" % (self._video_address, self._video_port))
self._httpd.serve_forever()
else:
logging.info("could not get ip of usb0, video server not starting")
def is_inky(self):
return self._implementation.name == 'inky'
@ -126,7 +35,6 @@ class Display(View):
def is_lcdhat(self):
return self._implementation.name == 'lcdhat'
def is_waveshare_any(self):
return self.is_waveshare_v1() or self.is_waveshare_v2()
@ -148,7 +56,7 @@ class Display(View):
return img
def _on_view_rendered(self, img):
VideoHandler.render(img)
web.update_frame(img)
if self._enabled:
self._canvas = (img if self._rotation == 0 else img.rotate(self._rotation))
if self._implementation is not None:

144
pwnagotchi/ui/web.py Normal file
View File

@ -0,0 +1,144 @@
import _thread
from http.server import BaseHTTPRequestHandler, HTTPServer
from threading import Lock
import shutil
import logging
import pwnagotchi
frame_path = '/root/pwnagotchi.png'
frame_format = 'PNG'
frame_ctype = 'image/png'
frame_lock = Lock()
def update_frame(img):
global frame_lock, frame_path, frame_format
with frame_lock:
img.save(frame_path, format=frame_format)
STYLE = """
.block {
-webkit-appearance: button;
-moz-appearance: button;
appearance: button;
display: block;
cursor: pointer;
text-align: center;
}
"""
SCRIPT = """
window.onload = function() {
var image = document.getElementById("ui");
function updateImage() {
image.src = image.src.split("?")[0] + "?" + new Date().getTime();
}
setInterval(updateImage, %d);
}
"""
INDEX = """<html>
<head>
<title>%s</title>
<style>""" + STYLE + """</style>
</head>
<body>
<div style="position: absolute; top:0; left:0; width:100%%;">
<img src="/ui" id="ui" style="width:100%%"/>
<br/>
<hr/>
<form action="/shutdown" onsubmit="return confirm('This will halt the unit, continue?');">
<input type="submit" class="block" value="Shutdown"/>
</form>
</div>
<script type="text/javascript">""" + SCRIPT + """</script>
</body>
</html>"""
SHUTDOWN = """<html>
<head>
<title>%s</title>
<style>""" + STYLE + """</style>
</head>
<body>
<div style="position: absolute; top:0; left:0; width:100%%;">
Shutting down ...
</div>
</body>
</html>"""
class Handler(BaseHTTPRequestHandler):
# suppress internal logging
def log_message(self, format, *args):
return
# just render some html in a 200 response
def _html(self, html):
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.end_headers()
try:
self.wfile.write(bytes(html, "utf8"))
except:
pass
# serve the main html page
def _index(self):
self._html(INDEX % (pwnagotchi.name(), 1000))
# serve a message and shuts down the unit
def _shutdown(self):
self._html(SHUTDOWN % pwnagotchi.name())
pwnagotchi.shutdown()
# serve the PNG file with the display image
def _image(self):
global frame_path, frame_ctype
with frame_lock:
self.send_response(200)
self.send_header('Content-type', frame_ctype)
self.end_headers()
try:
with open(frame_path, 'rb') as fp:
shutil.copyfileobj(fp, self.wfile)
except:
pass
# main entry point of the http server
def do_GET(self):
global frame_lock
if self.path == '/':
self._index()
elif self.path.startswith('/shutdown'):
self._shutdown()
elif self.path.startswith('/ui'):
self._image()
else:
self.send_response(404)
class Server(object):
def __init__(self, config):
self._enabled = config['video']['enabled']
self._port = config['video']['port']
self._address = config['video']['address']
self._httpd = None
if self._enabled:
_thread.start_new_thread(self._http_serve, ())
def _http_serve(self):
if self._address is not None:
self._httpd = HTTPServer((self._address, self._port), Handler)
logging.info("web ui available at http://%s:%d/" % (self._address, self._port))
self._httpd.serve_forever()
else:
logging.info("could not get ip of usb0, video server not starting")