-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathobject_detection.py
More file actions
197 lines (150 loc) · 6.61 KB
/
object_detection.py
File metadata and controls
197 lines (150 loc) · 6.61 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
import datetime
import numpy as np
import cv2
from ultralytics import YOLO
import sqlite3
from motion_detection import analyze_and_log_context
def load_model(model_path="models/yolov8n-pose.pt"):
    """Load a YOLOv8-Pose model from *model_path* and return it.

    Args:
        model_path (str): Filesystem path to the model weights.

    Returns:
        YOLO: The loaded ultralytics model, with verbose output disabled.
    """
    model = YOLO(model_path, verbose=False)
    return model
def process_frame(model, frame):
    """Runs YOLOv8-Pose on a frame and returns the processed frame.

    Args:
        model: A loaded ultralytics YOLO model.
        frame: BGR image array (as produced by cv2).

    Returns:
        The first result rendered as an annotated image via ``.plot()``.
    """
    # NOTE(review): show=True makes ultralytics open its own display window in
    # addition to the caller's cv2.imshow (see process_image) — looks
    # redundant; confirm whether the extra window is intentional.
    results = model(frame, show=True)
    return results[0].plot()
def process_image(model, image_path):
    """Process a single image file and display the annotated result.

    Args:
        model: A loaded ultralytics YOLO model.
        image_path (str): Path to the image file to process.

    Blocks until a key is pressed in the display window.
    """
    image = cv2.imread(image_path)
    # cv2.imread returns None (instead of raising) on a missing or unreadable
    # file; without this guard, process_frame would crash on a None frame.
    if image is None:
        print(f"Error: Could not read image file '{image_path}'.")
        return
    processed_image = process_frame(model, image)
    cv2.imshow("YOLOv8-Pose Detection", processed_image)
    cv2.waitKey(0)
    cv2.destroyAllWindows()
