From 1f99a249c63b09152b89bdd5eb354b1d2c795391 Mon Sep 17 00:00:00 2001 From: dadav <33197631+dadav@users.noreply.github.com> Date: Mon, 7 Oct 2019 19:59:28 +0200 Subject: [PATCH] Put pcap parsing in utils class --- pwnagotchi/plugins/default/grid.py | 24 ++----- pwnagotchi/plugins/default/wigle.py | 35 +++++------ pwnagotchi/utils.py | 97 +++++++++++++++++++++++++++++ 3 files changed, 117 insertions(+), 39 deletions(-) diff --git a/pwnagotchi/plugins/default/grid.py b/pwnagotchi/plugins/default/grid.py index 6c151b3..dfc54ad 100644 --- a/pwnagotchi/plugins/default/grid.py +++ b/pwnagotchi/plugins/default/grid.py @@ -11,6 +11,7 @@ import glob import subprocess import pwnagotchi import pwnagotchi.utils as utils +from pwnagotchi.utils import WifiInfo, extract_from_pcap OPTIONS = dict() AUTH = utils.StatusFile('/root/.api-enrollment.json', data_format='json') @@ -67,17 +68,6 @@ def get_api_token(log, keys): return AUTH.data["token"] -def parse_packet(packet, info): - from scapy.all import Dot11Elt, Dot11Beacon, Dot11, Dot11ProbeResp, Dot11AssoReq, Dot11ReassoReq - if packet.haslayer(Dot11Beacon): - if packet.haslayer(Dot11ProbeResp) or packet.haslayer(Dot11AssoReq) or packet.haslayer(Dot11ReassoReq): - if hasattr(packet[Dot11], 'addr3'): - info['bssid'] = packet[Dot11].addr3 - if hasattr(packet[Dot11Elt], 'info'): - info['essid'] = packet[Dot11Elt].info.decode('utf-8') - return info - - def parse_pcap(filename): logging.info("api: parsing %s ..." % filename) @@ -94,20 +84,16 @@ def parse_pcap(filename): bssid = ':'.join([a + b for a, b in zip(it, it)]) info = { - 'essid': essid, - 'bssid': bssid + WifiInfo.ESSID: essid, + WifiInfo.BSSID: bssid, } try: - from scapy.all import rdpcap - - for pkt in rdpcap(filename): - info = parse_packet(pkt, info) - + info = extract_from_pcap(filename, [WifiInfo.BSSID, WifiInfo.ESSID]) except Exception as e: logging.error("api: %s" % e) - return info['essid'], info['bssid'] + return info[WifiInfo.ESSID], info[WifiInfo.BSSID] def api_report_ap(log, keys, token, essid, bssid): diff --git a/pwnagotchi/plugins/default/wigle.py b/pwnagotchi/plugins/default/wigle.py index da57f94..4f89ad4 100644 --- a/pwnagotchi/plugins/default/wigle.py +++ b/pwnagotchi/plugins/default/wigle.py @@ -12,6 +12,7 @@ import csv from datetime import datetime import requests from pwnagotchi.mesh.wifi import freq_to_channel +from pwnagotchi.utils import WifiInfo, FieldNotFoundError, extract_from_pcap READY = False ALREADY_UPLOADED = None @@ -151,13 +152,13 @@ def _transform_wigle_entry(gps_data, pcap_data): writer = csv.writer(dummy, delimiter=",", quoting=csv.QUOTE_NONE) writer.writerow([ - pcap_data['bssid'], - pcap_data['essid'].decode('utf-8'), - _format_auth(pcap_data['encryption']), + pcap_data[WifiInfo.BSSID], + pcap_data[WifiInfo.ESSID], + _format_auth(pcap_data[WifiInfo.ENCRYPTION]), datetime.strptime(gps_data['Updated'].rsplit('.')[0], "%Y-%m-%dT%H:%M:%S").strftime('%Y-%m-%d %H:%M:%S'), - pcap_data['channel'], - pcap_data['rssi'], + pcap_data[WifiInfo.CHANNEL], + pcap_data[WifiInfo.RSSI], gps_data['Latitude'], gps_data['Longitude'], gps_data['Altitude'], @@ -238,23 +239,17 @@ def on_internet_available(display, keypair, config, log): continue try: - pcap_data = _analyze_pcap(pcap_filename) - except Scapy_Exception as sc_e: - logging.error("WIGLE: %s", sc_e) + pcap_data = extract_from_pcap(pcap_filename, [WifiInfo.BSSID, + WifiInfo.ESSID, + WifiInfo.ENCRYPTION, + WifiInfo.CHANNEL, + WifiInfo.RSSI]) + except FieldNotFoundError: + logging.error("WIGLE: Could not extract all informations. Skip %s", gps_file) SKIP.append(gps_file) continue - - # encrypption-key is only there if privacy-cap was set - if 'encryption' in pcap_data: - if not pcap_data['encryption']: - pcap_data['encryption'].add('WEP') - else: - # no encryption, nothing to eat :( - pcap_data['encryption'] = set() - pcap_data['encryption'].add('OPN') - - if len(pcap_data) < 5: - # not enough data; try next time + except Scapy_Exception as sc_e: + logging.error("WIGLE: %s", sc_e) SKIP.append(gps_file) continue diff --git a/pwnagotchi/utils.py b/pwnagotchi/utils.py index 8c30e94..bde9fd9 100644 --- a/pwnagotchi/utils.py +++ b/pwnagotchi/utils.py @@ -1,4 +1,5 @@ from datetime import datetime +from enum import Enum import logging import glob import os @@ -83,6 +84,102 @@ def blink(times=1, delay=0.3): time.sleep(delay) led(True) +class WifiInfo(Enum): + """ + Fields you can extract from a pcap file + """ + BSSID = 0 + ESSID = 1 + ENCRYPTION = 2 + CHANNEL = 3 + RSSI = 4 + +class FieldNotFoundError(Exception): + pass + + +def extract_from_pcap(path, fields): + """ + Search in pcap-file for specified information + + path: Path to pcap file + fields: Array of fields that should be extracted + + If a field is not found, FieldNotFoundError is raised + """ + results = dict() + for field in fields: + if not isinstance(field, WifiInfo): + raise TypeError("Invalid field") + + subtypes = set() + + if field == WifiInfo.BSSID: + from scapy.all import Dot11Beacon, Dot11ProbeResp, Dot11AssoReq, Dot11ReassoReq, Dot11, sniff + subtypes.add('beacon') + bpf_filter = " or ".join([f"wlan type mgt subtype {subtype}" for subtype in subtypes]) + packets = sniff(offline=path, filter=bpf_filter) + try: + for packet in packets: + if packet.haslayer(Dot11Beacon): + if hasattr(packet[Dot11], 'addr3'): + results[field] = packet[Dot11].addr3 + break + else: # magic + raise FieldNotFoundError("Could not find field [BSSID]") + except Exception: + raise FieldNotFoundError("Could not find field [BSSID]") + elif field == WifiInfo.ESSID: + from scapy.all import Dot11Beacon, Dot11ReassoReq, Dot11AssoReq, Dot11, sniff, Dot11Elt + subtypes.add('beacon') + subtypes.add('assoc-req') + subtypes.add('reassoc-req') + bpf_filter = " or ".join([f"wlan type mgt subtype {subtype}" for subtype in subtypes]) + packets = sniff(offline=path, filter=bpf_filter) + try: + for packet in packets: + if packet.haslayer(Dot11Elt) and hasattr(packet[Dot11Elt], 'info'): + results[field] = packet[Dot11Elt].info.decode('utf-8') + break + else: # magic + raise FieldNotFoundError("Could not find field [ESSID]") + except Exception: + raise FieldNotFoundError("Could not find field [ESSID]") + elif field == WifiInfo.ENCRYPTION: + from scapy.all import Dot11Beacon, sniff + subtypes.add('beacon') + bpf_filter = " or ".join([f"wlan type mgt subtype {subtype}" for subtype in subtypes]) + packets = sniff(offline=path, filter=bpf_filter) + try: + for packet in packets: + if packet.haslayer(Dot11Beacon) and hasattr(packet[Dot11Beacon], 'network_stats'): + stats = packet[Dot11Beacon].network_stats() + if 'crypto' in stats: + results[field] = stats['crypto'] # set with encryption types + break + else: # magic + raise FieldNotFoundError("Could not find field [ENCRYPTION]") + except Exception: + raise FieldNotFoundError("Could not find field [ENCRYPTION]") + elif field == WifiInfo.CHANNEL: + from scapy.all import sniff, RadioTap + from pwnagotchi.mesh.wifi import freq_to_channel + packets = sniff(offline=path, count=1) + try: + results[field] = freq_to_channel(packets[0][RadioTap].ChannelFrequency) + except Exception: + raise FieldNotFoundError("Could not find field [CHANNEL]") + elif field == WifiInfo.RSSI: + from scapy.all import sniff, RadioTap + from pwnagotchi.mesh.wifi import freq_to_channel + packets = sniff(offline=path, count=1) + try: + results[field] = packets[0][RadioTap].dBm_AntSignal + except Exception: + raise FieldNotFoundError("Could not find field [RSSI]") + + return results + class StatusFile(object): def __init__(self, path, data_format='raw'):