Neste tutorial, veremos como obter fluxos de vídeo sincronizados usando Python e OpenCV. Um dos problemas do streaming de vídeo é transmitir e adquirir sinais de vídeo de alta qualidade com o menor atraso possível. A capacidade de sincronizar fluxos de vídeo é poder processar os seus dados simultaneamente, como no reconhecimento de objectos.
Pré-requisitos: Transmissão de vídeo entre duas máquinas
Hardware
- Computador com Python e OpenCV
- 2 fontes de vídeo (ficheiro, fluxo, webcam, etc.)
- Uma ligação à Internet ou ethernet
Para este tutorial, estou a utilizar dois Orange Pi Zero, que geram fluxos de vídeo a partir de câmaras USB (ArduCam), ligadas à rede do computador através de um comutador Ethernet.
Transmissão em fluxo contínuo
Para criar o fluxo a partir do fluxo de vídeo da câmara, utilizamos o FFMPEG. Utilizamos o protocolo UDP, no qual especificamos o endereço IP do computador e a porta em que se encontra o fluxo.
Fluxo de vídeo 0
ffmpeg -video_size 640x480 -i /dev/video0 -f mpegts udp://{ip_address}:8553?pkt_size=1316
Fluxo de vídeo 1
ffmpeg -video_size 640x480 -i /dev/video0 -f mpegts udp://{ip_address}:8554?pkt_size=1316
Para testar o fluxo de vídeo, pode utilizar o comando ffplay
ffplay upd://127.0.0.1:8553 #video streaming 0 ffplay upd://127.0.0.1:8554 #video streaming 1
No tutorial, vamos utilizar o filtro drawtext, que permite adicionar texto ao vídeo. Isto permite-nos mostrar a hora e observar facilmente o atraso.
-vf "drawtext=fontfile=/usr/share/fonts/truetype/ttf-dejavu/DejaVuSans-Bold.tff: text='%{{localtime\:%T}}': fontsize=24: fontcolor=white@0.8: x=7: y=7"
N.B.: é possível colocar o comando ffmpeg num script Python
Iniciar comandos via SSH
Por uma questão de simplicidade, vamos executar os comandos ffmpeg a partir do script Python via SSH. Para fazer isso, usamos a biblioteca paramiko
import socket
import paramiko
#computer ip address
hostname = socket.gethostname()
ip_address = socket.gethostbyname(hostname)
print(f"IP Address: {ip_address}")
#ip_address= "192.168.1.70"
ssh0 = paramiko.SSHClient()
ssh0.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh1 = paramiko.SSHClient()
ssh1.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh0.connect("192.168.1.32", username="root", password="root")
ssh1.connect("192.168.1.33", username="root", password="root")
#stream_cmd0 = "python3 video-stream.py {}:{}?pkt_size=1316".format(ip_address,8553)
#stream_cmd1 = "python3 video-stream.py {}:{}?pkt_size=1316".format(ip_address,8554)
stream_cmd0="""ffmpeg -video_size 640x480 -i /dev/video0 -vf "drawtext=fontfile=/usr/share/fonts/truetype/ttf-dejavu/DejaVuSans-Bold.tff: text='%{{localtime\:%T}}': fontsize=24: fontcolor=red@0.8: x=7: y=7" -f mpegts udp://{}:{}?pkt_size=1316""".format(ip_address,8553)
stream_cmd1="""ffmpeg -video_size 640x480 -i /dev/video0 -vf "drawtext=fontfile=/usr/share/fonts/truetype/ttf-dejavu/DejaVuSans-Bold.tff: text='%{{localtime\:%T}}': fontsize=24: fontcolor=white@0.8: x=7: y=7" -f mpegts udp://{}:{}?pkt_size=1316""".format(ip_address,8554)
ssh1_stdin, ssh1_stdout, ssh1_stderr = ssh1.exec_command(stream_cmd1)
ssh0_stdin, ssh0_stdout, ssh0_stderr = ssh0.exec_command(stream_cmd0)
Nota: não te esqueças de fechar a ligação ssh no final do programa ssh0.close(), ssh1.close()
Receção de fluxos não sincronizados
Para receber os fluxos de vídeo, utilizamos o OpenCV com Python.
Primeiro, abrimos os dois fluxos de vídeo, que reproduzimos em ciclo enquanto estiverem abertos
cap0 = cv2.VideoCapture("udp://127.0.0.1:8553")
cap1 = cv2.VideoCapture("udp://127.0.0.1:8554")
Por razões práticas, concatenamos a imagem na mesma janela, garantindo que têm dimensões compatíveis
frame0 =cv2.resize(frame0, (640,480)) frame1 =cv2.resize(frame1, (640,480)) Hori = np.concatenate((frame0, frame1), axis=1)
Por fim, apresentamos a hora local do computador para efeitos de comparação
Hori = cv2.putText(Hori, date_time,(10, 100),font, 1,(210, 155, 155), 4, cv2.LINE_4)
Eis o código completo para a captura de fluxos de vídeo não sincronizados
N.B.: Este código funciona depois de os comandos ffmpeg terem sido executados em cada máquina.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import numpy as np
import cv2
import datetime
#<add code to run ffmpeg command via ssh>
cap0 = cv2.VideoCapture("udp://127.0.0.1:8553")
cap1 = cv2.VideoCapture("udp://127.0.0.1:8554")
while cap0.isOpened() and cap1.isOpened():
# Capture frame-by-frame
ret0, frame0 = cap0.read()
ret1, frame1 = cap1.read()
# Get current date and time
now=datetime.datetime.now()
date_time = now.strftime("%H:%M:%S")
font = cv2.FONT_HERSHEY_SIMPLEX
# write the date time in the video frame
#frame0 = cv2.putText(frame0, date_time,(10, 100),font, 1,(210, 155, 155), 4, cv2.LINE_4)
#frame1 = cv2.putText(frame1, date_time,(10, 100),font, 1,(210, 155, 155), 4, cv2.LINE_4)
#if (ret0):
# # Display the resulting frame
# cv2.imshow('Cam 0', frame0)
#if (ret1):
# # Display the resulting frame
# cv2.imshow('Cam 1', frame1)
if (ret0 and ret1):
frame0 =cv2.resize(frame0, (640,480))
frame1 =cv2.resize(frame1, (640,480))
Hori = np.concatenate((frame0, frame1), axis=1)
Hori = cv2.putText(Hori, date_time,(10, 100),font, 1,(210, 155, 155), 4, cv2.LINE_4)
cv2.imshow('Cam 0&1', Hori)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
#release captures
cap0.release()
cap1.release()
cv2.destroyAllWindows()
Existe um atraso de um segundo entre os dois fluxos de vídeo. Isto não é aceitável se quisermos processar as imagens de forma síncrona.
Captura de fluxo com multithreading
A solução mais simples é dedicar threads à captura de imagens. Para isso, usamos o pacote threading. Vamos criar uma classe VideoStream que irá gerir a reprodução do fluxo na sua própria thread
class VideoStream: def __init__(self, src=0): self.cap = cv2.VideoCapture(src) self.ret, self.frame = self.cap.read() self.started = False self.read_lock = Lock() def start(self): if self.started: return None self.started = True self.thread = Thread(target=self.update, args=()) self.thread.start() return self def update(self): while self.started: ret,frame = self.cap.read() self.read_lock.acquire() self.ret, self.frame = ret,frame self.read_lock.release() def isOpened(self): return self.cap.isOpened() def read(self): self.read_lock.acquire() ret = self.ret frame = self.frame.copy() self.read_lock.release() return ret, frame def release(self): self.started = False self.thread.join() def __exit__(self, exc_type, exc_value, traceback): self.cap.release()
Podemos então instanciar os nossos dois objectos cap0 e cap1
cap0 = VideoStream("udp://127.0.0.1:8553").start()
cap1 = VideoStream("udp://127.0.0.1:8554").start()
while cap0.isOpened() and cap1.isOpened():
ret0, frame0 = cap0.read()
ret1, frame1 = cap1.read()
# Get current date and time
#date_time = str(datetime.datetime.now())
now=datetime.datetime.now()
date_time = now.strftime("%H:%M:%S")
font = cv2.FONT_HERSHEY_SIMPLEX
if ret0 and ret1:
frame0 =cv2.resize(frame0, (640,480))
frame1 =cv2.resize(frame1, (640,480))
Hori = np.concatenate((frame0, frame1), axis=1)
Hori = cv2.putText(Hori, date_time,(10, 100),font, 1,(210, 155, 155), 4, cv2.LINE_4)
cv2.imshow('Cam 0&1', Hori)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
#release capture
cap0.release()
cap1.release()
cv2.destroyAllWindows()
Os dois fluxos de vídeo estão agora sincronizados e podem ser guardados num ficheiro de vídeo ou processados por imagem em ambos os fluxos ao mesmo tempo.
Código completo para receção de fluxos de vídeo sincronizados
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# pip install opencv-python
# pip install paramiko
import numpy as np
import cv2
import paramiko
import time
import datetime
from threading import Thread, Lock
#computer ip address
import socket
hostname = socket.gethostname()
ip_address = socket.gethostbyname(hostname)
print(f"IP Address: {ip_address}")
#ip_address= "192.168.1.70"
ssh0 = paramiko.SSHClient()
ssh0.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh0.connect("192.168.1.32", username="root", password="root")
ssh1 = paramiko.SSHClient()
ssh1.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh1.connect("192.168.1.33", username="root", password="root")
#python
#stream_cmd0 = "python3 video-stream.py {}:{}?pkt_size=1316".format(ip_address,8553)
#stream_cmd1 = "python3 video-stream.py {}:{}?pkt_size=1316".format(ip_address,8554)
#ffmpeg
stream_cmd0="""ffmpeg -video_size 640x480 -i /dev/video0 -vf "drawtext=fontfile=/usr/share/fonts/truetype/ttf-dejavu/DejaVuSans-Bold.tff: text='%{{localtime\:%T}}': fontsize=24: fontcolor=red@0.8: x=7: y=7" -f mpegts udp://{}:{}?pkt_size=1316""".format(ip_address,8553)
stream_cmd1="""ffmpeg -video_size 640x480 -i /dev/video0 -vf "drawtext=fontfile=/usr/share/fonts/truetype/ttf-dejavu/DejaVuSans-Bold.tff: text='%{{localtime\:%T}}': fontsize=24: fontcolor=white@0.8: x=7: y=7" -f mpegts udp://{}:{}?pkt_size=1316""".format(ip_address,8554)
#manual
#ffmpeg -video_size 640x480 -i /dev/video0 -f mpegts udp://192.168.1.70:8553?pkt_size=1316
#ffmpeg -video_size 640x480 -i /dev/video0 -f mpegts udp://192.168.1.70:8554?pkt_size=1316
ssh1_stdin, ssh1_stdout, ssh1_stderr = ssh1.exec_command(stream_cmd1)
ssh0_stdin, ssh0_stdout, ssh0_stderr = ssh0.exec_command(stream_cmd0)
#print(ssh0_stdout.read().decode())
#print(ssh1_stdout.read().decode())
class VideoStream:
def __init__(self, src=0):
self.cap = cv2.VideoCapture(src)
self.ret, self.frame = self.cap.read()
self.started = False
self.read_lock = Lock()
def start(self):
if self.started:
return None
self.started = True
self.thread = Thread(target=self.update, args=())
self.thread.start()
return self
def update(self):
while self.started:
ret,frame = self.cap.read()
self.read_lock.acquire()
self.ret, self.frame = ret,frame
self.read_lock.release()
def isOpened(self):
return self.cap.isOpened()
def read(self):
self.read_lock.acquire()
ret = self.ret
frame = self.frame.copy()
self.read_lock.release()
return ret, frame
def release(self):
self.started = False
self.thread.join()
def __exit__(self, exc_type, exc_value, traceback):
self.cap.release()
print("waiting for response...")
#cap0 = cv2.VideoCapture("udp://127.0.0.1:8553")
#cap1 = cv2.VideoCapture("udp://127.0.0.1:8554")
cap0 = VideoStream("udp://127.0.0.1:8553").start()
cap1 = VideoStream("udp://127.0.0.1:8554").start()
while cap0.isOpened() and cap1.isOpened():
ret0, frame0 = cap0.read()
ret1, frame1 = cap1.read()
# Get current date and time
#date_time = str(datetime.datetime.now())
now=datetime.datetime.now()
date_time = now.strftime("%H:%M:%S")
font = cv2.FONT_HERSHEY_SIMPLEX
if ret0 and ret1:
frame0 =cv2.resize(frame0, (640,480))
frame1 =cv2.resize(frame1, (640,480))
Hori = np.concatenate((frame0, frame1), axis=1)
Hori = cv2.putText(Hori, date_time,(10, 100),font, 1,(210, 155, 155), 4, cv2.LINE_4)
cv2.imshow('Cam 0&1', Hori)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
#release capture
cap0.release()
cap1.release()
cv2.destroyAllWindows()
#kill ffmpeg
ssh1_stdin, ssh1_stdout, ssh1_stderr = ssh1.exec_command("killall ffmpeg")
ssh0_stdin, ssh0_stdout, ssh0_stderr = ssh0.exec_command("killall ffmpeg")
#close ssh
ssh0.close()
ssh1.close()
Aplicações
- Rede de câmaras de vigilância CCTV
- Reconhecimento de objectos em fluxos de vídeo sincronizados
