Neste tutorial, veremos como obter fluxos de vídeo sincronizados usando Python e OpenCV. Um dos problemas do streaming de vídeo é transmitir e adquirir sinais de vídeo de alta qualidade com o menor atraso possível. A capacidade de sincronizar fluxos de vídeo é poder processar os seus dados simultaneamente, como no reconhecimento de objectos.
Pré-requisitos: Transmissão de vídeo entre duas máquinas
Hardware
- Computador com Python e OpenCV
- 2 fontes de vídeo (ficheiro, fluxo, webcam, etc.)
- Uma ligação à Internet ou ethernet
Para este tutorial, estou a utilizar dois Orange Pi Zero, que geram fluxos de vídeo a partir de câmaras USB (ArduCam), ligadas à rede do computador através de um comutador Ethernet.
Transmissão em fluxo contínuo
Para criar o fluxo a partir do fluxo de vídeo da câmara, utilizamos o FFMPEG. Utilizamos o protocolo UDP, no qual especificamos o endereço IP do computador e a porta em que se encontra o fluxo.
Fluxo de vídeo 0
ffmpeg -video_size 640x480 -i /dev/video0 -f mpegts udp://{ip_address}:8553?pkt_size=1316
Fluxo de vídeo 1
ffmpeg -video_size 640x480 -i /dev/video0 -f mpegts udp://{ip_address}:8554?pkt_size=1316
Para testar o fluxo de vídeo, pode utilizar o comando ffplay
ffplay upd://127.0.0.1:8553 #video streaming 0 ffplay upd://127.0.0.1:8554 #video streaming 1
No tutorial, vamos utilizar o filtro drawtext, que permite adicionar texto ao vídeo. Isto permite-nos mostrar a hora e observar facilmente o atraso.
-vf "drawtext=fontfile=/usr/share/fonts/truetype/ttf-dejavu/DejaVuSans-Bold.tff: text='%{{localtime\:%T}}': fontsize=24: fontcolor=white@0.8: x=7: y=7"
N.B.: é possível colocar o comando ffmpeg num script Python
Iniciar comandos via SSH
Por uma questão de simplicidade, vamos executar os comandos ffmpeg a partir do script Python via SSH. Para fazer isso, usamos a biblioteca paramiko
import socket import paramiko #computer ip address hostname = socket.gethostname() ip_address = socket.gethostbyname(hostname) print(f"IP Address: {ip_address}") #ip_address= "192.168.1.70" ssh0 = paramiko.SSHClient() ssh0.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh1 = paramiko.SSHClient() ssh1.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh0.connect("192.168.1.32", username="root", password="root") ssh1.connect("192.168.1.33", username="root", password="root") #stream_cmd0 = "python3 video-stream.py {}:{}?pkt_size=1316".format(ip_address,8553) #stream_cmd1 = "python3 video-stream.py {}:{}?pkt_size=1316".format(ip_address,8554) stream_cmd0="""ffmpeg -video_size 640x480 -i /dev/video0 -vf "drawtext=fontfile=/usr/share/fonts/truetype/ttf-dejavu/DejaVuSans-Bold.tff: text='%{{localtime\:%T}}': fontsize=24: fontcolor=red@0.8: x=7: y=7" -f mpegts udp://{}:{}?pkt_size=1316""".format(ip_address,8553) stream_cmd1="""ffmpeg -video_size 640x480 -i /dev/video0 -vf "drawtext=fontfile=/usr/share/fonts/truetype/ttf-dejavu/DejaVuSans-Bold.tff: text='%{{localtime\:%T}}': fontsize=24: fontcolor=white@0.8: x=7: y=7" -f mpegts udp://{}:{}?pkt_size=1316""".format(ip_address,8554) ssh1_stdin, ssh1_stdout, ssh1_stderr = ssh1.exec_command(stream_cmd1) ssh0_stdin, ssh0_stdout, ssh0_stderr = ssh0.exec_command(stream_cmd0)
Nota: não te esqueças de fechar a ligação ssh no final do programa ssh0.close(), ssh1.close()
Receção de fluxos não sincronizados
Para receber os fluxos de vídeo, utilizamos o OpenCV com Python.
Primeiro, abrimos os dois fluxos de vídeo, que reproduzimos em ciclo enquanto estiverem abertos
cap0 = cv2.VideoCapture("udp://127.0.0.1:8553") cap1 = cv2.VideoCapture("udp://127.0.0.1:8554")
Por razões práticas, concatenamos a imagem na mesma janela, garantindo que têm dimensões compatíveis
frame0 =cv2.resize(frame0, (640,480)) frame1 =cv2.resize(frame1, (640,480)) Hori = np.concatenate((frame0, frame1), axis=1)
Por fim, apresentamos a hora local do computador para efeitos de comparação
Hori = cv2.putText(Hori, date_time,(10, 100),font, 1,(210, 155, 155), 4, cv2.LINE_4)
Eis o código completo para a captura de fluxos de vídeo não sincronizados
N.B.: Este código funciona depois de os comandos ffmpeg terem sido executados em cada máquina.
#!/usr/bin/env python # -*- coding: utf-8 -*- import numpy as np import cv2 import datetime #<add code to run ffmpeg command via ssh> cap0 = cv2.VideoCapture("udp://127.0.0.1:8553") cap1 = cv2.VideoCapture("udp://127.0.0.1:8554") while cap0.isOpened() and cap1.isOpened(): # Capture frame-by-frame ret0, frame0 = cap0.read() ret1, frame1 = cap1.read() # Get current date and time now=datetime.datetime.now() date_time = now.strftime("%H:%M:%S") font = cv2.FONT_HERSHEY_SIMPLEX # write the date time in the video frame #frame0 = cv2.putText(frame0, date_time,(10, 100),font, 1,(210, 155, 155), 4, cv2.LINE_4) #frame1 = cv2.putText(frame1, date_time,(10, 100),font, 1,(210, 155, 155), 4, cv2.LINE_4) #if (ret0): # # Display the resulting frame # cv2.imshow('Cam 0', frame0) #if (ret1): # # Display the resulting frame # cv2.imshow('Cam 1', frame1) if (ret0 and ret1): frame0 =cv2.resize(frame0, (640,480)) frame1 =cv2.resize(frame1, (640,480)) Hori = np.concatenate((frame0, frame1), axis=1) Hori = cv2.putText(Hori, date_time,(10, 100),font, 1,(210, 155, 155), 4, cv2.LINE_4) cv2.imshow('Cam 0&1', Hori) if cv2.waitKey(1) & 0xFF == ord('q'): break #release captures cap0.release() cap1.release() cv2.destroyAllWindows()
Existe um atraso de um segundo entre os dois fluxos de vídeo. Isto não é aceitável se quisermos processar as imagens de forma síncrona.
Captura de fluxo com multithreading
A solução mais simples é dedicar threads à captura de imagens. Para isso, usamos o pacote threading. Vamos criar uma classe VideoStream que irá gerir a reprodução do fluxo na sua própria thread
class VideoStream: def __init__(self, src=0): self.cap = cv2.VideoCapture(src) self.ret, self.frame = self.cap.read() self.started = False self.read_lock = Lock() def start(self): if self.started: return None self.started = True self.thread = Thread(target=self.update, args=()) self.thread.start() return self def update(self): while self.started: ret,frame = self.cap.read() self.read_lock.acquire() self.ret, self.frame = ret,frame self.read_lock.release() def isOpened(self): return self.cap.isOpened() def read(self): self.read_lock.acquire() ret = self.ret frame = self.frame.copy() self.read_lock.release() return ret, frame def release(self): self.started = False self.thread.join() def __exit__(self, exc_type, exc_value, traceback): self.cap.release()
Podemos então instanciar os nossos dois objectos cap0 e cap1
cap0 = VideoStream("udp://127.0.0.1:8553").start() cap1 = VideoStream("udp://127.0.0.1:8554").start() while cap0.isOpened() and cap1.isOpened(): ret0, frame0 = cap0.read() ret1, frame1 = cap1.read() # Get current date and time #date_time = str(datetime.datetime.now()) now=datetime.datetime.now() date_time = now.strftime("%H:%M:%S") font = cv2.FONT_HERSHEY_SIMPLEX if ret0 and ret1: frame0 =cv2.resize(frame0, (640,480)) frame1 =cv2.resize(frame1, (640,480)) Hori = np.concatenate((frame0, frame1), axis=1) Hori = cv2.putText(Hori, date_time,(10, 100),font, 1,(210, 155, 155), 4, cv2.LINE_4) cv2.imshow('Cam 0&1', Hori) if cv2.waitKey(1) & 0xFF == ord('q'): break #release capture cap0.release() cap1.release() cv2.destroyAllWindows()
Os dois fluxos de vídeo estão agora sincronizados e podem ser guardados num ficheiro de vídeo ou processados por imagem em ambos os fluxos ao mesmo tempo.
Código completo para receção de fluxos de vídeo sincronizados
#!/usr/bin/env python # -*- coding: utf-8 -*- # pip install opencv-python # pip install paramiko import numpy as np import cv2 import paramiko import time import datetime from threading import Thread, Lock #computer ip address import socket hostname = socket.gethostname() ip_address = socket.gethostbyname(hostname) print(f"IP Address: {ip_address}") #ip_address= "192.168.1.70" ssh0 = paramiko.SSHClient() ssh0.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh0.connect("192.168.1.32", username="root", password="root") ssh1 = paramiko.SSHClient() ssh1.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh1.connect("192.168.1.33", username="root", password="root") #python #stream_cmd0 = "python3 video-stream.py {}:{}?pkt_size=1316".format(ip_address,8553) #stream_cmd1 = "python3 video-stream.py {}:{}?pkt_size=1316".format(ip_address,8554) #ffmpeg stream_cmd0="""ffmpeg -video_size 640x480 -i /dev/video0 -vf "drawtext=fontfile=/usr/share/fonts/truetype/ttf-dejavu/DejaVuSans-Bold.tff: text='%{{localtime\:%T}}': fontsize=24: fontcolor=red@0.8: x=7: y=7" -f mpegts udp://{}:{}?pkt_size=1316""".format(ip_address,8553) stream_cmd1="""ffmpeg -video_size 640x480 -i /dev/video0 -vf "drawtext=fontfile=/usr/share/fonts/truetype/ttf-dejavu/DejaVuSans-Bold.tff: text='%{{localtime\:%T}}': fontsize=24: fontcolor=white@0.8: x=7: y=7" -f mpegts udp://{}:{}?pkt_size=1316""".format(ip_address,8554) #manual #ffmpeg -video_size 640x480 -i /dev/video0 -f mpegts udp://192.168.1.70:8553?pkt_size=1316 #ffmpeg -video_size 640x480 -i /dev/video0 -f mpegts udp://192.168.1.70:8554?pkt_size=1316 ssh1_stdin, ssh1_stdout, ssh1_stderr = ssh1.exec_command(stream_cmd1) ssh0_stdin, ssh0_stdout, ssh0_stderr = ssh0.exec_command(stream_cmd0) #print(ssh0_stdout.read().decode()) #print(ssh1_stdout.read().decode()) class VideoStream: def __init__(self, src=0): self.cap = cv2.VideoCapture(src) self.ret, self.frame = self.cap.read() self.started = False self.read_lock = Lock() def start(self): if self.started: return None self.started = True self.thread = Thread(target=self.update, args=()) self.thread.start() return self def update(self): while self.started: ret,frame = self.cap.read() self.read_lock.acquire() self.ret, self.frame = ret,frame self.read_lock.release() def isOpened(self): return self.cap.isOpened() def read(self): self.read_lock.acquire() ret = self.ret frame = self.frame.copy() self.read_lock.release() return ret, frame def release(self): self.started = False self.thread.join() def __exit__(self, exc_type, exc_value, traceback): self.cap.release() print("waiting for response...") #cap0 = cv2.VideoCapture("udp://127.0.0.1:8553") #cap1 = cv2.VideoCapture("udp://127.0.0.1:8554") cap0 = VideoStream("udp://127.0.0.1:8553").start() cap1 = VideoStream("udp://127.0.0.1:8554").start() while cap0.isOpened() and cap1.isOpened(): ret0, frame0 = cap0.read() ret1, frame1 = cap1.read() # Get current date and time #date_time = str(datetime.datetime.now()) now=datetime.datetime.now() date_time = now.strftime("%H:%M:%S") font = cv2.FONT_HERSHEY_SIMPLEX if ret0 and ret1: frame0 =cv2.resize(frame0, (640,480)) frame1 =cv2.resize(frame1, (640,480)) Hori = np.concatenate((frame0, frame1), axis=1) Hori = cv2.putText(Hori, date_time,(10, 100),font, 1,(210, 155, 155), 4, cv2.LINE_4) cv2.imshow('Cam 0&1', Hori) if cv2.waitKey(1) & 0xFF == ord('q'): break #release capture cap0.release() cap1.release() cv2.destroyAllWindows() #kill ffmpeg ssh1_stdin, ssh1_stdout, ssh1_stderr = ssh1.exec_command("killall ffmpeg") ssh0_stdin, ssh0_stdout, ssh0_stderr = ssh0.exec_command("killall ffmpeg") #close ssh ssh0.close() ssh1.close()
Aplicações
- Rede de câmaras de vigilância CCTV
- Reconhecimento de objectos em fluxos de vídeo sincronizados