Site icon AranaCorp

Fluxos de vídeo sincronizados com OpenCV e Multithreading

Neste tutorial, veremos como obter fluxos de vídeo sincronizados usando Python e OpenCV. Um dos problemas do streaming de vídeo é transmitir e adquirir sinais de vídeo de alta qualidade com o menor atraso possível. A capacidade de sincronizar fluxos de vídeo é poder processar os seus dados simultaneamente, como no reconhecimento de objectos.

Pré-requisitos: Transmissão de vídeo entre duas máquinas

Hardware

Para este tutorial, estou a utilizar dois Orange Pi Zero, que geram fluxos de vídeo a partir de câmaras USB (ArduCam), ligadas à rede do computador através de um comutador Ethernet.

Transmissão em fluxo contínuo

Para criar o fluxo a partir do fluxo de vídeo da câmara, utilizamos o FFMPEG. Utilizamos o protocolo UDP, no qual especificamos o endereço IP do computador e a porta em que se encontra o fluxo.

Fluxo de vídeo 0

ffmpeg -video_size 640x480 -i /dev/video0 -f mpegts udp://{ip_address}:8553?pkt_size=1316

Fluxo de vídeo 1

ffmpeg -video_size 640x480 -i /dev/video0 -f mpegts udp://{ip_address}:8554?pkt_size=1316

Para testar o fluxo de vídeo, pode utilizar o comando ffplay

ffplay upd://127.0.0.1:8553 #video streaming 0
ffplay upd://127.0.0.1:8554 #video streaming 1

No tutorial, vamos utilizar o filtro drawtext, que permite adicionar texto ao vídeo. Isto permite-nos mostrar a hora e observar facilmente o atraso.

-vf "drawtext=fontfile=/usr/share/fonts/truetype/ttf-dejavu/DejaVuSans-Bold.tff: text='%{{localtime\:%T}}': fontsize=24: fontcolor=white@0.8: x=7: y=7"

N.B.: é possível colocar o comando ffmpeg num script Python

Iniciar comandos via SSH

Por uma questão de simplicidade, vamos executar os comandos ffmpeg a partir do script Python via SSH. Para fazer isso, usamos a biblioteca paramiko

import socket
import paramiko

#computer ip address
hostname = socket.gethostname()
ip_address = socket.gethostbyname(hostname)
print(f"IP Address: {ip_address}")
#ip_address= "192.168.1.70"

ssh0 = paramiko.SSHClient()
ssh0.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh1 = paramiko.SSHClient()
ssh1.set_missing_host_key_policy(paramiko.AutoAddPolicy())

ssh0.connect("192.168.1.32", username="root", password="root")
ssh1.connect("192.168.1.33", username="root", password="root")

#stream_cmd0 = "python3 video-stream.py {}:{}?pkt_size=1316".format(ip_address,8553)
#stream_cmd1 = "python3 video-stream.py {}:{}?pkt_size=1316".format(ip_address,8554)
stream_cmd0="""ffmpeg -video_size 640x480 -i /dev/video0 -vf "drawtext=fontfile=/usr/share/fonts/truetype/ttf-dejavu/DejaVuSans-Bold.tff: text='%{{localtime\:%T}}': fontsize=24: fontcolor=red@0.8: x=7: y=7" -f mpegts udp://{}:{}?pkt_size=1316""".format(ip_address,8553)
stream_cmd1="""ffmpeg -video_size 640x480 -i /dev/video0 -vf "drawtext=fontfile=/usr/share/fonts/truetype/ttf-dejavu/DejaVuSans-Bold.tff: text='%{{localtime\:%T}}': fontsize=24: fontcolor=white@0.8: x=7: y=7" -f mpegts udp://{}:{}?pkt_size=1316""".format(ip_address,8554)

ssh1_stdin, ssh1_stdout, ssh1_stderr = ssh1.exec_command(stream_cmd1)
ssh0_stdin, ssh0_stdout, ssh0_stderr = ssh0.exec_command(stream_cmd0)

Nota: não te esqueças de fechar a ligação ssh no final do programa ssh0.close(), ssh1.close()

Receção de fluxos não sincronizados

Para receber os fluxos de vídeo, utilizamos o OpenCV com Python.

Primeiro, abrimos os dois fluxos de vídeo, que reproduzimos em ciclo enquanto estiverem abertos

cap0 = cv2.VideoCapture("udp://127.0.0.1:8553")
cap1 = cv2.VideoCapture("udp://127.0.0.1:8554")

Por razões práticas, concatenamos a imagem na mesma janela, garantindo que têm dimensões compatíveis

frame0 =cv2.resize(frame0, (640,480))
frame1 =cv2.resize(frame1, (640,480))
Hori = np.concatenate((frame0, frame1), axis=1)

