#!/usr/bin/python3
from bcc import BPF
import socket
import struct
import time
import sys
# --- CONFIGURATION ---
# Network interface the XDP program is attached to and later detached from.
device = "wlo1"
# Host name to block. It is resolved once, at startup, with
# socket.gethostbyname(), so only the single IPv4 address returned at that
# moment ends up in the blocklist map.
target_domain = "example.com" # Let's use example.com first to prove it works
# --- 1. The Kernel Code (C) ---
# XDP program source, compiled by BCC at startup. block_ingress() drops an
# IPv4 frame when its source address is a key in the `blacklist` hash map and
# passes everything else.
#
# Review notes:
# - Fail-open: a frame too short for the Ethernet or IPv4 header, and every
#   EtherType other than ETH_P_IP, returns XDP_PASS. IPv6 and VLAN-tagged
#   traffic is therefore never filtered by this program.
# - The lookup key is ip->saddr exactly as it sits in the packet (network byte
#   order). A key inserted from user space matches only if its in-memory bytes
#   are identical; on a little-endian host an integer decoded as big-endian
#   does not, and the lookup then silently never hits.
# - Only the source address is examined, and it is taken from the packet
#   as-is; nothing here validates it.
# - bpf_trace_printk() runs for every dropped packet and writes to the shared
#   trace pipe. It is a debugging aid; %x prints the raw u32, so on a
#   little-endian host the octets appear in reverse order.
program = """
#include <uapi/linux/bpf.h>
#include <linux/if_ether.h>
#include <linux/ip.h>
#include <linux/in.h>
BPF_HASH(blacklist, u32);
int block_ingress(struct xdp_md *ctx) {
void *data = (void *)(long)ctx->data;
void *data_end = (void *)(long)ctx->data_end;
// 1. Parse Ethernet
struct ethhdr *eth = data;
if (data + sizeof(*eth) > data_end) return XDP_PASS;
// 2. Check for IPv4
if (eth->h_proto != htons(ETH_P_IP)) return XDP_PASS;
// 3. Parse IP
struct iphdr *ip = data + sizeof(*eth);
if (data + sizeof(*eth) + sizeof(*ip) > data_end) return XDP_PASS;
// *** THE FIX: Check SOURCE Address (Who sent this?) ***
u32 src_ip = ip->saddr;
// Lookup in blacklist
u64 *rule_exists = blacklist.lookup(&src_ip);
if (rule_exists) {
// We found a match!
bpf_trace_printk("BLOCKED packet from IP: %x\\n", src_ip);
return XDP_DROP;
}
return XDP_PASS;
}
"""
# --- 2. User Space Logic (Python) ---
# Compile the embedded C source and load block_ingress as an XDP program.
print("Compiling eBPF program...")
b = BPF(text=program)
fn = b.load_func("block_ingress", BPF.XDP)

# XDP_FLAGS_SKB_MODE requests generic (SKB) mode rather than a driver-native
# hook. Any attach failure is reported and ends the script with status 1.
print(f"Attaching to {device} (Generic Mode)...")
try:
    b.attach_xdp(device, fn, flags=BPF.XDP_FLAGS_SKB_MODE)
except Exception as exc:
    print(f"Error: {exc}")
    raise SystemExit(1)

# Handle to the kernel-side hash map that block_ingress() consults.
blacklist = b.get_table("blacklist")
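def ip_to_int(ip_str):
    """Return the u32 blocklist-map key for a dotted-quad IPv4 address.

    The kernel program looks up ``ip->saddr`` unchanged, i.e. the four address
    bytes in network order. The integer returned here is stored by the map key
    object in host byte order, so it has to be decoded with the host's native
    order for the stored bytes to equal the on-wire bytes. Decoding as
    big-endian ("!I") produces a byte-swapped key on little-endian machines,
    and the blocklist lookup then never matches.

    Args:
        ip_str: IPv4 address in dotted-quad text form.

    Returns:
        Integer whose native in-memory representation is the packed address.

    Raises:
        OSError: if ``ip_str`` is not a valid IPv4 address string.
    """
    # "=I": native byte order, standard 4-byte size.
    return struct.unpack("=I", socket.inet_aton(ip_str))[0]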
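# Add the target to the map
# The name is resolved once to a single IPv4 address: additional A records,
# later DNS changes and IPv6 (AAAA) addresses are not covered by the blocklist.
print(f"Resolving {target_domain}...")
try:
    target_ip = socket.gethostbyname(target_domain)
except OSError as e:
    # The XDP program is already attached at this point; detach it instead of
    # leaving a filter on the interface after the script has exited.
    print(f"Error: {e}")
    b.remove_xdp(device, flags=BPF.XDP_FLAGS_SKB_MODE)
    sys.exit(1)
print(f"Blocking traffic FROM: {target_ip}")
blacklist[blacklist.Key(ip_to_int(target_ip))] = blacklist.Leaf(1)

print("Running! Checking for INCOMING packets from target.")
# Point the hint at the configured domain rather than a hard-coded name.
print(f"Open another terminal and run: ping -4 {target_domain}")
print("Check logs: sudo cat /sys/kernel/debug/tracing/trace_pipe")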
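# Keep the process alive while the XDP program filters traffic; Ctrl-C ends it.
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    print("\nDetaching...")
finally:
    # Detach on every exit path, not only Ctrl-C, so the filter does not stay
    # on the interface after the script dies. The same mode flag used for
    # attach is passed so the generic-mode program is the one removed; with
    # flags=0 the kernel chooses the mode itself, which may not be generic on
    # drivers with native XDP support.
    b.remove_xdp(device, flags=BPF.XDP_FLAGS_SKB_MODE)
    print("Done.")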