forked from darpan-b/ebpf
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathblock_chatgpt7.py
More file actions
150 lines (122 loc) · 3.9 KB
/
block_chatgpt7.py
File metadata and controls
150 lines (122 loc) · 3.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
#!/usr/bin/python3
from bcc import BPF
import socket
import struct
import ctypes
import time
import sys
# --- CONFIGURATION ---
device = "wlo1"
target_domain = "chatgpt.com"
# --- Kernel Code (C) ---
program = """
#include <uapi/linux/bpf.h>
#include <linux/if_ether.h>
#include <linux/ip.h>
#include <linux/in.h>
#include <uapi/linux/ipv6.h>
// Map 1: IPv4 (4 bytes)
BPF_HASH(blacklist_v4, u32);
// Map 2: IPv6 (16 bytes)
struct ipv6_key_t {
unsigned char addr[16];
};
BPF_HASH(blacklist_v6, struct ipv6_key_t);
int block_ingress(struct xdp_md *ctx) {
void *data = (void *)(long)ctx->data;
void *data_end = (void *)(long)ctx->data_end;
struct ethhdr *eth = data;
if (data + sizeof(*eth) > data_end) return XDP_PASS;
// --- BRANCH 1: IPv4 ---
if (eth->h_proto == htons(ETH_P_IP)) {
struct iphdr *ip = data + sizeof(*eth);
if (data + sizeof(*eth) + sizeof(*ip) > data_end) return XDP_PASS;
u32 src_ip = ip->saddr;
u64 *rule_exists = blacklist_v4.lookup(&src_ip);
if (rule_exists) {
bpf_trace_printk("BLOCKED IPv4 source: %x\\n", src_ip);
return XDP_DROP;
}
}
// --- BRANCH 2: IPv6 ---
else if (eth->h_proto == htons(ETH_P_IPV6)) {
struct ipv6hdr *ip6 = data + sizeof(*eth);
if (data + sizeof(*eth) + sizeof(struct ipv6hdr) > data_end) return XDP_PASS;
struct ipv6_key_t key = {};
__builtin_memcpy(key.addr, &ip6->saddr, 16);
u64 *rule_exists = blacklist_v6.lookup(&key);
if (rule_exists) {
bpf_trace_printk("BLOCKED IPv6 source!\\n");
return XDP_DROP;
}
}
return XDP_PASS;
}
"""
# --- User Space (Python) ---
print(f"Loading eBPF program on {device}...")
try:
b = BPF(text=program)
except Exception as e:
print(f"Compilation Error: {e}")
sys.exit(1)
fn = b.load_func("block_ingress", BPF.XDP)
print("Attaching in Generic/SKB Mode...")
try:
b.attach_xdp(device, fn, flags=BPF.XDP_FLAGS_SKB_MODE)
except Exception as e:
print(f"Error attaching XDP: {e}")
sys.exit(1)
blacklist_v4 = b.get_table("blacklist_v4")
blacklist_v6 = b.get_table("blacklist_v6")
def ip_to_native(ip_str):
packed = socket.inet_aton(ip_str)
return struct.unpack("I", packed)[0]
# --- THE HARVESTER ---
# We collect unique IPs by querying DNS multiple times
unique_ips = set()
# 1. Add known manual overrides (The specific one that leaked earlier)
unique_ips.add("2606:4700:8d92:7cbc:c2d9:bd:5ff2:75c4")
print(f"Harvesting IPs for {target_domain} (this may take a moment)...")
# Loop 20 times to catch Round-Robin IPs
for i in range(20):
try:
# socket.AF_UNSPEC gets both IPv4 and IPv6
infos = socket.getaddrinfo(target_domain, None, socket.AF_UNSPEC)
for info in infos:
ip = info[4][0]
unique_ips.add(ip)
except:
pass
time.sleep(0.1) # Be nice to the DNS server
print(f"-> Found {len(unique_ips)} unique IPs.")
# Add all found IPs to the maps
count_v4 = 0
count_v6 = 0
for ip_addr in unique_ips:
# Basic check to see if it's IPv4 or IPv6
if '.' in ip_addr:
# IPv4
try:
blacklist_v4[blacklist_v4.Key(ip_to_native(ip_addr))] = blacklist_v4.Leaf(1)
count_v4 += 1
except: pass
elif ':' in ip_addr:
# IPv6
try:
ipv6_bytes = socket.inet_pton(socket.AF_INET6, ip_addr)
k = blacklist_v6.Key()
ctypes.memmove(ctypes.addressof(k), ipv6_bytes, 16)
blacklist_v6[k] = blacklist_v6.Leaf(1)
count_v6 += 1
except: pass
print(f"-> Added {count_v4} IPv4 addresses and {count_v6} IPv6 addresses to the firewall.")
print("\nRunning! Logs: sudo cat /sys/kernel/debug/tracing/trace_pipe")
print("Press Ctrl+C to stop.")
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
print("\nDetaching...")
b.remove_xdp(device, 0)
print("Done.")