Por fim, apresentamos a hora local do computador para efeitos de comparação

Hori = cv2.putText(Hori, date_time,(10, 100),font, 1,(210, 155, 155), 4, cv2.LINE_4)

Eis o código completo para a captura de fluxos de vídeo não sincronizados

N.B.: Este código funciona depois de os comandos ffmpeg terem sido executados em cada máquina.

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import numpy as np
import cv2
import datetime

#<add code to run ffmpeg command via ssh>

cap0 = cv2.VideoCapture("udp://127.0.0.1:8553")
cap1 = cv2.VideoCapture("udp://127.0.0.1:8554")

while cap0.isOpened() and cap1.isOpened():
	# Capture frame-by-frame
	ret0, frame0 = cap0.read()
	ret1, frame1 = cap1.read()

	# Get current date and time  
	now=datetime.datetime.now()
	date_time = now.strftime("%H:%M:%S")
	font = cv2.FONT_HERSHEY_SIMPLEX

	# write the date time in the video frame
	#frame0 = cv2.putText(frame0, date_time,(10, 100),font, 1,(210, 155, 155), 4, cv2.LINE_4)
	#frame1 = cv2.putText(frame1, date_time,(10, 100),font, 1,(210, 155, 155), 4, cv2.LINE_4)
	
	#if (ret0):
	#	# Display the resulting frame
	#	cv2.imshow('Cam 0', frame0)

	#if (ret1):
	#	# Display the resulting frame
	#	cv2.imshow('Cam 1', frame1)

	if (ret0 and ret1):
		frame0 =cv2.resize(frame0, (640,480))
		frame1 =cv2.resize(frame1, (640,480))
		Hori = np.concatenate((frame0, frame1), axis=1)
		Hori = cv2.putText(Hori, date_time,(10, 100),font, 1,(210, 155, 155), 4, cv2.LINE_4)
		cv2.imshow('Cam 0&1', Hori)

	if cv2.waitKey(1) & 0xFF == ord('q'):
		break

#release captures
cap0.release()
cap1.release()
cv2.destroyAllWindows()

Existe um atraso de um segundo entre os dois fluxos de vídeo. Isto não é aceitável se quisermos processar as imagens de forma síncrona.

Captura de fluxo com multithreading

A solução mais simples é dedicar threads à captura de imagens. Para isso, usamos o pacote threading. Vamos criar uma classe VideoStream que irá gerir a reprodução do fluxo na sua própria thread

class VideoStream:
	def __init__(self, src=0):
		self.cap = cv2.VideoCapture(src)
		self.ret, self.frame = self.cap.read()
		self.started = False
		self.read_lock = Lock()
		
	def start(self):
		if self.started:
			return None
		self.started = True
		self.thread = Thread(target=self.update, args=())
		self.thread.start()
		return self
		
	def update(self):
		while self.started:
			ret,frame = self.cap.read()
			self.read_lock.acquire()
			self.ret, self.frame = ret,frame
			self.read_lock.release()
	
	def isOpened(self):
		return self.cap.isOpened()
				
	def read(self):
		self.read_lock.acquire()
		ret = self.ret
		frame = self.frame.copy()
		self.read_lock.release()
		return ret, frame
		
	def release(self):
		self.started = False
		self.thread.join()
		
	def __exit__(self, exc_type, exc_value, traceback):
		self.cap.release()

Podemos então instanciar os nossos dois objectos cap0 e cap1

cap0 = VideoStream("udp://127.0.0.1:8553").start()
cap1 = VideoStream("udp://127.0.0.1:8554").start()


while cap0.isOpened() and cap1.isOpened():
	ret0, frame0 = cap0.read()
	ret1, frame1 = cap1.read()

	# Get current date and time  
	#date_time = str(datetime.datetime.now())
	now=datetime.datetime.now()
	date_time = now.strftime("%H:%M:%S")
	font = cv2.FONT_HERSHEY_SIMPLEX
	
	if ret0 and ret1:
		frame0 =cv2.resize(frame0, (640,480))
		frame1 =cv2.resize(frame1, (640,480))
		Hori = np.concatenate((frame0, frame1), axis=1)
		Hori = cv2.putText(Hori, date_time,(10, 100),font, 1,(210, 155, 155), 4, cv2.LINE_4)
		cv2.imshow('Cam 0&1', Hori)
			
	if cv2.waitKey(1) & 0xFF == ord('q'):
		break
#release capture
cap0.release()
cap1.release()
cv2.destroyAllWindows()

Os dois fluxos de vídeo estão agora sincronizados e podem ser guardados num ficheiro de vídeo ou processados por imagem em ambos os fluxos ao mesmo tempo.

Código completo para receção de fluxos de vídeo sincronizados

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# pip install opencv-python
# pip install paramiko

