Merge pull request #208 from dadav/feature/pcap_parsing
Put pcap parsing in utils class
This commit is contained in:
commit
2083b68f29
@ -11,6 +11,7 @@ import glob
|
||||
import subprocess
|
||||
import pwnagotchi
|
||||
import pwnagotchi.utils as utils
|
||||
from pwnagotchi.utils import WifiInfo, extract_from_pcap
|
||||
|
||||
OPTIONS = dict()
|
||||
AUTH = utils.StatusFile('/root/.api-enrollment.json', data_format='json')
|
||||
@ -67,17 +68,6 @@ def get_api_token(log, keys):
|
||||
return AUTH.data["token"]
|
||||
|
||||
|
||||
def parse_packet(packet, info):
|
||||
from scapy.all import Dot11Elt, Dot11Beacon, Dot11, Dot11ProbeResp, Dot11AssoReq, Dot11ReassoReq
|
||||
if packet.haslayer(Dot11Beacon):
|
||||
if packet.haslayer(Dot11ProbeResp) or packet.haslayer(Dot11AssoReq) or packet.haslayer(Dot11ReassoReq):
|
||||
if hasattr(packet[Dot11], 'addr3'):
|
||||
info['bssid'] = packet[Dot11].addr3
|
||||
if hasattr(packet[Dot11Elt], 'info'):
|
||||
info['essid'] = packet[Dot11Elt].info.decode('utf-8')
|
||||
return info
|
||||
|
||||
|
||||
def parse_pcap(filename):
|
||||
logging.info("api: parsing %s ..." % filename)
|
||||
|
||||
@ -94,20 +84,16 @@ def parse_pcap(filename):
|
||||
bssid = ':'.join([a + b for a, b in zip(it, it)])
|
||||
|
||||
info = {
|
||||
'essid': essid,
|
||||
'bssid': bssid
|
||||
WifiInfo.ESSID: essid,
|
||||
WifiInfo.BSSID: bssid,
|
||||
}
|
||||
|
||||
try:
|
||||
from scapy.all import rdpcap
|
||||
|
||||
for pkt in rdpcap(filename):
|
||||
info = parse_packet(pkt, info)
|
||||
|
||||
info = extract_from_pcap(filename, [WifiInfo.BSSID, WifiInfo.ESSID])
|
||||
except Exception as e:
|
||||
logging.error("api: %s" % e)
|
||||
|
||||
return info['essid'], info['bssid']
|
||||
return info[WifiInfo.ESSID], info[WifiInfo.BSSID]
|
||||
|
||||
|
||||
def api_report_ap(log, keys, token, essid, bssid):
|
||||
|
@ -12,6 +12,7 @@ import csv
|
||||
from datetime import datetime
|
||||
import requests
|
||||
from pwnagotchi.mesh.wifi import freq_to_channel
|
||||
from pwnagotchi.utils import WifiInfo, FieldNotFoundError, extract_from_pcap
|
||||
|
||||
READY = False
|
||||
ALREADY_UPLOADED = None
|
||||
@ -151,13 +152,13 @@ def _transform_wigle_entry(gps_data, pcap_data):
|
||||
|
||||
writer = csv.writer(dummy, delimiter=",", quoting=csv.QUOTE_NONE)
|
||||
writer.writerow([
|
||||
pcap_data['bssid'],
|
||||
pcap_data['essid'].decode('utf-8'),
|
||||
_format_auth(pcap_data['encryption']),
|
||||
pcap_data[WifiInfo.BSSID],
|
||||
pcap_data[WifiInfo.ESSID],
|
||||
_format_auth(pcap_data[WifiInfo.ENCRYPTION]),
|
||||
datetime.strptime(gps_data['Updated'].rsplit('.')[0],
|
||||
"%Y-%m-%dT%H:%M:%S").strftime('%Y-%m-%d %H:%M:%S'),
|
||||
pcap_data['channel'],
|
||||
pcap_data['rssi'],
|
||||
pcap_data[WifiInfo.CHANNEL],
|
||||
pcap_data[WifiInfo.RSSI],
|
||||
gps_data['Latitude'],
|
||||
gps_data['Longitude'],
|
||||
gps_data['Altitude'],
|
||||
@ -238,23 +239,17 @@ def on_internet_available(display, keypair, config, log):
|
||||
continue
|
||||
|
||||
try:
|
||||
pcap_data = _analyze_pcap(pcap_filename)
|
||||
except Scapy_Exception as sc_e:
|
||||
logging.error("WIGLE: %s", sc_e)
|
||||
pcap_data = extract_from_pcap(pcap_filename, [WifiInfo.BSSID,
|
||||
WifiInfo.ESSID,
|
||||
WifiInfo.ENCRYPTION,
|
||||
WifiInfo.CHANNEL,
|
||||
WifiInfo.RSSI])
|
||||
except FieldNotFoundError:
|
||||
logging.error("WIGLE: Could not extract all informations. Skip %s", gps_file)
|
||||
SKIP.append(gps_file)
|
||||
continue
|
||||
|
||||
# encrypption-key is only there if privacy-cap was set
|
||||
if 'encryption' in pcap_data:
|
||||
if not pcap_data['encryption']:
|
||||
pcap_data['encryption'].add('WEP')
|
||||
else:
|
||||
# no encryption, nothing to eat :(
|
||||
pcap_data['encryption'] = set()
|
||||
pcap_data['encryption'].add('OPN')
|
||||
|
||||
if len(pcap_data) < 5:
|
||||
# not enough data; try next time
|
||||
except Scapy_Exception as sc_e:
|
||||
logging.error("WIGLE: %s", sc_e)
|
||||
SKIP.append(gps_file)
|
||||
continue
|
||||
|
||||
|
@ -1,4 +1,5 @@
|
||||
from datetime import datetime
|
||||
from enum import Enum
|
||||
import logging
|
||||
import glob
|
||||
import os
|
||||
@ -83,6 +84,102 @@ def blink(times=1, delay=0.3):
|
||||
time.sleep(delay)
|
||||
led(True)
|
||||
|
||||
class WifiInfo(Enum):
|
||||
"""
|
||||
Fields you can extract from a pcap file
|
||||
"""
|
||||
BSSID = 0
|
||||
ESSID = 1
|
||||
ENCRYPTION = 2
|
||||
CHANNEL = 3
|
||||
RSSI = 4
|
||||
|
||||
class FieldNotFoundError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
def extract_from_pcap(path, fields):
|
||||
"""
|
||||
Search in pcap-file for specified information
|
||||
|
||||
path: Path to pcap file
|
||||
fields: Array of fields that should be extracted
|
||||
|
||||
If a field is not found, FieldNotFoundError is raised
|
||||
"""
|
||||
results = dict()
|
||||
for field in fields:
|
||||
if not isinstance(field, WifiInfo):
|
||||
raise TypeError("Invalid field")
|
||||
|
||||
subtypes = set()
|
||||
|
||||
if field == WifiInfo.BSSID:
|
||||
from scapy.all import Dot11Beacon, Dot11ProbeResp, Dot11AssoReq, Dot11ReassoReq, Dot11, sniff
|
||||
subtypes.add('beacon')
|
||||
bpf_filter = " or ".join([f"wlan type mgt subtype {subtype}" for subtype in subtypes])
|
||||
packets = sniff(offline=path, filter=bpf_filter)
|
||||
try:
|
||||
for packet in packets:
|
||||
if packet.haslayer(Dot11Beacon):
|
||||
if hasattr(packet[Dot11], 'addr3'):
|
||||
results[field] = packet[Dot11].addr3
|
||||
break
|
||||
else: # magic
|
||||
raise FieldNotFoundError("Could not find field [BSSID]")
|
||||
except Exception:
|
||||
raise FieldNotFoundError("Could not find field [BSSID]")
|
||||
elif field == WifiInfo.ESSID:
|
||||
from scapy.all import Dot11Beacon, Dot11ReassoReq, Dot11AssoReq, Dot11, sniff, Dot11Elt
|
||||
subtypes.add('beacon')
|
||||
subtypes.add('assoc-req')
|
||||
subtypes.add('reassoc-req')
|
||||
bpf_filter = " or ".join([f"wlan type mgt subtype {subtype}" for subtype in subtypes])
|
||||
packets = sniff(offline=path, filter=bpf_filter)
|
||||
try:
|
||||
for packet in packets:
|
||||
if packet.haslayer(Dot11Elt) and hasattr(packet[Dot11Elt], 'info'):
|
||||
results[field] = packet[Dot11Elt].info.decode('utf-8')
|
||||
break
|
||||
else: # magic
|
||||
raise FieldNotFoundError("Could not find field [ESSID]")
|
||||
except Exception:
|
||||
raise FieldNotFoundError("Could not find field [ESSID]")
|
||||
elif field == WifiInfo.ENCRYPTION:
|
||||
from scapy.all import Dot11Beacon, sniff
|
||||
subtypes.add('beacon')
|
||||
bpf_filter = " or ".join([f"wlan type mgt subtype {subtype}" for subtype in subtypes])
|
||||
packets = sniff(offline=path, filter=bpf_filter)
|
||||
try:
|
||||
for packet in packets:
|
||||
if packet.haslayer(Dot11Beacon) and hasattr(packet[Dot11Beacon], 'network_stats'):
|
||||
stats = packet[Dot11Beacon].network_stats()
|
||||
if 'crypto' in stats:
|
||||
results[field] = stats['crypto'] # set with encryption types
|
||||
break
|
||||
else: # magic
|
||||
raise FieldNotFoundError("Could not find field [ENCRYPTION]")
|
||||
except Exception:
|
||||
raise FieldNotFoundError("Could not find field [ENCRYPTION]")
|
||||
elif field == WifiInfo.CHANNEL:
|
||||
from scapy.all import sniff, RadioTap
|
||||
from pwnagotchi.mesh.wifi import freq_to_channel
|
||||
packets = sniff(offline=path, count=1)
|
||||
try:
|
||||
results[field] = freq_to_channel(packets[0][RadioTap].ChannelFrequency)
|
||||
except Exception:
|
||||
raise FieldNotFoundError("Could not find field [CHANNEL]")
|
||||
elif field == WifiInfo.RSSI:
|
||||
from scapy.all import sniff, RadioTap
|
||||
from pwnagotchi.mesh.wifi import freq_to_channel
|
||||
packets = sniff(offline=path, count=1)
|
||||
try:
|
||||
results[field] = packets[0][RadioTap].dBm_AntSignal
|
||||
except Exception:
|
||||
raise FieldNotFoundError("Could not find field [RSSI]")
|
||||
|
||||
return results
|
||||
|
||||
|
||||
class StatusFile(object):
|
||||
def __init__(self, path, data_format='raw'):
|
||||
|
Loading…
x
Reference in New Issue
Block a user