diff --git a/pwnagotchi/plugins/default/webgpsmap.py b/pwnagotchi/plugins/default/webgpsmap.py index cb1a2ad..85479f9 100644 --- a/pwnagotchi/plugins/default/webgpsmap.py +++ b/pwnagotchi/plugins/default/webgpsmap.py @@ -9,7 +9,7 @@ from functools import lru_cache ''' 2do: - - make the cache handling multiple clients + - make+test the cache handling multiple clients - cleanup the javascript in a class and handle "/newest" additions - create map filters (only cracked APs, only last xx days, between 2 days with slider) http://www.gistechsolutions.com/leaflet/DEMO/filter/filter.html @@ -22,16 +22,10 @@ from functools import lru_cache class Webgpsmap(plugins.Plugin): __author__ = 'https://github.com/xenDE and https://github.com/dadav' - __version__ = '1.2.2' + __version__ = '1.3.0' __name__ = 'webgpsmap' __license__ = 'GPL3' __description__ = 'a plugin for pwnagotchi that shows a openstreetmap with positions of ap-handshakes in your webbrowser' - __help__ = """ -- install: copy "webgpsmap.py" and "webgpsmap.html" to your configured "custom_plugins" directory -- add webgpsmap.yml to your config -- connect your PC/Smartphone/* with USB, BT or other to your pwnagotchi and browse to http://pwnagotchi.local:8080/plugins/webgpsmap/ - (change pwnagotchi.local to your pwnagotchis IP, if needed) -""" ALREADY_SENT = list() SKIP = list() @@ -47,7 +41,7 @@ class Webgpsmap(plugins.Plugin): """ Plugin got loaded """ - logging.info("webgpsmap plugin loaded") + logging.info("[webgpsmap]: plugin loaded") def on_webhook(self, path, request): """ @@ -68,8 +62,8 @@ class Webgpsmap(plugins.Plugin): response_status = 500 response_mimetype = "application/xhtml+xml" response_header_contenttype = 'text/html' - except Exception as ex: - logging.error(ex) + except Exception as error: + logging.error(f"[webgpsmap] error: {error}") return else: if request.method == "GET": @@ -78,8 +72,8 @@ class Webgpsmap(plugins.Plugin): self.ALREADY_SENT = list() try: response_data = bytes(self.get_html(), "utf-8") - except Exception as ex: - logging.error(ex) + except Exception as error: + logging.error(f"[webgpsmap] error: {error}") return response_status = 200 response_mimetype = "application/xhtml+xml" @@ -92,8 +86,8 @@ class Webgpsmap(plugins.Plugin): response_status = 200 response_mimetype = "application/json" response_header_contenttype = 'application/json' - except Exception as ex: - logging.error(ex) + except Exception as error: + logging.error(f"[webgpsmap] error: {error}") return # elif path.startswith('/newest'): # # returns all positions newer then timestamp @@ -118,7 +112,7 @@ class Webgpsmap(plugins.Plugin): <meta charset="utf-8"/> <style>body{font-size:1000%;}</style> </head> - <body>4😋4</body> + <body>4😋4 for bad boys</body> </html>''', "utf-8") response_status = 404 try: @@ -126,12 +120,12 @@ class Webgpsmap(plugins.Plugin): if response_header_contenttype is not None: r.headers["Content-Type"] = response_header_contenttype return r - except Exception as ex: - logging.error(ex) + except Exception as error: + logging.error(f"[webgpsmap] error: {error}") return - # cache 1024 items - @lru_cache(maxsize=1024, typed=False) + # cache 2048 items + @lru_cache(maxsize=2048, typed=False) def _get_pos_from_file(self, path): return PositionFile(path) @@ -144,7 +138,7 @@ class Webgpsmap(plugins.Plugin): handshake_dir = gpsdir gps_data = dict() - logging.info("webgpsmap: scanning %s", handshake_dir) + logging.info(f"[webgpsmap] scanning {handshake_dir}") all_files = os.listdir(handshake_dir) @@ -156,33 +150,40 @@ class Webgpsmap(plugins.Plugin): all_geo_or_gps_files = [] for filename_pcap in all_pcap_files: filename_base = filename_pcap[:-5] # remove ".pcap" - logging.debug("webgpsmap: found: " + filename_base) + logging.debug(f"[webgpsmap] found: {filename_base}") filename_position = None + logging.debug("[webgpsmap] search for .gps.json") check_for = os.path.basename(filename_base) + ".gps.json" if check_for in all_files: filename_position = str(os.path.join(handshake_dir, check_for)) + logging.debug("[webgpsmap] search for .geo.json") check_for = os.path.basename(filename_base) + ".geo.json" if check_for in all_files: filename_position = str(os.path.join(handshake_dir, check_for)) + logging.debug("[webgpsmap] search for .paw-gps.json") + check_for = os.path.basename(filename_base) + ".paw-gps.json" + if check_for in all_files: + filename_position = str(os.path.join(handshake_dir, check_for)) + + logging.debug(f"[webgpsmap] end search for position data files and use {filename_position}") + if filename_position is not None: - # logging.debug("webgpsmap: -- found: %s %d" % (check_for, len(all_geo_or_gps_files)) ) all_geo_or_gps_files.append(filename_position) - # all_geo_or_gps_files = set(all_geo_or_gps_files) - set(SKIP) # remove skiped networks? No! + # all_geo_or_gps_files = set(all_geo_or_gps_files) - set(SKIP) # remove skipped networks? No! if newest_only: all_geo_or_gps_files = set(all_geo_or_gps_files) - set(self.ALREADY_SENT) - logging.info("webgpsmap: Found %d .(geo|gps).json files from %d handshakes. Fetching positions ...", - len(all_geo_or_gps_files), len(all_pcap_files)) + logging.info(f"[webgpsmap] Found {len(all_geo_or_gps_files)} position-data files from {len(all_pcap_files)} handshakes. Fetching positions ...") for pos_file in all_geo_or_gps_files: try: pos = self._get_pos_from_file(pos_file) - if not pos.type() == PositionFile.GPS and not pos.type() == PositionFile.GEO: + if not pos.type() == PositionFile.GPS and not pos.type() == PositionFile.GEO and not pos.type() == PositionFile.PAWGPS: continue ssid, mac = pos.ssid(), pos.mac() @@ -190,10 +191,17 @@ class Webgpsmap(plugins.Plugin): # invalid mac is strange and should abort; ssid is ok if not mac: raise ValueError("Mac can't be parsed from filename") + pos_type = 'unknown' + if pos.type() == PositionFile.GPS: + pos_type = 'gps' + elif pos.type() == PositionFile.GEO: + pos_type = 'geo' + elif pos.type() == PositionFile.PAWGPS: + pos_type = 'paw' gps_data[ssid+"_"+mac] = { 'ssid': ssid, 'mac': mac, - 'type': 'gps' if pos.type() == PositionFile.GPS else 'geo', + 'type': pos_type, 'lng': pos.lng(), 'lat': pos.lat(), 'acc': pos.accuracy(), @@ -201,24 +209,25 @@ class Webgpsmap(plugins.Plugin): 'ts_last': pos.timestamp_last(), } + # get ap password if exist check_for = os.path.basename(pos_file[:-9]) + ".pcap.cracked" if check_for in all_files: gps_data[ssid + "_" + mac]["pass"] = pos.password() self.ALREADY_SENT += pos_file - except json.JSONDecodeError as js_e: + except json.JSONDecodeError as error: self.SKIP += pos_file - logging.error(js_e) + logging.error(f"[webgpsmap] JSONDecodeError in: {error}") continue - except ValueError as v_e: + except ValueError as error: self.SKIP += pos_file - logging.error(v_e) + logging.error(f"[webgpsmap] ValueError: {error}") continue - except OSError as os_e: + except OSError as error: self.SKIP += pos_file - logging.error(os_e) + logging.error(f"[webgpsmap] OSError: {error}") continue - logging.info("webgpsmap loaded %d positions", len(gps_data)) + logging.info(f"[webgpsmap] loaded {len(gps_data)} positions") return gps_data def get_html(self): @@ -226,11 +235,10 @@ class Webgpsmap(plugins.Plugin): Returns the html page """ try: - template_file = os.path.dirname(os.path.realpath(__file__))+"/"+"webgpsmap.html" + template_file = os.path.dirname(os.path.realpath(__file__)) + "/" + "webgpsmap.html" html_data = open(template_file, "r").read() - except Exception as ex: - logging.error("error loading template file: %s", template_file) - logging.error(ex) + except Exception as error: + logging.error(f"[webgpsmap] error loading template file {template_file} - error: {error}") return html_data @@ -238,15 +246,18 @@ class PositionFile: """ Wraps gps / net-pos files """ - GPS = 0 - GEO = 1 + GPS = 1 + GEO = 2 + PAWGPS = 3 def __init__(self, path): self._file = path self._filename = os.path.basename(path) try: + logging.debug(f"[webgpsmap] loading {path}") with open(path, 'r') as json_file: self._json = json.load(json_file) + logging.debug(f"[webgpsmap] loaded {path}") except json.JSONDecodeError as js_e: raise js_e @@ -254,7 +265,7 @@ class PositionFile: """ Returns the mac from filename """ - parsed_mac = re.search(r'.*_?([a-zA-Z0-9]{12})\.(?:gps|geo)\.json', self._filename) + parsed_mac = re.search(r'.*_?([a-zA-Z0-9]{12})\.(?:gps|geo|paw-gps)\.json', self._filename) if parsed_mac: mac = parsed_mac.groups()[0] return mac @@ -264,7 +275,7 @@ class PositionFile: """ Returns the ssid from filename """ - parsed_ssid = re.search(r'(.+)_[a-zA-Z0-9]{12}\.(?:gps|geo)\.json', self._filename) + parsed_ssid = re.search(r'(.+)_[a-zA-Z0-9]{12}\.(?:gps|geo|paw-gps)\.json', self._filename) if parsed_ssid: return parsed_ssid.groups()[0] return None @@ -293,32 +304,37 @@ class PositionFile: elif 'Updated' in self._json: # convert gps datetime to unix timestamp: "2019-10-05T23:12:40.422996+01:00" date_iso_formated = self._json['Updated'] - # fill/cut microseconds to 6 numbers + # bad microseconds fix: fill/cut microseconds to 6 numbers part1, part2, part3 = re.split('\.|\+', date_iso_formated) part2 = part2.ljust(6, '0')[:6] + # timezone fix: 0200 >>> 02:00 + if len(part3) == 4: + part3 = part3[1:2].rjust(2, '0') + ':' + part3[3:4].rjust(2, '0') date_iso_formated = part1 + "." + part2 + "+" + part3 dateObj = datetime.datetime.fromisoformat(date_iso_formated) return_ts = int("%.0f" % dateObj.timestamp()) else: - # use file timestamp last modification of the pcap file + # use file timestamp last modification of the json file return_ts = int("%.0f" % os.path.getmtime(self._file)) return return_ts def password(self): """ - returns the password from file.pcap.cracked od None + returns the password from file.pcap.cracked or None """ return_pass = None - password_file_path = self._file[:-9] + ".pcap.cracked" + # 2do: make better filename split/remove extension because this one has problems with "." in path + base_filename, ext1, ext2 = re.split('\.', self._file) + password_file_path = base_filename + ".pcap.cracked" if os.path.isfile(password_file_path): try: password_file = open(password_file_path, 'r') return_pass = password_file.read() password_file.close() - except OSError as err: - print("OS error: {0}".format(err)) + except OSError as error: + logging.error(f"[webgpsmap] OS error: {format(error)}") except: - print("Unexpected error:", sys.exc_info()[0]) + logging.error(f"[webgpsmap] Unexpected error: {sys.exc_info()[0]}") raise return return_pass @@ -330,37 +346,57 @@ class PositionFile: return PositionFile.GPS if self._file.endswith('.geo.json'): return PositionFile.GEO + if self._file.endswith('.paw-gps.json'): + return PositionFile.PAWGPS return None def lat(self): try: - if self.type() == PositionFile.GPS: + lat = None + # try to get value from known formats + if 'Latitude' in self._json: lat = self._json['Latitude'] - if self.type() == PositionFile.GEO: - lat = self._json['location']['lat'] - if lat != 0: - return lat - raise ValueError("Lat is 0") + if 'lat' in self._json: + lat = self._json['lat'] # an old paw-gps format: {"long": 14.693561, "lat": 40.806375} + if 'location' in self._json: + if 'lat' in self._json['location']: + lat = self._json['location']['lat'] + # check value + if lat is None: + raise ValueError(f"Lat is None in {self._filename}") + if lat == 0: + raise ValueError(f"Lat is 0 in {self._filename}") + return lat except KeyError: pass return None def lng(self): try: - if self.type() == PositionFile.GPS: + lng = None + # try to get value from known formats + if 'Longitude' in self._json: lng = self._json['Longitude'] - if self.type() == PositionFile.GEO: - lng = self._json['location']['lng'] - if lng != 0: - return lng - raise ValueError("Lng is 0") + if 'long' in self._json: + lng = self._json['long'] # an old paw-gps format: {"long": 14.693561, "lat": 40.806375} + if 'location' in self._json: + if 'lng' in self._json['location']: + lng = self._json['location']['lng'] + # check value + if lng is None: + raise ValueError(f"Lng is None in {self._filename}") + if lng == 0: + raise ValueError(f"Lng is 0 in {self._filename}") + return lng except KeyError: pass return None def accuracy(self): if self.type() == PositionFile.GPS: - return 50.0 + return 50.0 # a default + if self.type() == PositionFile.PAWGPS: + return 50.0 # a default if self.type() == PositionFile.GEO: try: return self._json['accuracy']