from ultralytics import YOLO
import cv2
import BlynkLib
import time
from datetime import datetime
import requests
# ---------------------------------------------------------------------------
# Blynk connection
# ---------------------------------------------------------------------------
# Placeholder credential: substitute your own Blynk auth token.
BLYNK_AUTH = "YOUR_BLYNK_AUTH_TOKEN"
# Host name used when building the image-upload URL.
BLYNK_SERVER = "blynk.cloud"

# Blynk client, driven via run() / virtual_write() / log_event() below.
blynk = BlynkLib.Blynk(BLYNK_AUTH)

# ---------------------------------------------------------------------------
# Virtual pin numbers (V0..V4)
# ---------------------------------------------------------------------------
(
    VPIN_ALERT,          # V0 - alert text display
    VPIN_DETECTION_LED,  # V1 - LED indicator
    VPIN_LAST_SEEN,      # V2 - last detection timestamp
    VPIN_ANIMAL_COUNT,   # V3 - number of animals detected
    VPIN_IMAGE,          # V4 - image widget for detection snapshots
) = range(5)

# ---------------------------------------------------------------------------
# Detector
# ---------------------------------------------------------------------------
model = YOLO("yolov8n.pt")

# Class id -> label for everything that should raise an alert.
# NOTE(review): presumably these are the COCO indices used by yolov8n —
# confirm against the loaded model's class names.
animal_classes = {
    0: "person",
    14: "bird",
    15: "cat",
    16: "dog",
    17: "horse",
    18: "sheep",
    19: "cow",
    20: "elephant",
    21: "bear",
}
# Ids handed to the model as its `classes=` filter.
animal_ids = list(animal_classes)

# ---------------------------------------------------------------------------
# Camera (device index 0), at a reduced 640x480 resolution
# ---------------------------------------------------------------------------
cap = cv2.VideoCapture(0)
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)

print(
    "Animal Detection System Started on Raspberry Pi",
    "Press Ctrl+C to exit",
    sep="\n",
)

# ---------------------------------------------------------------------------
# Alert throttling state
# ---------------------------------------------------------------------------
last_alert_time = 0  # timestamp of the most recent Blynk alert
alert_cooldown = 2   # minimum number of seconds between Blynk alerts
def send_image_to_blynk(frame):
    """Upload a JPEG snapshot of ``frame`` to the Blynk image pin.

    The frame is JPEG-encoded in memory (quality 70) and POSTed as a
    multipart form, together with the auth token and the target virtual
    pin, to the Blynk server. The upload is best-effort: every failure is
    reported on stdout and never propagates to the caller.

    Args:
        frame: Image to encode, in whatever form ``cv2.imencode`` accepts
            (the main loop passes the frame returned by ``cap.read()``).

    Returns:
        True if the server answered with HTTP 200, False otherwise
        (encoding failure, non-200 response, or any raised error).
    """
    try:
        # imencode reports failure through its first return value; the
        # buffer must not be used when encoding did not succeed.
        encoded_ok, buffer = cv2.imencode(
            '.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, 70]
        )
        if not encoded_ok:
            print(" ✗ Error uploading image: JPEG encoding failed")
            return False

        # NOTE(review): endpoint path kept exactly as in the original —
        # confirm it against the Blynk HTTPS API documentation.
        url = f"https://{BLYNK_SERVER}/external/api/upload/image"

        # Send the encoded bytes straight from memory instead of going
        # through a fixed /tmp file; the multipart filename stays the same.
        files = {'image': ('detection.jpg', buffer.tobytes())}
        data = {'token': BLYNK_AUTH, 'pin': f'V{VPIN_IMAGE}'}
        response = requests.post(url, files=files, data=data, timeout=5)

        if response.status_code == 200:
            print(" ✓ Image uploaded to Blynk")
            return True
        print(f" ✗ Failed to upload image: {response.status_code}")
    except Exception as e:
        # Deliberately broad: a failed snapshot upload must not stop the
        # detection loop that calls this function.
        print(f" ✗ Error uploading image: {e}")
    return False
# Main loop: grab a frame, run the detector, and mirror the result to Blynk.
# Runs until interrupted with Ctrl+C; the camera is always released on exit.
try:
    # True once the "No intruders" state has been written to the dashboard.
    # It is cleared whenever an alert is written, so the idle state is sent
    # once per change rather than on every frame.
    clear_state_sent = False

    while True:
        # Let the Blynk client service its connection.
        blynk.run()

        ret, frame = cap.read()
        if not ret:
            print("Failed to grab frame from webcam")
            time.sleep(1)
            continue

        # Run YOLO detection, restricted to the configured class ids.
        results = model(frame, classes=animal_ids, verbose=False)

        # One label per detected box (duplicates kept so they can be counted).
        detected_labels = []
        for box in results[0].boxes:
            cls_id = int(box.cls[0])
            if cls_id in animal_classes:
                detected_labels.append(animal_classes[cls_id])

        # Monotonic clock: the cooldown is an interval and must not be
        # affected by wall-clock adjustments.
        current_time = time.monotonic()

        if detected_labels:
            # Count every detection before de-duplicating; the alert text
            # lists each label once, in a stable (sorted) order.
            animal_count = len(detected_labels)
            alert_text = "INTRUDER: " + ", ".join(sorted(set(detected_labels)))

            # Send to Blynk (with cooldown to avoid spam).
            if current_time - last_alert_time > alert_cooldown:
                blynk.virtual_write(VPIN_ALERT, alert_text)
                blynk.virtual_write(VPIN_DETECTION_LED, 255)  # Turn on LED
                blynk.virtual_write(VPIN_ANIMAL_COUNT, animal_count)
                blynk.virtual_write(
                    VPIN_LAST_SEEN,
                    datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                )
                # Requires an "animal_detected" event defined in Blynk.
                blynk.log_event("animal_detected", alert_text)
                # Send snapshot to Blynk.
                send_image_to_blynk(frame)
                last_alert_time = current_time
                clear_state_sent = False

            # NOTE(review): the original indentation was lost; the console
            # log is kept outside the cooldown, so every detecting frame is
            # logged locally — confirm this is the intended placement.
            print(f"[{datetime.now().strftime('%H:%M:%S')}] {alert_text}")
        elif not clear_state_sent:
            # No detection: reset the dashboard once, not on every frame.
            blynk.virtual_write(VPIN_DETECTION_LED, 0)
            blynk.virtual_write(VPIN_ALERT, "No intruders")
            blynk.virtual_write(VPIN_ANIMAL_COUNT, 0)
            clear_state_sent = True

        # Small delay to reduce CPU usage.
        time.sleep(0.1)
except KeyboardInterrupt:
    print("\nShutting down...")
finally:
    cap.release()
    print("Camera released. Exiting.")
# (Web-page footer captured with the script; not part of the program.)
# Comments
# Post a Comment