Nous allons voir dans ce tutoriel comment obtenir des streams vidéo synchronisés avec Python et OpenCV. Une des problématiques du streaming vidéo est d’émettre et d’acquérir des signaux vidéo de qualité et si possible avec le moins de délai possible. La capacité de synchroniser des flux vidéo est de pouvoir traité leurs données simultanément, comme pour la reconnaissance d’objets.
Pré-requis: Streaming vidéo entre deux machines
Matériel
- Ordinateur avec Python et OpenCV
- 2 sources vidéo (fichier, stream, webcam, etc.)
- Une connexion internet ou ethernet
Pour ce tutoriel, j’utilise deux Orange Pi Zero, qui génère les streams vidéo à partir de caméras USB (ArduCam), connectés sur le réseau de l’ordinateur via un switch Ethernet.
Émission des streams
Pour créer le stream à partir du flux vidéo de la caméra, nous utilisons FFMPEG. Nous utilisons le protocole UDP dans lequel nous spécifions l’adresse IP de l’ordinateur et le port sur lequel se trouve le streaming.
Flux vidéo 0
ffmpeg -video_size 640x480 -i /dev/video0 -f mpegts udp://{ip_address}:8553?pkt_size=1316
Flux vidéo 1
ffmpeg -video_size 640x480 -i /dev/video0 -f mpegts udp://{ip_address}:8554?pkt_size=1316
Pour tester le flux vidéo, vous pouvez utiliser la commande ffplay
ffplay upd://127.0.0.1:8553 #video streaming 0
ffplay upd://127.0.0.1:8554 #video streaming 1
Dans le tutoriel, nous utiliserons le filtre drawtext qui permet de rajouter du texte sur la vidéo. Cela nous permet d’afficher l’heure et d’observer facilement le délai.
-vf "drawtext=fontfile=/usr/share/fonts/truetype/ttf-dejavu/DejaVuSans-Bold.tff: text='%{{localtime\:%T}}': fontsize=24: fontcolor=white@0.8: x=7: y=7"
N.B.: il est possible de mettre la commande ffmpeg dans un script Python
Lancement des commandes via SSH
Pour des raisons de simplicité, nous lançons les commandes ffmpeg à partir du script Python via SSH. Pour cela, nous utilisons la librairie paramiko
import socket import paramiko #computer ip address hostname = socket.gethostname() ip_address = socket.gethostbyname(hostname) print(f"IP Address: {ip_address}") #ip_address= "192.168.1.70" ssh0 = paramiko.SSHClient() ssh0.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh1 = paramiko.SSHClient() ssh1.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh0.connect("192.168.1.32", username="root", password="root") ssh1.connect("192.168.1.33", username="root", password="root") #stream_cmd0 = "python3 video-stream.py {}:{}?pkt_size=1316".format(ip_address,8553) #stream_cmd1 = "python3 video-stream.py {}:{}?pkt_size=1316".format(ip_address,8554) stream_cmd0="""ffmpeg -video_size 640x480 -i /dev/video0 -vf "drawtext=fontfile=/usr/share/fonts/truetype/ttf-dejavu/DejaVuSans-Bold.tff: text='%{{localtime\:%T}}': fontsize=24: fontcolor=red@0.8: x=7: y=7" -f mpegts udp://{}:{}?pkt_size=1316""".format(ip_address,8553) stream_cmd1="""ffmpeg -video_size 640x480 -i /dev/video0 -vf "drawtext=fontfile=/usr/share/fonts/truetype/ttf-dejavu/DejaVuSans-Bold.tff: text='%{{localtime\:%T}}': fontsize=24: fontcolor=white@0.8: x=7: y=7" -f mpegts udp://{}:{}?pkt_size=1316""".format(ip_address,8554) ssh1_stdin, ssh1_stdout, ssh1_stderr = ssh1.exec_command(stream_cmd1) ssh0_stdin, ssh0_stdout, ssh0_stderr = ssh0.exec_command(stream_cmd0)
N.B.: ne pas oublié de fermer la connexion ssh en fin de programme ssh0.close(), ssh1.close()
Réception des streams non-synchronisés
Pour réceptionner les flux vidéo, nous utilisons OpenCV avec Python.
Nous ouvrons d’abord les deux streams vidéo, que nous lisons dans une boucle tant qu’ils sont ouvert
cap0 = cv2.VideoCapture("udp://127.0.0.1:8553")
cap1 = cv2.VideoCapture("udp://127.0.0.1:8554")
Pour des raisons pratique, nous concaténons l’image dans une même fenêtre en nous assurant qu’elles ont des dimensions compatibles
frame0 =cv2.resize(frame0, (640,480))
frame1 =cv2.resize(frame1, (640,480))
Hori = np.concatenate((frame0, frame1), axis=1)
Enfin nous affichons l’heure locale de l’ordinateur pour comparaisons
Hori = cv2.putText(Hori, date_time,(10, 100),font, 1,(210, 155, 155), 4, cv2.LINE_4)
Voici le code complet pour la capture des streams vidéo non-synchronisés
N.B.: Ce code fonctionne une fois que les commandes ffmpeg ont été lancé sur chaque machine
#!/usr/bin/env python # -*- coding: utf-8 -*- import numpy as np import cv2 import datetime #<add code to run ffmpeg command via ssh> cap0 = cv2.VideoCapture("udp://127.0.0.1:8553") cap1 = cv2.VideoCapture("udp://127.0.0.1:8554") while cap0.isOpened() and cap1.isOpened(): # Capture frame-by-frame ret0, frame0 = cap0.read() ret1, frame1 = cap1.read() # Get current date and time now=datetime.datetime.now() date_time = now.strftime("%H:%M:%S") font = cv2.FONT_HERSHEY_SIMPLEX # write the date time in the video frame #frame0 = cv2.putText(frame0, date_time,(10, 100),font, 1,(210, 155, 155), 4, cv2.LINE_4) #frame1 = cv2.putText(frame1, date_time,(10, 100),font, 1,(210, 155, 155), 4, cv2.LINE_4) #if (ret0): # # Display the resulting frame # cv2.imshow('Cam 0', frame0) #if (ret1): # # Display the resulting frame # cv2.imshow('Cam 1', frame1) if (ret0 and ret1): frame0 =cv2.resize(frame0, (640,480)) frame1 =cv2.resize(frame1, (640,480)) Hori = np.concatenate((frame0, frame1), axis=1) Hori = cv2.putText(Hori, date_time,(10, 100),font, 1,(210, 155, 155), 4, cv2.LINE_4) cv2.imshow('Cam 0&1', Hori) if cv2.waitKey(1) & 0xFF == ord('q'): break #release captures cap0.release() cap1.release() cv2.destroyAllWindows()
Nous observons un retard d’une seconde entre les deux flux vidéo. Ceci n’est pas acceptable si nous souhaitons traité les images de manière synchronisée.
Capture de streams avec Multithreading
La solution la plus simple est de dédier des threads à la capture des images. Pour cela nous utilisons le paquet threading. Nous allons créer une classe VideoStream qui va gérer la lecture du stream dans son propre thread
class VideoStream: def __init__(self, src=0): self.cap = cv2.VideoCapture(src) self.ret, self.frame = self.cap.read() self.started = False self.read_lock = Lock() def start(self): if self.started: return None self.started = True self.thread = Thread(target=self.update, args=()) self.thread.start() return self def update(self): while self.started: ret,frame = self.cap.read() self.read_lock.acquire() self.ret, self.frame = ret,frame self.read_lock.release() def isOpened(self): return self.cap.isOpened() def read(self): self.read_lock.acquire() ret = self.ret frame = self.frame.copy() self.read_lock.release() return ret, frame def release(self): self.started = False self.thread.join() def __exit__(self, exc_type, exc_value, traceback): self.cap.release()
Nous pouvons ensuite instancier nos deux objets cap0 et cap1
cap0 = VideoStream("udp://127.0.0.1:8553").start() cap1 = VideoStream("udp://127.0.0.1:8554").start() while cap0.isOpened() and cap1.isOpened(): ret0, frame0 = cap0.read() ret1, frame1 = cap1.read() # Get current date and time #date_time = str(datetime.datetime.now()) now=datetime.datetime.now() date_time = now.strftime("%H:%M:%S") font = cv2.FONT_HERSHEY_SIMPLEX if ret0 and ret1: frame0 =cv2.resize(frame0, (640,480)) frame1 =cv2.resize(frame1, (640,480)) Hori = np.concatenate((frame0, frame1), axis=1) Hori = cv2.putText(Hori, date_time,(10, 100),font, 1,(210, 155, 155), 4, cv2.LINE_4) cv2.imshow('Cam 0&1', Hori) if cv2.waitKey(1) & 0xFF == ord('q'): break #release capture cap0.release() cap1.release() cv2.destroyAllWindows()
Les deux flux vidéos sont maintenant synchronisés et il est possible de les enregistrer dans un fichier vidéo ou de faire du traitement d’image sur les deux streams en même temps.
Code complet la réception de streams vidéo synchronisés
#!/usr/bin/env python # -*- coding: utf-8 -*- # pip install opencv-python # pip install paramiko import numpy as np import cv2 import paramiko import time import datetime from threading import Thread, Lock #computer ip address import socket hostname = socket.gethostname() ip_address = socket.gethostbyname(hostname) print(f"IP Address: {ip_address}") #ip_address= "192.168.1.70" ssh0 = paramiko.SSHClient() ssh0.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh0.connect("192.168.1.32", username="root", password="root") ssh1 = paramiko.SSHClient() ssh1.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh1.connect("192.168.1.33", username="root", password="root") #python #stream_cmd0 = "python3 video-stream.py {}:{}?pkt_size=1316".format(ip_address,8553) #stream_cmd1 = "python3 video-stream.py {}:{}?pkt_size=1316".format(ip_address,8554) #ffmpeg stream_cmd0="""ffmpeg -video_size 640x480 -i /dev/video0 -vf "drawtext=fontfile=/usr/share/fonts/truetype/ttf-dejavu/DejaVuSans-Bold.tff: text='%{{localtime\:%T}}': fontsize=24: fontcolor=red@0.8: x=7: y=7" -f mpegts udp://{}:{}?pkt_size=1316""".format(ip_address,8553) stream_cmd1="""ffmpeg -video_size 640x480 -i /dev/video0 -vf "drawtext=fontfile=/usr/share/fonts/truetype/ttf-dejavu/DejaVuSans-Bold.tff: text='%{{localtime\:%T}}': fontsize=24: fontcolor=white@0.8: x=7: y=7" -f mpegts udp://{}:{}?pkt_size=1316""".format(ip_address,8554) #manual #ffmpeg -video_size 640x480 -i /dev/video0 -f mpegts udp://192.168.1.70:8553?pkt_size=1316 #ffmpeg -video_size 640x480 -i /dev/video0 -f mpegts udp://192.168.1.70:8554?pkt_size=1316 ssh1_stdin, ssh1_stdout, ssh1_stderr = ssh1.exec_command(stream_cmd1) ssh0_stdin, ssh0_stdout, ssh0_stderr = ssh0.exec_command(stream_cmd0) #print(ssh0_stdout.read().decode()) #print(ssh1_stdout.read().decode()) class VideoStream: def __init__(self, src=0): self.cap = cv2.VideoCapture(src) self.ret, self.frame = self.cap.read() self.started = False self.read_lock = Lock() def start(self): if self.started: return None self.started = True self.thread = Thread(target=self.update, args=()) self.thread.start() return self def update(self): while self.started: ret,frame = self.cap.read() self.read_lock.acquire() self.ret, self.frame = ret,frame self.read_lock.release() def isOpened(self): return self.cap.isOpened() def read(self): self.read_lock.acquire() ret = self.ret frame = self.frame.copy() self.read_lock.release() return ret, frame def release(self): self.started = False self.thread.join() def __exit__(self, exc_type, exc_value, traceback): self.cap.release() print("waiting for response...") #cap0 = cv2.VideoCapture("udp://127.0.0.1:8553") #cap1 = cv2.VideoCapture("udp://127.0.0.1:8554") cap0 = VideoStream("udp://127.0.0.1:8553").start() cap1 = VideoStream("udp://127.0.0.1:8554").start() while cap0.isOpened() and cap1.isOpened(): ret0, frame0 = cap0.read() ret1, frame1 = cap1.read() # Get current date and time #date_time = str(datetime.datetime.now()) now=datetime.datetime.now() date_time = now.strftime("%H:%M:%S") font = cv2.FONT_HERSHEY_SIMPLEX if ret0 and ret1: frame0 =cv2.resize(frame0, (640,480)) frame1 =cv2.resize(frame1, (640,480)) Hori = np.concatenate((frame0, frame1), axis=1) Hori = cv2.putText(Hori, date_time,(10, 100),font, 1,(210, 155, 155), 4, cv2.LINE_4) cv2.imshow('Cam 0&1', Hori) if cv2.waitKey(1) & 0xFF == ord('q'): break #release capture cap0.release() cap1.release() cv2.destroyAllWindows() #kill ffmpeg ssh1_stdin, ssh1_stdout, ssh1_stderr = ssh1.exec_command("killall ffmpeg") ssh0_stdin, ssh0_stdout, ssh0_stderr = ssh0.exec_command("killall ffmpeg") #close ssh ssh0.close() ssh1.close()
Applications
- Réseau de caméra de surveillance CCTV
- Reconnaissance d’objet sur des streams vidéos synchronisés