Merge pull request from xenDE/patch-2

cleanup, fixes and add handling of .paw-gps.json
This commit is contained in:
evilsocket 2019-11-21 17:03:08 +01:00 committed by GitHub
commit a779fb9b0b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -9,7 +9,7 @@ from functools import lru_cache
''' '''
2do: 2do:
- make the cache handling multiple clients - make+test the cache handling multiple clients
- cleanup the javascript in a class and handle "/newest" additions - cleanup the javascript in a class and handle "/newest" additions
- create map filters (only cracked APs, only last xx days, between 2 days with slider) - create map filters (only cracked APs, only last xx days, between 2 days with slider)
http://www.gistechsolutions.com/leaflet/DEMO/filter/filter.html http://www.gistechsolutions.com/leaflet/DEMO/filter/filter.html
@ -22,16 +22,10 @@ from functools import lru_cache
class Webgpsmap(plugins.Plugin): class Webgpsmap(plugins.Plugin):
__author__ = 'https://github.com/xenDE and https://github.com/dadav' __author__ = 'https://github.com/xenDE and https://github.com/dadav'
__version__ = '1.2.2' __version__ = '1.3.0'
__name__ = 'webgpsmap' __name__ = 'webgpsmap'
__license__ = 'GPL3' __license__ = 'GPL3'
__description__ = 'a plugin for pwnagotchi that shows a openstreetmap with positions of ap-handshakes in your webbrowser' __description__ = 'a plugin for pwnagotchi that shows a openstreetmap with positions of ap-handshakes in your webbrowser'
__help__ = """
- install: copy "webgpsmap.py" and "webgpsmap.html" to your configured "custom_plugins" directory
- add webgpsmap.yml to your config
- connect your PC/Smartphone/* with USB, BT or other to your pwnagotchi and browse to http://pwnagotchi.local:8080/plugins/webgpsmap/
(change pwnagotchi.local to your pwnagotchis IP, if needed)
"""
ALREADY_SENT = list() ALREADY_SENT = list()
SKIP = list() SKIP = list()
@ -47,7 +41,7 @@ class Webgpsmap(plugins.Plugin):
""" """
Plugin got loaded Plugin got loaded
""" """
logging.info("webgpsmap plugin loaded") logging.info("[webgpsmap]: plugin loaded")
def on_webhook(self, path, request): def on_webhook(self, path, request):
""" """
@ -68,8 +62,8 @@ class Webgpsmap(plugins.Plugin):
response_status = 500 response_status = 500
response_mimetype = "application/xhtml+xml" response_mimetype = "application/xhtml+xml"
response_header_contenttype = 'text/html' response_header_contenttype = 'text/html'
except Exception as ex: except Exception as error:
logging.error(ex) logging.error(f"[webgpsmap] error: {error}")
return return
else: else:
if request.method == "GET": if request.method == "GET":
@ -78,8 +72,8 @@ class Webgpsmap(plugins.Plugin):
self.ALREADY_SENT = list() self.ALREADY_SENT = list()
try: try:
response_data = bytes(self.get_html(), "utf-8") response_data = bytes(self.get_html(), "utf-8")
except Exception as ex: except Exception as error:
logging.error(ex) logging.error(f"[webgpsmap] error: {error}")
return return
response_status = 200 response_status = 200
response_mimetype = "application/xhtml+xml" response_mimetype = "application/xhtml+xml"
@ -92,8 +86,8 @@ class Webgpsmap(plugins.Plugin):
response_status = 200 response_status = 200
response_mimetype = "application/json" response_mimetype = "application/json"
response_header_contenttype = 'application/json' response_header_contenttype = 'application/json'
except Exception as ex: except Exception as error:
logging.error(ex) logging.error(f"[webgpsmap] error: {error}")
return return
# elif path.startswith('/newest'): # elif path.startswith('/newest'):
# # returns all positions newer then timestamp # # returns all positions newer then timestamp
@ -118,7 +112,7 @@ class Webgpsmap(plugins.Plugin):
<meta charset="utf-8"/> <meta charset="utf-8"/>
<style>body{font-size:1000%;}</style> <style>body{font-size:1000%;}</style>
</head> </head>
<body>4😋4</body> <body>4😋4 for bad boys</body>
</html>''', "utf-8") </html>''', "utf-8")
response_status = 404 response_status = 404
try: try:
@ -126,12 +120,12 @@ class Webgpsmap(plugins.Plugin):
if response_header_contenttype is not None: if response_header_contenttype is not None:
r.headers["Content-Type"] = response_header_contenttype r.headers["Content-Type"] = response_header_contenttype
return r return r
except Exception as ex: except Exception as error:
logging.error(ex) logging.error(f"[webgpsmap] error: {error}")
return return
# cache 1024 items # cache 2048 items
@lru_cache(maxsize=1024, typed=False) @lru_cache(maxsize=2048, typed=False)
def _get_pos_from_file(self, path): def _get_pos_from_file(self, path):
return PositionFile(path) return PositionFile(path)
@ -144,7 +138,7 @@ class Webgpsmap(plugins.Plugin):
handshake_dir = gpsdir handshake_dir = gpsdir
gps_data = dict() gps_data = dict()
logging.info("webgpsmap: scanning %s", handshake_dir) logging.info(f"[webgpsmap] scanning {handshake_dir}")
all_files = os.listdir(handshake_dir) all_files = os.listdir(handshake_dir)
@ -156,33 +150,40 @@ class Webgpsmap(plugins.Plugin):
all_geo_or_gps_files = [] all_geo_or_gps_files = []
for filename_pcap in all_pcap_files: for filename_pcap in all_pcap_files:
filename_base = filename_pcap[:-5] # remove ".pcap" filename_base = filename_pcap[:-5] # remove ".pcap"
logging.debug("webgpsmap: found: " + filename_base) logging.debug(f"[webgpsmap] found: {filename_base}")
filename_position = None filename_position = None
logging.debug("[webgpsmap] search for .gps.json")
check_for = os.path.basename(filename_base) + ".gps.json" check_for = os.path.basename(filename_base) + ".gps.json"
if check_for in all_files: if check_for in all_files:
filename_position = str(os.path.join(handshake_dir, check_for)) filename_position = str(os.path.join(handshake_dir, check_for))
logging.debug("[webgpsmap] search for .geo.json")
check_for = os.path.basename(filename_base) + ".geo.json" check_for = os.path.basename(filename_base) + ".geo.json"
if check_for in all_files: if check_for in all_files:
filename_position = str(os.path.join(handshake_dir, check_for)) filename_position = str(os.path.join(handshake_dir, check_for))
logging.debug("[webgpsmap] search for .paw-gps.json")
check_for = os.path.basename(filename_base) + ".paw-gps.json"
if check_for in all_files:
filename_position = str(os.path.join(handshake_dir, check_for))
logging.debug(f"[webgpsmap] end search for position data files and use {filename_position}")
if filename_position is not None: if filename_position is not None:
# logging.debug("webgpsmap: -- found: %s %d" % (check_for, len(all_geo_or_gps_files)) )
all_geo_or_gps_files.append(filename_position) all_geo_or_gps_files.append(filename_position)
# all_geo_or_gps_files = set(all_geo_or_gps_files) - set(SKIP) # remove skiped networks? No! # all_geo_or_gps_files = set(all_geo_or_gps_files) - set(SKIP) # remove skipped networks? No!
if newest_only: if newest_only:
all_geo_or_gps_files = set(all_geo_or_gps_files) - set(self.ALREADY_SENT) all_geo_or_gps_files = set(all_geo_or_gps_files) - set(self.ALREADY_SENT)
logging.info("webgpsmap: Found %d .(geo|gps).json files from %d handshakes. Fetching positions ...", logging.info(f"[webgpsmap] Found {len(all_geo_or_gps_files)} position-data files from {len(all_pcap_files)} handshakes. Fetching positions ...")
len(all_geo_or_gps_files), len(all_pcap_files))
for pos_file in all_geo_or_gps_files: for pos_file in all_geo_or_gps_files:
try: try:
pos = self._get_pos_from_file(pos_file) pos = self._get_pos_from_file(pos_file)
if not pos.type() == PositionFile.GPS and not pos.type() == PositionFile.GEO: if not pos.type() == PositionFile.GPS and not pos.type() == PositionFile.GEO and not pos.type() == PositionFile.PAWGPS:
continue continue
ssid, mac = pos.ssid(), pos.mac() ssid, mac = pos.ssid(), pos.mac()
@ -190,10 +191,17 @@ class Webgpsmap(plugins.Plugin):
# invalid mac is strange and should abort; ssid is ok # invalid mac is strange and should abort; ssid is ok
if not mac: if not mac:
raise ValueError("Mac can't be parsed from filename") raise ValueError("Mac can't be parsed from filename")
pos_type = 'unknown'
if pos.type() == PositionFile.GPS:
pos_type = 'gps'
elif pos.type() == PositionFile.GEO:
pos_type = 'geo'
elif pos.type() == PositionFile.PAWGPS:
pos_type = 'paw'
gps_data[ssid+"_"+mac] = { gps_data[ssid+"_"+mac] = {
'ssid': ssid, 'ssid': ssid,
'mac': mac, 'mac': mac,
'type': 'gps' if pos.type() == PositionFile.GPS else 'geo', 'type': pos_type,
'lng': pos.lng(), 'lng': pos.lng(),
'lat': pos.lat(), 'lat': pos.lat(),
'acc': pos.accuracy(), 'acc': pos.accuracy(),
@ -201,24 +209,25 @@ class Webgpsmap(plugins.Plugin):
'ts_last': pos.timestamp_last(), 'ts_last': pos.timestamp_last(),
} }
# get ap password if exist
check_for = os.path.basename(pos_file[:-9]) + ".pcap.cracked" check_for = os.path.basename(pos_file[:-9]) + ".pcap.cracked"
if check_for in all_files: if check_for in all_files:
gps_data[ssid + "_" + mac]["pass"] = pos.password() gps_data[ssid + "_" + mac]["pass"] = pos.password()
self.ALREADY_SENT += pos_file self.ALREADY_SENT += pos_file
except json.JSONDecodeError as js_e: except json.JSONDecodeError as error:
self.SKIP += pos_file self.SKIP += pos_file
logging.error(js_e) logging.error(f"[webgpsmap] JSONDecodeError in: {error}")
continue continue
except ValueError as v_e: except ValueError as error:
self.SKIP += pos_file self.SKIP += pos_file
logging.error(v_e) logging.error(f"[webgpsmap] ValueError: {error}")
continue continue
except OSError as os_e: except OSError as error:
self.SKIP += pos_file self.SKIP += pos_file
logging.error(os_e) logging.error(f"[webgpsmap] OSError: {error}")
continue continue
logging.info("webgpsmap loaded %d positions", len(gps_data)) logging.info(f"[webgpsmap] loaded {len(gps_data)} positions")
return gps_data return gps_data
def get_html(self): def get_html(self):
@ -226,11 +235,10 @@ class Webgpsmap(plugins.Plugin):
Returns the html page Returns the html page
""" """
try: try:
template_file = os.path.dirname(os.path.realpath(__file__))+"/"+"webgpsmap.html" template_file = os.path.dirname(os.path.realpath(__file__)) + "/" + "webgpsmap.html"
html_data = open(template_file, "r").read() html_data = open(template_file, "r").read()
except Exception as ex: except Exception as error:
logging.error("error loading template file: %s", template_file) logging.error(f"[webgpsmap] error loading template file {template_file} - error: {error}")
logging.error(ex)
return html_data return html_data
@ -238,15 +246,18 @@ class PositionFile:
""" """
Wraps gps / net-pos files Wraps gps / net-pos files
""" """
GPS = 0 GPS = 1
GEO = 1 GEO = 2
PAWGPS = 3
def __init__(self, path): def __init__(self, path):
self._file = path self._file = path
self._filename = os.path.basename(path) self._filename = os.path.basename(path)
try: try:
logging.debug(f"[webgpsmap] loading {path}")
with open(path, 'r') as json_file: with open(path, 'r') as json_file:
self._json = json.load(json_file) self._json = json.load(json_file)
logging.debug(f"[webgpsmap] loaded {path}")
except json.JSONDecodeError as js_e: except json.JSONDecodeError as js_e:
raise js_e raise js_e
@ -254,7 +265,7 @@ class PositionFile:
""" """
Returns the mac from filename Returns the mac from filename
""" """
parsed_mac = re.search(r'.*_?([a-zA-Z0-9]{12})\.(?:gps|geo)\.json', self._filename) parsed_mac = re.search(r'.*_?([a-zA-Z0-9]{12})\.(?:gps|geo|paw-gps)\.json', self._filename)
if parsed_mac: if parsed_mac:
mac = parsed_mac.groups()[0] mac = parsed_mac.groups()[0]
return mac return mac
@ -264,7 +275,7 @@ class PositionFile:
""" """
Returns the ssid from filename Returns the ssid from filename
""" """
parsed_ssid = re.search(r'(.+)_[a-zA-Z0-9]{12}\.(?:gps|geo)\.json', self._filename) parsed_ssid = re.search(r'(.+)_[a-zA-Z0-9]{12}\.(?:gps|geo|paw-gps)\.json', self._filename)
if parsed_ssid: if parsed_ssid:
return parsed_ssid.groups()[0] return parsed_ssid.groups()[0]
return None return None
@ -293,32 +304,37 @@ class PositionFile:
elif 'Updated' in self._json: elif 'Updated' in self._json:
# convert gps datetime to unix timestamp: "2019-10-05T23:12:40.422996+01:00" # convert gps datetime to unix timestamp: "2019-10-05T23:12:40.422996+01:00"
date_iso_formated = self._json['Updated'] date_iso_formated = self._json['Updated']
# fill/cut microseconds to 6 numbers # bad microseconds fix: fill/cut microseconds to 6 numbers
part1, part2, part3 = re.split('\.|\+', date_iso_formated) part1, part2, part3 = re.split('\.|\+', date_iso_formated)
part2 = part2.ljust(6, '0')[:6] part2 = part2.ljust(6, '0')[:6]
# timezone fix: 0200 >>> 02:00
if len(part3) == 4:
part3 = part3[1:2].rjust(2, '0') + ':' + part3[3:4].rjust(2, '0')
date_iso_formated = part1 + "." + part2 + "+" + part3 date_iso_formated = part1 + "." + part2 + "+" + part3
dateObj = datetime.datetime.fromisoformat(date_iso_formated) dateObj = datetime.datetime.fromisoformat(date_iso_formated)
return_ts = int("%.0f" % dateObj.timestamp()) return_ts = int("%.0f" % dateObj.timestamp())
else: else:
# use file timestamp last modification of the pcap file # use file timestamp last modification of the json file
return_ts = int("%.0f" % os.path.getmtime(self._file)) return_ts = int("%.0f" % os.path.getmtime(self._file))
return return_ts return return_ts
def password(self): def password(self):
""" """
returns the password from file.pcap.cracked od None returns the password from file.pcap.cracked or None
""" """
return_pass = None return_pass = None
password_file_path = self._file[:-9] + ".pcap.cracked" # 2do: make better filename split/remove extension because this one has problems with "." in path
base_filename, ext1, ext2 = re.split('\.', self._file)
password_file_path = base_filename + ".pcap.cracked"
if os.path.isfile(password_file_path): if os.path.isfile(password_file_path):
try: try:
password_file = open(password_file_path, 'r') password_file = open(password_file_path, 'r')
return_pass = password_file.read() return_pass = password_file.read()
password_file.close() password_file.close()
except OSError as err: except OSError as error:
print("OS error: {0}".format(err)) logging.error(f"[webgpsmap] OS error: {format(error)}")
except: except:
print("Unexpected error:", sys.exc_info()[0]) logging.error(f"[webgpsmap] Unexpected error: {sys.exc_info()[0]}")
raise raise
return return_pass return return_pass
@ -330,37 +346,57 @@ class PositionFile:
return PositionFile.GPS return PositionFile.GPS
if self._file.endswith('.geo.json'): if self._file.endswith('.geo.json'):
return PositionFile.GEO return PositionFile.GEO
if self._file.endswith('.paw-gps.json'):
return PositionFile.PAWGPS
return None return None
def lat(self): def lat(self):
try: try:
if self.type() == PositionFile.GPS: lat = None
# try to get value from known formats
if 'Latitude' in self._json:
lat = self._json['Latitude'] lat = self._json['Latitude']
if self.type() == PositionFile.GEO: if 'lat' in self._json:
lat = self._json['location']['lat'] lat = self._json['lat'] # an old paw-gps format: {"long": 14.693561, "lat": 40.806375}
if lat != 0: if 'location' in self._json:
return lat if 'lat' in self._json['location']:
raise ValueError("Lat is 0") lat = self._json['location']['lat']
# check value
if lat is None:
raise ValueError(f"Lat is None in {self._filename}")
if lat == 0:
raise ValueError(f"Lat is 0 in {self._filename}")
return lat
except KeyError: except KeyError:
pass pass
return None return None
def lng(self): def lng(self):
try: try:
if self.type() == PositionFile.GPS: lng = None
# try to get value from known formats
if 'Longitude' in self._json:
lng = self._json['Longitude'] lng = self._json['Longitude']
if self.type() == PositionFile.GEO: if 'long' in self._json:
lng = self._json['location']['lng'] lng = self._json['long'] # an old paw-gps format: {"long": 14.693561, "lat": 40.806375}
if lng != 0: if 'location' in self._json:
return lng if 'lng' in self._json['location']:
raise ValueError("Lng is 0") lng = self._json['location']['lng']
# check value
if lng is None:
raise ValueError(f"Lng is None in {self._filename}")
if lng == 0:
raise ValueError(f"Lng is 0 in {self._filename}")
return lng
except KeyError: except KeyError:
pass pass
return None return None
def accuracy(self): def accuracy(self):
if self.type() == PositionFile.GPS: if self.type() == PositionFile.GPS:
return 50.0 return 50.0 # a default
if self.type() == PositionFile.PAWGPS:
return 50.0 # a default
if self.type() == PositionFile.GEO: if self.type() == PositionFile.GEO:
try: try:
return self._json['accuracy'] return self._json['accuracy']