forked from darpan-b/ebpf
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathblock_chatgpt5.py
More file actions
95 lines (76 loc) · 2.42 KB
/
block_chatgpt5.py
File metadata and controls
95 lines (76 loc) · 2.42 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
#!/usr/bin/python3
from bcc import BPF
import socket
import struct
import time
import sys
# --- CONFIGURATION ---
# Name of the network interface the XDP program is attached to (and later
# removed from).
device = "wlo1"
# Hostname that is resolved at startup; its IPv4 addresses are inserted into
# the kernel-side blacklist map.
target_domain = "chatgpt.com"
# --- Kernel Code (C) ---
# C source of the XDP program, kept in a Python string and handed to
# BPF(text=...) by the user-space part of this script.
#   * `blacklist` is a BPF hash map keyed by a u32: the IPv4 source address
#     exactly as it sits in the packet. Lookup results are read through a
#     u64 pointer; only the presence of a key matters.
#   * `block_ingress` returns XDP_PASS for frames too short to hold an
#     Ethernet header, for non-IPv4 ethertypes, and for frames too short to
#     hold a struct iphdr. Otherwise it looks up ip->saddr in `blacklist`:
#     a hit is logged with bpf_trace_printk and dropped (XDP_DROP), a miss
#     is passed on.
# NOTE: this is a regular (non-raw) Python string, so the "\\n" in the
# bpf_trace_printk format reaches the C compiler as the escape "\n".
program = """
#include <uapi/linux/bpf.h>
#include <linux/if_ether.h>
#include <linux/ip.h>
#include <linux/in.h>
BPF_HASH(blacklist, u32);
int block_ingress(struct xdp_md *ctx) {
void *data = (void *)(long)ctx->data;
void *data_end = (void *)(long)ctx->data_end;
// 1. Parse Ethernet
struct ethhdr *eth = data;
if (data + sizeof(*eth) > data_end) return XDP_PASS;
// 2. Check for IPv4
if (eth->h_proto != htons(ETH_P_IP)) return XDP_PASS;
// 3. Parse IP
struct iphdr *ip = data + sizeof(*eth);
if (data + sizeof(*eth) + sizeof(*ip) > data_end) return XDP_PASS;
// 4. Get Source IP (The raw bits as they sit in memory)
u32 src_ip = ip->saddr;
// 5. Check the Blacklist
u64 *rule_exists = blacklist.lookup(&src_ip);
if (rule_exists) {
bpf_trace_printk("BLOCKED traffic from: %x\\n", src_ip);
return XDP_DROP;
}
return XDP_PASS;
}
"""
# --- User Space (Python) ---
# Step 1: compile the embedded C source and obtain the XDP entry point.
print(f"Loading eBPF program on {device}...")
b = BPF(text=program)
fn = b.load_func("block_ingress", BPF.XDP)

# Step 2: hook the entry point onto the interface. XDP_FLAGS_SKB_MODE asks
# for the generic (SKB) XDP path, as the message below announces.
print("Attaching in Generic/SKB Mode...")
try:
    b.attach_xdp(device, fn, flags=BPF.XDP_FLAGS_SKB_MODE)
except Exception as attach_err:
    # Nothing useful can be done without the hook: report and exit non-zero.
    print(f"Error: {attach_err}")
    raise SystemExit(1)

# Step 3: user-space handle to the hash map declared in the C source.
blacklist = b.get_table("blacklist")
# *** Byte order of map keys: native, not network ***
def ip_to_native(ip_str):
    """Return the dotted-quad IPv4 string *ip_str* as a native-endian integer.

    The XDP program uses ``ip->saddr`` as its map key, i.e. the four address
    bytes read from packet memory as a u32 by the CPU. Unpacking the
    network-order bytes with the native format ("I") instead of the
    network format ("!I") yields the integer whose in-memory bytes match
    the packet, so the key written from Python equals the key looked up in
    the kernel.
    """
    (native_value,) = struct.unpack("I", socket.inet_aton(ip_str))
    return native_value
# Resolve and Block
# Everything from here on runs with the XDP program already attached to the
# interface, so it is wrapped in try/finally: the program is detached on every
# exit path (DNS failure, map-update error, Ctrl+C), not only on
# KeyboardInterrupt.
try:
    print(f"Resolving IPs for {target_domain}...")
    try:
        infos = socket.getaddrinfo(target_domain, None)
    except socket.gaierror as e:
        print(f"Error: could not resolve {target_domain}: {e}")
        sys.exit(1)  # SystemExit still runs the finally clause below

    blocked_ips = set()
    for family, _socktype, _proto, _canonname, sockaddr in infos:
        # The kernel program only inspects IPv4 headers; select on the
        # address family rather than on the shape of the address string.
        if family != socket.AF_INET:
            continue
        ip_addr = sockaddr[0]
        # getaddrinfo reports the same address once per socket type.
        if ip_addr in blocked_ips:
            continue
        print(f" -> Adding {ip_addr} to blacklist")
        blacklist[blacklist.Key(ip_to_native(ip_addr))] = blacklist.Leaf(1)
        blocked_ips.add(ip_addr)

    if not blocked_ips:
        print(f"Warning: no IPv4 addresses found for {target_domain}; "
              "nothing will be dropped.")

    print("\nRunning! Incoming packets from these IPs will be dropped.")
    print("Logs: sudo cat /sys/kernel/debug/tracing/trace_pipe")
    print("Press Ctrl+C to stop.")
    # Idle until interrupted; the packet filtering itself happens in the kernel.
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    # Ctrl+C is the normal way to stop; cleanup happens in finally.
    pass
finally:
    print("\nDetaching...")
    # Detach with the same mode flag that was used when attaching. With
    # flags=0 the kernel chooses the mode itself, which on drivers with native
    # XDP support may not be the generic hook this program was attached to.
    b.remove_xdp(device, BPF.XDP_FLAGS_SKB_MODE)
    print("Done.")