diff --git a/README.md b/README.md index a501534..71abbd5 100644 --- a/README.md +++ b/README.md @@ -17,10 +17,9 @@ The tool features a graphical user interface (GUI) built with **PySide6** (Qt fr ## Features - **Network Scanning**: Identifies devices on the network via ARP requests. - **Device Details**: Displays IP address, MAC address, hostname, and vendor information. -- **Real-time Sniffing**: Captures and lists ARP packets in real-time. - **Graphical User Interface**: Easy-to-use UI to display the scanned devices and packet information. - **Multithreading**: Ensures non-blocking scans using Python's `QThread`. - +- **C extension**: for MacOSX there is a C extension that allows slow sequential but very accurate arp scanning --- ## Prerequisites @@ -29,7 +28,7 @@ Ensure the following dependencies are installed: 1. **Python 3.12 or higher** 2. **scapy**: Used for ARP scanning. -3. **PySide6 or PyQt6**: For building the GUI. +3. **PySide6**: For building the GUI. 4. **netifaces**: To retrieve network interface details. ## Requirements @@ -70,11 +69,6 @@ Ensure the following dependencies are installed: sudo `which python3` main.py --interface ``` - Example: - ```bash - sudo `which python3` main.py --interface wlp0s20f3 - ``` - On Ubuntu in case you run into this error: ``` (env) alessandro@alessandro-XPS-9315:~/Development/phantom$ sudo /home/alessandro/Development/phantom/env/bin/python3 main.py --interface wlp0s20f3 @@ -87,6 +81,13 @@ Ensure the following dependencies are installed: ``` sudo apt install libxcb-cursor0 ``` + On Macos there is a C extension that allows accurate but slow arpscan. To build and install the extension: + ``` + pip install setuptools + cd c_extension + python setup.py build + python setup.py install + ``` ## Usage Instructions diff --git a/c_extension/arpscanner.c b/c_extension/arpscanner.c new file mode 100644 index 0000000..563670a --- /dev/null +++ b/c_extension/arpscanner.c @@ -0,0 +1,256 @@ +#define _GNU_SOURCE + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +// Define missing ARP constants for macOS +#define ARPHRD_ETHER 1 /* Ethernet hardware format */ +#define ARPOP_REQUEST 1 /* ARP request */ +#define ARPOP_REPLY 2 /* ARP reply */ + +#ifdef __APPLE__ + struct arphdr { + unsigned short ar_hrd; /* Format of hardware address */ + unsigned short ar_pro; /* Format of protocol address */ + unsigned char ar_hln; /* Length of hardware address */ + unsigned char ar_pln; /* Length of protocol address */ + unsigned short ar_op; /* ARP opcode (command) */ + }; + + struct ether_arp { + struct arphdr ea_hdr; /* Fixed-size header */ + unsigned char arp_sha[6];/* Sender hardware address */ + unsigned char arp_spa[4];/* Sender protocol address */ + unsigned char arp_tha[6];/* Target hardware address */ + unsigned char arp_tpa[4];/* Target protocol address */ + }; +#else + #include +#endif + +// Structure to hold ARP response data +typedef struct { + char ip_addr[16]; + unsigned char mac_addr[6]; + int found; +} arp_response_t; + +// Structure for packet capture thread +typedef struct { + pcap_t *handle; + arp_response_t *response; + struct in_addr target_ip; + int timeout_ms; + int finished; +} capture_thread_args_t; + +/* + * Function to get MAC address for interface + */ +static int get_mac_address(const char *iface, unsigned char *mac) { + struct ifaddrs *ifap, *ifa; + int found = 0; + + if (getifaddrs(&ifap) != 0) { + return -1; + } + + for (ifa = ifap; ifa != NULL; ifa = ifa->ifa_next) { + if (ifa->ifa_addr == NULL) continue; + + // Check if this is the interface we want and it's a link-layer address + if ((ifa->ifa_addr->sa_family == AF_LINK) && + (strcmp(ifa->ifa_name, iface) == 0)) { + struct sockaddr_dl *sdl = (struct sockaddr_dl *)ifa->ifa_addr; + + if (sdl->sdl_alen == 6) { // MAC address is 6 bytes + memcpy(mac, LLADDR(sdl), 6); + found = 1; + break; + } + } + } + + freeifaddrs(ifap); + return found ? 0 : -1; +} + +// Function to convert MAC address to string +static void mac_to_string(unsigned char *mac, char *str) { + snprintf(str, 18, "%02x:%02x:%02x:%02x:%02x:%02x", + mac[0], mac[1], mac[2], mac[3], mac[4], mac[5]); +} + +// Packet capture callback function +static void packet_handler(u_char *user_data, const struct pcap_pkthdr *pkthdr, const u_char *packet) { + capture_thread_args_t *args = (capture_thread_args_t *)user_data; + + struct ether_header *eth_header = (struct ether_header *)packet; + if (ntohs(eth_header->ether_type) != ETHERTYPE_ARP) + return; + + struct ether_arp *arp_packet = (struct ether_arp *)(packet + sizeof(struct ether_header)); + if (ntohs(arp_packet->ea_hdr.ar_op) != ARPOP_REPLY) + return; + + // Check if this is the response we're looking for + if (memcmp(arp_packet->arp_spa, &args->target_ip.s_addr, 4) == 0) { + memcpy(args->response->mac_addr, arp_packet->arp_sha, 6); + inet_ntop(AF_INET, arp_packet->arp_spa, args->response->ip_addr, 16); + args->response->found = 1; + args->finished = 1; + pcap_breakloop(args->handle); + } +} + +// Packet capture thread function +static void *capture_thread(void *arg) { + capture_thread_args_t *args = (capture_thread_args_t *)arg; + pcap_loop(args->handle, -1, packet_handler, (u_char *)args); + return NULL; +} + +static PyObject *perform_arp_scan(PyObject *self, PyObject *args) { + char *iface, *src_ip_str, *dst_ip_str; + int timeout_ms = 1000; // Default timeout 1 second + + if (!PyArg_ParseTuple(args, "sss|i", &iface, &src_ip_str, &dst_ip_str, &timeout_ms)) { + return NULL; + } + + // Get interface MAC address + unsigned char src_mac[6]; + if (get_mac_address(iface, src_mac) < 0) { + PyErr_SetString(PyExc_RuntimeError, "Failed to get MAC address"); + return NULL; + } + + // Convert IP addresses + struct in_addr src_ip, dst_ip; + if (inet_aton(src_ip_str, &src_ip) == 0 || inet_aton(dst_ip_str, &dst_ip) == 0) { + PyErr_SetString(PyExc_ValueError, "Invalid IP address"); + return NULL; + } + + // Open pcap handle + char errbuf[PCAP_ERRBUF_SIZE]; + pcap_t *handle = pcap_open_live(iface, 65536, 1, timeout_ms, errbuf); + if (!handle) { + PyErr_SetString(PyExc_RuntimeError, errbuf); + return NULL; + } + + // Set up packet capture filter for ARP + struct bpf_program fp; + char filter_exp[64]; + snprintf(filter_exp, sizeof(filter_exp), "arp src host %s", dst_ip_str); + if (pcap_compile(handle, &fp, filter_exp, 0, PCAP_NETMASK_UNKNOWN) == -1) { + pcap_close(handle); + PyErr_SetString(PyExc_RuntimeError, "Failed to compile filter"); + return NULL; + } + if (pcap_setfilter(handle, &fp) == -1) { + pcap_freecode(&fp); + pcap_close(handle); + PyErr_SetString(PyExc_RuntimeError, "Failed to set filter"); + return NULL; + } + pcap_freecode(&fp); + + // Prepare ARP request packet + unsigned char packet[42]; + memset(packet, 0, sizeof(packet)); + + // Fill Ethernet header + struct ether_header *eth_hdr = (struct ether_header *)packet; + memset(eth_hdr->ether_dhost, 0xff, 6); + memcpy(eth_hdr->ether_shost, src_mac, 6); + eth_hdr->ether_type = htons(ETHERTYPE_ARP); + + // Fill ARP header + struct ether_arp *arp_hdr = (struct ether_arp *)(packet + sizeof(struct ether_header)); + arp_hdr->ea_hdr.ar_hrd = htons(ARPHRD_ETHER); + arp_hdr->ea_hdr.ar_pro = htons(ETHERTYPE_IP); + arp_hdr->ea_hdr.ar_hln = 6; + arp_hdr->ea_hdr.ar_pln = 4; + arp_hdr->ea_hdr.ar_op = htons(ARPOP_REQUEST); + memcpy(arp_hdr->arp_sha, src_mac, 6); + memcpy(arp_hdr->arp_spa, &src_ip.s_addr, 4); + memset(arp_hdr->arp_tha, 0, 6); + memcpy(arp_hdr->arp_tpa, &dst_ip.s_addr, 4); + + // Set up response structure and capture thread + arp_response_t response = {0}; + capture_thread_args_t thread_args = { + .handle = handle, + .response = &response, + .target_ip = dst_ip, + .timeout_ms = timeout_ms, + .finished = 0 + }; + + // Start capture thread + pthread_t tid; + if (pthread_create(&tid, NULL, capture_thread, &thread_args) != 0) { + pcap_close(handle); + PyErr_SetString(PyExc_RuntimeError, "Failed to create capture thread"); + return NULL; + } + + // Send ARP request + if (pcap_sendpacket(handle, packet, sizeof(packet)) != 0) { + pthread_cancel(tid); + pcap_close(handle); + PyErr_SetString(PyExc_RuntimeError, "Failed to send ARP packet"); + return NULL; + } + + // Wait for response or timeout + while (!thread_args.finished && timeout_ms > 0) { + usleep(10000); // Sleep for 10ms + timeout_ms -= 10; + } + + // Clean up + pthread_cancel(tid); + pthread_join(tid, NULL); + pcap_close(handle); + + // Return results + if (response.found) { + char mac_str[18]; + mac_to_string(response.mac_addr, mac_str); + return Py_BuildValue("{s:s,s:s}", "ip", response.ip_addr, "mac", mac_str); + } + + Py_RETURN_NONE; +} + +// Module method definitions +static PyMethodDef ArpScannerMethods[] = { + {"perform_arp_scan", perform_arp_scan, METH_VARARGS, + "Perform an ARP scan with response handling. Args: interface, src_ip, target_ip, [timeout_ms]"}, + {NULL, NULL, 0, NULL} +}; + +static struct PyModuleDef arpscannermodule = { + PyModuleDef_HEAD_INIT, + "arpscanner", + NULL, + -1, + ArpScannerMethods +}; + +PyMODINIT_FUNC PyInit_arpscanner(void) { + return PyModule_Create(&arpscannermodule); +} \ No newline at end of file diff --git a/c_extension/setup.py b/c_extension/setup.py new file mode 100644 index 0000000..632d2e0 --- /dev/null +++ b/c_extension/setup.py @@ -0,0 +1,23 @@ +from setuptools import setup, Extension +import sysconfig +import os + +# For macOS, we need to explicitly include libpcap +extra_compile_args = [] +extra_link_args = [] + +if os.uname().sysname == 'Darwin': # macOS + extra_compile_args = ['-I/opt/homebrew/include'] + extra_link_args = ['-L/opt/homebrew/lib', '-lpcap'] + +module = Extension('arpscanner', + sources=['arpscanner.c'], + include_dirs=[sysconfig.get_path('include')], + extra_compile_args=extra_compile_args, + extra_link_args=extra_link_args) + +setup( + name='ArpScanner', + version='1.0', + ext_modules=[module] +) \ No newline at end of file diff --git a/core/arp_scanner.py b/core/arp_scanner.py index e26cea0..e476548 100644 --- a/core/arp_scanner.py +++ b/core/arp_scanner.py @@ -1,6 +1,7 @@ """ Module Arp Scanner """ +import ipaddress import netifaces from scapy.all import arping, ARP, get_if_addr # pylint: disable=E0611 from PySide6.QtWidgets import ( # pylint: disable=E0611 @@ -13,12 +14,18 @@ ) from PySide6.QtGui import QIcon, QFont, QColor # pylint: disable=E0611 from PySide6.QtCore import Slot, Qt, QTimer # pylint: disable=E0611 -from PyQt6.QtCore import QThread, pyqtSignal # pylint: disable=E0611 +from PySide6.QtCore import QThread, Signal # pylint: disable=E0611 from ui.ui_arpscan import Ui_DeviceDiscovery from core import vendor from core.platform import get_os import core.networking as net +try: + import arpscanner + NATIVE_ARP_AVAILABLE = True +except ImportError: + NATIVE_ARP_AVAILABLE = False + class DeviceDetailsWindow(QMainWindow): # pylint: disable=too-few-public-methods """ @@ -173,11 +180,15 @@ def start_scan(self): # Create and start a new ARP scan thread self.arp_scanner_thread = ARPScannerThread(self.interface, self.mac_vendor_lookup) - # Connect signals to handle thread completion and verbose output + self.arp_scanner_thread.partialResults.connect(self.handle_partial_results) self.arp_scanner_thread.finished.connect(self.handle_scan_results) - # Start the thread self.arp_scanner_thread.start() print("Started ARP scan.") + + @Slot(list) + def handle_partial_results(self, partial_results): + for ip_address, mac, hostname, device_vendor, packet in partial_results: + self.add_device_to_list(ip_address, mac, hostname, device_vendor) @Slot(list) def handle_scan_results(self, results): @@ -190,7 +201,6 @@ def handle_scan_results(self, results): packet_label = str(packet) self.add_packet_if_new(packet_label) - def add_device_to_list(self, ip_address, mac, hostname, device_vendor): """Adds a device to the list widget in the UI.""" label = f"{ip_address} {mac} {hostname}, {device_vendor}" @@ -219,38 +229,79 @@ def quit_application(self): self.arp_scanner_thread.wait() self.close() - -class ARPScannerThread(QThread): # pylint: disable=too-few-public-methods - """Executing arp scan in separate thread""" - finished = pyqtSignal(list) +class ARPScannerThread(QThread): + finished = Signal(list) # Final results + partialResults = Signal(list) # Intermediate results def __init__(self, interface, mac_vendor_lookup, timeout=1): super().__init__() self.interface = interface self.mac_vendor_lookup = mac_vendor_lookup self.timeout = timeout + self.is_macos = get_os() == 'mac' + self.use_native = self.is_macos and NATIVE_ARP_AVAILABLE + + def _scan_ip_native(self, src_ip, target_ip): + try: + result = arpscanner.perform_arp_scan( + self.interface, + str(src_ip), + str(target_ip), + int(self.timeout * 300) # 300ms timeout per scan + ) + return target_ip, result + except Exception as e: + print(f"Error scanning {target_ip}: {e}") + return target_ip, None + + def _create_arp_response(self, ip_addr, mac): + return type('ARPResponse', (), { + 'psrc': ip_addr, + 'hwsrc': mac, + '__str__': lambda self: f"ARP {self.psrc} is-at {self.hwsrc}" + })() def run(self): - "run the scan" - ip_address = get_if_addr(self.interface) + src_ip = get_if_addr(self.interface) try: netmask = netifaces.ifaddresses(self.interface)[netifaces.AF_INET][0]['netmask'] - network = net.calculate_network_cidr(ip_address, netmask) + network_cidr = net.calculate_network_cidr(src_ip, netmask) except KeyError: self.finished.emit([]) return arp_results = [] - try: - arp_packets = arping(network, timeout=self.timeout, verbose=1)[0] - except Exception as e: # pylint: disable=broad-exception-caught - print(f"Error during ARP scan: {e}") - self.finished.emit([]) - return - for packet in arp_packets: - ip_address = packet[1][ARP].psrc - mac = packet[1][ARP].hwsrc - device_vendor = self.mac_vendor_lookup.lookup_vendor(mac) - hostname = net.get_hostname(ip_address) - arp_results.append((ip_address, mac, hostname, device_vendor, packet[1][ARP])) - self.finished.emit(arp_results) + if self.use_native: + print("Using native ARP scanner") + network = ipaddress.IPv4Network(network_cidr) + count = 0 + for ip in network.hosts(): + if str(ip) == src_ip: + continue # Skip scanning our own IP + target_ip, result = self._scan_ip_native(src_ip, str(ip)) + if result: + device_vendor = self.mac_vendor_lookup.lookup_vendor(result['mac']) + hostname = net.get_hostname(target_ip) + arp_response = self._create_arp_response(target_ip, result['mac']) + arp_results.append((target_ip, result['mac'], hostname, device_vendor, arp_response)) + count += 1 + # Every 10 IPs (or any chosen interval), emit partial results + if count % 10 == 0: + self.partialResults.emit(arp_results) + self.finished.emit(arp_results) + else: + print("Using Scapy ARP scanner") + try: + arp_packets = arping(network_cidr, timeout=self.timeout, verbose=1)[0] + except Exception as e: + print(f"Error during ARP scan: {e}") + self.finished.emit([]) + return + + for packet in arp_packets: + ip_addr = packet[1][ARP].psrc + mac = packet[1][ARP].hwsrc + device_vendor = self.mac_vendor_lookup.lookup_vendor(mac) + hostname = net.get_hostname(ip_addr) + arp_results.append((ip_addr, mac, hostname, device_vendor, packet[1][ARP])) + self.finished.emit(arp_results)