def log_detected_objects(logged_ids, object_id, label, conf, context):
    """Persist one detection to SQLite and a text log, recording its context.

    Args:
        logged_ids (dict): Maps object_id -> last logged context; updated
            in place so callers can track what was already logged.
        object_id: Tracker-assigned identifier for the object.
        label (str): Class name predicted by the model.
        conf (float): Detection confidence score.
        context (str): Human-readable context string for this detection.
    """
    timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    conn = sqlite3.connect("detection_log.db")
    try:
        cursor = conn.cursor()
        # Create the table on first use so the module needs no setup step.
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS detections (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                object_id TEXT,
                label TEXT,
                confidence REAL,
                context TEXT,
                timestamp TEXT
            )
        ''')
        cursor.execute('''
            INSERT INTO detections (object_id, label, confidence, context, timestamp)
            VALUES (?, ?, ?, ?, ?)
        ''', (object_id, label, conf, context, timestamp))
        conn.commit()
    finally:
        # BUG FIX: the connection was previously leaked if the insert raised;
        # always close so the DB file is never left locked.
        conn.close()

    # Remember the last context logged for this object.
    logged_ids[object_id] = context

    # Mirror the record into a plain-text log for quick manual inspection.
    log_entry = f"{timestamp} | Object ID: {object_id} | Label: {label} | Confidence: {conf} | Context: {context}\n"
    with open("detection_log.txt", "a") as file:
        file.write(log_entry)
    # log_entry already ends in '\n'; end="" avoids a blank console line.
    print(f"Logged: {log_entry}", end="")
def process_video(model, video_path):
    """Process a video file, draw detections, and log each new object once.

    Args:
        model: A loaded ultralytics YOLO model.
        video_path (str): Path to the video file.

    Displays annotated frames until the video ends or 'q' is pressed.
    """
    cap = cv2.VideoCapture(video_path)
    # BUG FIX: logged_ids was re-created as a fresh dict inside the frame
    # loop, so every object was re-logged on every single frame. It now
    # persists across frames and each object ID is logged exactly once.
    logged_ids = {}   # object_id -> last logged context
    tracker = {}      # object_id -> centroid from the previous frame
    next_id = 1       # monotonically increasing; IDs are never reused
    min_confidence = 0.70

    if not cap.isOpened():
        print("Error: Could not open video file.")
        return

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        results = model(frame)  # Run YOLO model
        new_tracker = {}
        detected_objects = set()

        for result in results:
            for box in result.boxes:
                conf = box.conf[0].item()
                if conf < min_confidence:
                    continue

                x1, y1, x2, y2 = map(int, box.xyxy[0])
                label = model.names[int(box.cls[0])]
                centroid = ((x1 + x2) // 2, (y1 + y2) // 2)

                # Re-identify by centroid proximity to the previous frame.
                object_id = None
                for existing_id, prev_centroid in tracker.items():
                    if np.linalg.norm(np.array(prev_centroid) - np.array(centroid)) < 50:
                        object_id = existing_id
                        break
                if object_id is None:
                    # BUG FIX: was len(tracker) + 1, which could reuse an
                    # existing ID once objects left the scene.
                    object_id = next_id
                    next_id += 1

                new_tracker[object_id] = centroid
                detected_objects.add((object_id, label, conf))

                # Draw bounding box and label.
                cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
                cv2.putText(frame, f"ID {object_id} - {label} {conf:.2f}", (x1, y1 - 10),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

        # Log only objects not seen before, with a static context string.
        context = "Video object detection in progress"
        for object_id, label, conf in detected_objects:
            if object_id not in logged_ids:
                log_detected_objects(logged_ids, object_id, label, conf, context)

        tracker = new_tracker  # Update tracker with new frame info
        cv2.imshow("YOLOv8-Pose Detection", frame)
        if cv2.waitKey(1) & 0xFF == ord("q"):
            break

    cap.release()
    cv2.destroyAllWindows()
# Maps object_id -> last seen centroid for process_webcam; module-level so
# tracking state persists across frames of the webcam loop.
global_tracker = {}
def process_webcam(model):
    """Run live detection on the default webcam and log motion context.

    Args:
        model: A loaded ultralytics YOLO model.

    Uses the module-level ``global_tracker`` so object identity persists
    across frames; runs until the webcam fails or 'q' is pressed.
    """
    global global_tracker  # Use persistent tracking dictionary
    cap = cv2.VideoCapture(0)
    # BUG FIX: logged_ids was initialized as a set and then replaced by a
    # brand-new dict on every frame, wiping the "last logged context" state
    # that analyze_and_log_context relies on to avoid duplicate logging.
    # Create the dict once so it persists for the whole session.
    logged_ids = {}
    next_id = 0
    min_confidence = 0.70

    if not cap.isOpened():
        print("Error: Could not open webcam.")
        return

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        results = model(frame, show=True)
        new_tracker = {}
        detected_objects = set()

        for result in results:
            for box in result.boxes:
                conf = box.conf[0].item()
                if conf < min_confidence:
                    continue

                x1, y1, x2, y2 = map(int, box.xyxy[0])
                label = model.names[int(box.cls[0])]
                centroid = ((x1 + x2) // 2, (y1 + y2) // 2)

                # Re-identify by centroid proximity to the previous frame.
                object_id = None
                for existing_id, prev_centroid in global_tracker.items():
                    if np.linalg.norm(np.array(prev_centroid) - np.array(centroid)) < 80:
                        object_id = existing_id
                        break
                if object_id is None:
                    object_id = next_id
                    next_id += 1

                new_tracker[object_id] = centroid
                detected_objects.add((object_id, label, conf))

        for object_id, label, conf in detected_objects:
            # Delegates movement analysis and the decision to log to
            # motion_detection.analyze_and_log_context.
            analyze_and_log_context(global_tracker, new_tracker, logged_ids,
                                    object_id, label, conf, log_detected_objects)

        global_tracker = new_tracker  # Update global tracker
        cv2.imshow("Webcam Detection", frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

    cap.release()
    cv2.destroyAllWindows()