En este tutorial veremos cómo obtener secuencias de vídeo sincronizadas utilizando Python y OpenCV. Uno de los problemas del streaming de vídeo es transmitir y adquirir señales de vídeo de alta calidad con el menor retardo posible. La capacidad de sincronizar flujos de vídeo es poder procesar sus datos simultáneamente, como en el reconocimiento de objetos.
Requisitos previos: transmisión de vídeo entre dos máquinas
Hardware
- Informática con Python y OpenCV
- 2 fuentes de vídeo (archivo, stream, webcam, etc.)
- Conexión a Internet o Ethernet
Para este tutorial, estoy usando dos Orange Pi Zero, que genera secuencias de vídeo de cámaras USB (ArduCam), conectado a la red informática a través de un conmutador Ethernet.
Streaming
Para crear el flujo a partir del flujo de vídeo de la cámara, utilizamos FFMPEG. Utilizamos el protocolo UDP en el que especificamos la dirección IP del ordenador y el puerto en el que se encuentra el flujo.
Flujo de vídeo 0
ffmpeg -video_size 640x480 -i /dev/video0 -f mpegts udp://{ip_address}:8553?pkt_size=1316
Flujo de vídeo 1
ffmpeg -video_size 640x480 -i /dev/video0 -f mpegts udp://{ip_address}:8554?pkt_size=1316
Para probar el flujo de vídeo, puede utilizar el comando ffplay
ffplay upd://127.0.0.1:8553 #video streaming 0 ffplay upd://127.0.0.1:8554 #video streaming 1
En el tutorial, utilizaremos el filtro drawtext, que permite añadir texto al vídeo. Esto nos permite mostrar la hora y observar fácilmente el retraso.
-vf "drawtext=fontfile=/usr/share/fonts/truetype/ttf-dejavu/DejaVuSans-Bold.tff: text='%{{localtime\:%T}}': fontsize=24: fontcolor=white@0.8: x=7: y=7"
N.B.: es posible poner el comando ffmpeg en un script de Python
Lanzamiento de comandos a través de SSH
Para simplificar, ejecutaremos los comandos de ffmpeg desde el script de Python a través de SSH. Para ello, utilizaremos la librería paramiko
import socket import paramiko #computer ip address hostname = socket.gethostname() ip_address = socket.gethostbyname(hostname) print(f"IP Address: {ip_address}") #ip_address= "192.168.1.70" ssh0 = paramiko.SSHClient() ssh0.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh1 = paramiko.SSHClient() ssh1.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh0.connect("192.168.1.32", username="root", password="root") ssh1.connect("192.168.1.33", username="root", password="root") #stream_cmd0 = "python3 video-stream.py {}:{}?pkt_size=1316".format(ip_address,8553) #stream_cmd1 = "python3 video-stream.py {}:{}?pkt_size=1316".format(ip_address,8554) stream_cmd0="""ffmpeg -video_size 640x480 -i /dev/video0 -vf "drawtext=fontfile=/usr/share/fonts/truetype/ttf-dejavu/DejaVuSans-Bold.tff: text='%{{localtime\:%T}}': fontsize=24: fontcolor=red@0.8: x=7: y=7" -f mpegts udp://{}:{}?pkt_size=1316""".format(ip_address,8553) stream_cmd1="""ffmpeg -video_size 640x480 -i /dev/video0 -vf "drawtext=fontfile=/usr/share/fonts/truetype/ttf-dejavu/DejaVuSans-Bold.tff: text='%{{localtime\:%T}}': fontsize=24: fontcolor=white@0.8: x=7: y=7" -f mpegts udp://{}:{}?pkt_size=1316""".format(ip_address,8554) ssh1_stdin, ssh1_stdout, ssh1_stderr = ssh1.exec_command(stream_cmd1) ssh0_stdin, ssh0_stdout, ssh0_stderr = ssh0.exec_command(stream_cmd0)
N.B.: no olvides cerrar la conexión ssh al final del programa ssh0.close(), ssh1.close()
Recepción de flujos no sincronizados
Para recibir los flujos de vídeo, utilizamos OpenCV con Python.
Primero abrimos los dos flujos de vídeo, que reproducimos en bucle mientras estén abiertos
cap0 = cv2.VideoCapture("udp://127.0.0.1:8553") cap1 = cv2.VideoCapture("udp://127.0.0.1:8554")
Por razones prácticas, concatenamos la imagen en la misma ventana, asegurándonos de que tienen dimensiones compatibles
frame0 =cv2.resize(frame0, (640,480)) frame1 =cv2.resize(frame1, (640,480)) Hori = np.concatenate((frame0, frame1), axis=1)
Por último, mostramos la hora local del ordenador a efectos comparativos
Hori = cv2.putText(Hori, date_time,(10, 100),font, 1,(210, 155, 155), 4, cv2.LINE_4)
Este es el código completo para capturar secuencias de vídeo no sincronizadas
N.B.: Este código funciona una vez que se han ejecutado los comandos ffmpeg en cada máquina.
#!/usr/bin/env python # -*- coding: utf-8 -*- import numpy as np import cv2 import datetime #<add code to run ffmpeg command via ssh> cap0 = cv2.VideoCapture("udp://127.0.0.1:8553") cap1 = cv2.VideoCapture("udp://127.0.0.1:8554") while cap0.isOpened() and cap1.isOpened(): # Capture frame-by-frame ret0, frame0 = cap0.read() ret1, frame1 = cap1.read() # Get current date and time now=datetime.datetime.now() date_time = now.strftime("%H:%M:%S") font = cv2.FONT_HERSHEY_SIMPLEX # write the date time in the video frame #frame0 = cv2.putText(frame0, date_time,(10, 100),font, 1,(210, 155, 155), 4, cv2.LINE_4) #frame1 = cv2.putText(frame1, date_time,(10, 100),font, 1,(210, 155, 155), 4, cv2.LINE_4) #if (ret0): # # Display the resulting frame # cv2.imshow('Cam 0', frame0) #if (ret1): # # Display the resulting frame # cv2.imshow('Cam 1', frame1) if (ret0 and ret1): frame0 =cv2.resize(frame0, (640,480)) frame1 =cv2.resize(frame1, (640,480)) Hori = np.concatenate((frame0, frame1), axis=1) Hori = cv2.putText(Hori, date_time,(10, 100),font, 1,(210, 155, 155), 4, cv2.LINE_4) cv2.imshow('Cam 0&1', Hori) if cv2.waitKey(1) & 0xFF == ord('q'): break #release captures cap0.release() cap1.release() cv2.destroyAllWindows()
Hay un retraso de un segundo entre los dos flujos de vídeo. Esto no es aceptable si queremos procesar las imágenes de forma sincrónica.
Captura de secuencias con multihilo
La solución más sencilla es dedicar hilos a la captura de imágenes. Para ello, utilizaremos el paquete threading. Vamos a crear una clase VideoStream que gestionará la reproducción del flujo en su propio hilo
class VideoStream: def __init__(self, src=0): self.cap = cv2.VideoCapture(src) self.ret, self.frame = self.cap.read() self.started = False self.read_lock = Lock() def start(self): if self.started: return None self.started = True self.thread = Thread(target=self.update, args=()) self.thread.start() return self def update(self): while self.started: ret,frame = self.cap.read() self.read_lock.acquire() self.ret, self.frame = ret,frame self.read_lock.release() def isOpened(self): return self.cap.isOpened() def read(self): self.read_lock.acquire() ret = self.ret frame = self.frame.copy() self.read_lock.release() return ret, frame def release(self): self.started = False self.thread.join() def __exit__(self, exc_type, exc_value, traceback): self.cap.release()
Podemos entonces instanciar nuestros dos objetos cap0 y cap1
cap0 = VideoStream("udp://127.0.0.1:8553").start() cap1 = VideoStream("udp://127.0.0.1:8554").start() while cap0.isOpened() and cap1.isOpened(): ret0, frame0 = cap0.read() ret1, frame1 = cap1.read() # Get current date and time #date_time = str(datetime.datetime.now()) now=datetime.datetime.now() date_time = now.strftime("%H:%M:%S") font = cv2.FONT_HERSHEY_SIMPLEX if ret0 and ret1: frame0 =cv2.resize(frame0, (640,480)) frame1 =cv2.resize(frame1, (640,480)) Hori = np.concatenate((frame0, frame1), axis=1) Hori = cv2.putText(Hori, date_time,(10, 100),font, 1,(210, 155, 155), 4, cv2.LINE_4) cv2.imshow('Cam 0&1', Hori) if cv2.waitKey(1) & 0xFF == ord('q'): break #release capture cap0.release() cap1.release() cv2.destroyAllWindows()
Los dos flujos de vídeo están ahora sincronizados y pueden guardarse en un archivo de vídeo o procesarse imágenes en ambos flujos al mismo tiempo.
Código completo para la recepción de secuencias de vídeo sincronizadas
#!/usr/bin/env python # -*- coding: utf-8 -*- # pip install opencv-python # pip install paramiko import numpy as np import cv2 import paramiko import time import datetime from threading import Thread, Lock #computer ip address import socket hostname = socket.gethostname() ip_address = socket.gethostbyname(hostname) print(f"IP Address: {ip_address}") #ip_address= "192.168.1.70" ssh0 = paramiko.SSHClient() ssh0.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh0.connect("192.168.1.32", username="root", password="root") ssh1 = paramiko.SSHClient() ssh1.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh1.connect("192.168.1.33", username="root", password="root") #python #stream_cmd0 = "python3 video-stream.py {}:{}?pkt_size=1316".format(ip_address,8553) #stream_cmd1 = "python3 video-stream.py {}:{}?pkt_size=1316".format(ip_address,8554) #ffmpeg stream_cmd0="""ffmpeg -video_size 640x480 -i /dev/video0 -vf "drawtext=fontfile=/usr/share/fonts/truetype/ttf-dejavu/DejaVuSans-Bold.tff: text='%{{localtime\:%T}}': fontsize=24: fontcolor=red@0.8: x=7: y=7" -f mpegts udp://{}:{}?pkt_size=1316""".format(ip_address,8553) stream_cmd1="""ffmpeg -video_size 640x480 -i /dev/video0 -vf "drawtext=fontfile=/usr/share/fonts/truetype/ttf-dejavu/DejaVuSans-Bold.tff: text='%{{localtime\:%T}}': fontsize=24: fontcolor=white@0.8: x=7: y=7" -f mpegts udp://{}:{}?pkt_size=1316""".format(ip_address,8554) #manual #ffmpeg -video_size 640x480 -i /dev/video0 -f mpegts udp://192.168.1.70:8553?pkt_size=1316 #ffmpeg -video_size 640x480 -i /dev/video0 -f mpegts udp://192.168.1.70:8554?pkt_size=1316 ssh1_stdin, ssh1_stdout, ssh1_stderr = ssh1.exec_command(stream_cmd1) ssh0_stdin, ssh0_stdout, ssh0_stderr = ssh0.exec_command(stream_cmd0) #print(ssh0_stdout.read().decode()) #print(ssh1_stdout.read().decode()) class VideoStream: def __init__(self, src=0): self.cap = cv2.VideoCapture(src) self.ret, self.frame = self.cap.read() self.started = False self.read_lock = Lock() def start(self): if self.started: return None self.started = True self.thread = Thread(target=self.update, args=()) self.thread.start() return self def update(self): while self.started: ret,frame = self.cap.read() self.read_lock.acquire() self.ret, self.frame = ret,frame self.read_lock.release() def isOpened(self): return self.cap.isOpened() def read(self): self.read_lock.acquire() ret = self.ret frame = self.frame.copy() self.read_lock.release() return ret, frame def release(self): self.started = False self.thread.join() def __exit__(self, exc_type, exc_value, traceback): self.cap.release() print("waiting for response...") #cap0 = cv2.VideoCapture("udp://127.0.0.1:8553") #cap1 = cv2.VideoCapture("udp://127.0.0.1:8554") cap0 = VideoStream("udp://127.0.0.1:8553").start() cap1 = VideoStream("udp://127.0.0.1:8554").start() while cap0.isOpened() and cap1.isOpened(): ret0, frame0 = cap0.read() ret1, frame1 = cap1.read() # Get current date and time #date_time = str(datetime.datetime.now()) now=datetime.datetime.now() date_time = now.strftime("%H:%M:%S") font = cv2.FONT_HERSHEY_SIMPLEX if ret0 and ret1: frame0 =cv2.resize(frame0, (640,480)) frame1 =cv2.resize(frame1, (640,480)) Hori = np.concatenate((frame0, frame1), axis=1) Hori = cv2.putText(Hori, date_time,(10, 100),font, 1,(210, 155, 155), 4, cv2.LINE_4) cv2.imshow('Cam 0&1', Hori) if cv2.waitKey(1) & 0xFF == ord('q'): break #release capture cap0.release() cap1.release() cv2.destroyAllWindows() #kill ffmpeg ssh1_stdin, ssh1_stdout, ssh1_stderr = ssh1.exec_command("killall ffmpeg") ssh0_stdin, ssh0_stdout, ssh0_stderr = ssh0.exec_command("killall ffmpeg") #close ssh ssh0.close() ssh1.close()
Aplicaciones
- Red de cámaras de vigilancia CCTV
- Reconocimiento de objetos en secuencias de vídeo sincronizadas