import numpy as np
import cv2
import paramiko
import time
import datetime
from threading import Thread, Lock


#computer ip address

import socket
hostname = socket.gethostname()
ip_address = socket.gethostbyname(hostname)
print(f"IP Address: {ip_address}")
#ip_address= "192.168.1.70"

ssh0 = paramiko.SSHClient()
ssh0.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh0.connect("192.168.1.32", username="root", password="root")
ssh1 = paramiko.SSHClient()
ssh1.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh1.connect("192.168.1.33", username="root", password="root")

#python
#stream_cmd0 = "python3 video-stream.py {}:{}?pkt_size=1316".format(ip_address,8553)
#stream_cmd1 = "python3 video-stream.py {}:{}?pkt_size=1316".format(ip_address,8554)

#ffmpeg
stream_cmd0="""ffmpeg -video_size 640x480 -i /dev/video0 -vf "drawtext=fontfile=/usr/share/fonts/truetype/ttf-dejavu/DejaVuSans-Bold.tff: text='%{{localtime\:%T}}': fontsize=24: fontcolor=red@0.8: x=7: y=7" -f mpegts udp://{}:{}?pkt_size=1316""".format(ip_address,8553)
stream_cmd1="""ffmpeg -video_size 640x480 -i /dev/video0 -vf "drawtext=fontfile=/usr/share/fonts/truetype/ttf-dejavu/DejaVuSans-Bold.tff: text='%{{localtime\:%T}}': fontsize=24: fontcolor=white@0.8: x=7: y=7" -f mpegts udp://{}:{}?pkt_size=1316""".format(ip_address,8554)

#manual
#ffmpeg -video_size 640x480 -i /dev/video0 -f mpegts udp://192.168.1.70:8553?pkt_size=1316
#ffmpeg -video_size 640x480 -i /dev/video0 -f mpegts udp://192.168.1.70:8554?pkt_size=1316

ssh1_stdin, ssh1_stdout, ssh1_stderr = ssh1.exec_command(stream_cmd1)
ssh0_stdin, ssh0_stdout, ssh0_stderr = ssh0.exec_command(stream_cmd0)
#print(ssh0_stdout.read().decode())
#print(ssh1_stdout.read().decode())


class VideoStream:
	def __init__(self, src=0):
		self.cap = cv2.VideoCapture(src)
		self.ret, self.frame = self.cap.read()
		self.started = False
		self.read_lock = Lock()
		
	def start(self):
		if self.started:
			return None
		self.started = True
		self.thread = Thread(target=self.update, args=())
		self.thread.start()
		return self
		
	def update(self):
		while self.started:
			ret,frame = self.cap.read()
			self.read_lock.acquire()
			self.ret, self.frame = ret,frame
			self.read_lock.release()
	
	def isOpened(self):
		return self.cap.isOpened()
				
	def read(self):
		self.read_lock.acquire()
		ret = self.ret
		frame = self.frame.copy()
		self.read_lock.release()
		return ret, frame
		
	def release(self):
		self.started = False
		self.thread.join()
		
	def __exit__(self, exc_type, exc_value, traceback):
		self.cap.release()





print("waiting for response...")


#cap0 = cv2.VideoCapture("udp://127.0.0.1:8553")
#cap1 = cv2.VideoCapture("udp://127.0.0.1:8554")
cap0 = VideoStream("udp://127.0.0.1:8553").start()
cap1 = VideoStream("udp://127.0.0.1:8554").start()


while cap0.isOpened() and cap1.isOpened():
	ret0, frame0 = cap0.read()
	ret1, frame1 = cap1.read()

	# Get current date and time  
	#date_time = str(datetime.datetime.now())
	now=datetime.datetime.now()
	date_time = now.strftime("%H:%M:%S")
	font = cv2.FONT_HERSHEY_SIMPLEX
	
	if ret0 and ret1:
		frame0 =cv2.resize(frame0, (640,480))
		frame1 =cv2.resize(frame1, (640,480))
		Hori = np.concatenate((frame0, frame1), axis=1)
		Hori = cv2.putText(Hori, date_time,(10, 100),font, 1,(210, 155, 155), 4, cv2.LINE_4)
		cv2.imshow('Cam 0&1', Hori)
			
	if cv2.waitKey(1) & 0xFF == ord('q'):
		break

#release capture
cap0.release()
cap1.release()
cv2.destroyAllWindows()
#kill ffmpeg
ssh1_stdin, ssh1_stdout, ssh1_stderr = ssh1.exec_command("killall ffmpeg")
ssh0_stdin, ssh0_stdout, ssh0_stderr = ssh0.exec_command("killall ffmpeg")
#close ssh
ssh0.close()
ssh1.close()

Aplicações

Fontes

Exit mobile version