Site icon AranaCorp

Desenvolvendo um monitor UDP com Python

Neste projeto, vamos criar um monitor de comunicação de rede UDP usando Python (PyQt). Quando se desenvolve um projeto com Arduino, Raspberry Pi ou qualquer outro microcontrolador, é certamente necessário criar uma interface gráfica para gerir o sistema (depurar, observar medições, lançar acções, etc.). Existem muitas ferramentas disponíveis para criar interfaces gráficas. Neste projeto, vamos criar um monitor de comunicação de rede usando PyQt (PySide2).

Objetivo

Para este projeto, queremos criar uma interface gráfica em Python para Windows que se comporte como um monitor de comunicação UDP. Para fazer isso, precisaremos executar as seguintes funções:

Recuperar o endereço IP da máquina

Na máquina recetora, obtenha o endereço IPv4 (no nosso caso, 192.168.1.67)

ipconfig # na máquina Windows

ou

ip addr # na máquina Linux

Aplicação de monitorização da comunicação UDP

Primeiro, vamos criar a janela da aplicação, a que chamaremos AcApp e que será a base da nossa aplicação.

#!/usr/bin/python3
# -*-coding:Utf-8 -*

import sys,os
#from PyQt5.QtWidgets import *
#from PyQt5.QtCore import *
#from PyQt5.QtGui import *
from PySide2.QtWidgets import *
from PySide2.QtCore import *
from PySide2.QtGui import *
pyqtSignal=Signal #translate pyqt to Pyside

class AcApp(QMainWindow):
    """Generic application window hosting a single central widget.

    Args:
        title: Text shown in the window title bar.
        mainFrame: Widget *class* (not an instance) that is instantiated and
            installed as the central widget. If the resulting widget exposes a
            ``debugSignal`` signal, its messages are shown in the status bar.
    """

    def __init__(self, title='AcApp', mainFrame=QFrame):
        super().__init__()
        self.title = title
        self.mainFrame = mainFrame()
        self.initUI()

    def initUI(self):
        """Install the central widget, wire the debug signal and show the window."""
        self.setCentralWidget(self.mainFrame)

        # Connect signals. A plain QFrame (the default) has no debugSignal, so
        # only connect when the central widget actually provides one.
        debug_signal = getattr(self.mainFrame, "debugSignal", None)
        if debug_signal is not None:
            debug_signal.connect(self.debugMsg)

        # General configuration
        #self.resize(self.width, self.height)
        self.setWindowTitle(self.title)
        self.setGeometry(300, 300, 850, 450)
        #self.setWindowIcon(QIcon(__icon__))

        # Debug bar
        self.statusBar().showMessage('Display debug messages')

        self.show()

    def debugMsg(self, val):
        """Show ``val`` in the status bar."""
        self.statusBar().showMessage(val)

def main():
    """Create the Qt application, show the main window and run the event loop."""
    app = QApplication(sys.argv)
    app.setQuitOnLastWindowClosed(True)
    # Keep a reference to the window for the lifetime of the event loop.
    ex = AcApp(__title__)
    #ex = AcApp(__title__,EthernetInterface)
    # NOTE: no app.quit() here; called before exec_() it has no event loop to
    # stop and only suggested that the application exits immediately.
    sys.exit(app.exec_())


if __name__ == '__main__':
    main()

Criação de widgets de gestão de comunicações UDP

Em seguida, criamos uma classe que contém todos os Widgets necessários para criar e gerir o monitor de comunicação UDP (QLineEdit, QTextEdit, QPushButton, etc.). Este Widget será inicializado com a classe que vai gerir a comunicação.

class EthernetInterface(QFrame):
    """Panel of widgets used to configure, start/stop and watch a UDP link.

    The panel emits ``debugSignal`` with short status texts. Networking is
    delegated to ``self.model``; while it is ``None`` the connect and send
    handlers only update the widgets.
    """
    debugSignal = pyqtSignal(str)  # define debug signal

    def __init__(self, parent=None):
        super(EthernetInterface, self).__init__(parent)
        self.grid = QGridLayout()
        self.setLayout(self.grid)
        self.defineWidgets()
        self.model = None
        if self.model is not None:
            self.model.debugSignal.connect(self.read)

    def defineWidgets(self):
        """Create the child widgets and place them in the group-box grid."""
        #self.setStyleSheet("""QGroupBox{background-color:white;border: 1px solid green;border-radius: 4px;}
        #QGroupBox::title {padding:1 5px;}""")

        # Group box used as the container of every control
        self.grp = QGroupBox(self)
        self.grp.setTitle("Connection Configuration")

        self.fields = QGridLayout()
        self.grp.setLayout(self.fields)
        self.grid.addWidget(self.grp, 0, 0)

        # Validator restricting the address field to dotted IPv4 notation
        ipRange = "(?:[0-1]?[0-9]?[0-9]|2[0-4][0-9]|25[0-5])"   # one octet, 0-255
        ipRegex = QRegExp("^" + ipRange + "\\." + ipRange + "\\." + ipRange + "\\." + ipRange + "$")
        ipValidator = QRegExpValidator(ipRegex, self)

        # Address and port inputs
        self.selectlbl = QLabel("IP Address:")
        self.typeBox = QLineEdit(HOST)
        #self.typeBox.setInputMask("0.0.0.0");
        self.typeBox.setValidator(ipValidator)

        self.baudlbl = QLabel("Port:")
        self.baudBox = QLineEdit("{}".format(PORT))

        # Buttons: each one first reports the click, then performs its action
        self.button = QPushButton("Connect")
        self.button.clicked.connect(self.clicked)
        self.button.clicked.connect(self.connec)

        sendBtn = QPushButton("send")
        sendBtn.clicked.connect(self.clicked)
        sendBtn.clicked.connect(self.send)

        # Message entry, history of sent messages and receive console
        titlelbl = QLabel("Enter")
        self.edit = QLineEdit("")
        sentlbl = QLabel("Sent")
        self.sent = QTextEdit("")
        desclbl = QLabel("Console")
        self.desc = QTextEdit("")

        # addWidget(widget, row, column[, rowSpan=1[, columnSpan=1]])
        self.fields.addWidget(self.selectlbl, 0, 0, 1, 1)
        self.fields.addWidget(self.typeBox, 0, 1, 1, 1)
        self.fields.addWidget(self.baudlbl, 0, 2, 1, 1)
        self.fields.addWidget(self.baudBox, 0, 3, 1, 1)

        self.fields.addWidget(self.button, 0, 4, 1, 1)

        self.fields.addWidget(titlelbl, 1, 0, 1, 1)
        self.fields.addWidget(self.edit, 1, 1, 1, 3)
        self.fields.addWidget(sendBtn, 1, 4, 1, 1)

        self.fields.addWidget(sentlbl, 2, 0, 1, 1, Qt.AlignTop)  #Qt.AlignmentFlag.AlignTop)
        self.fields.addWidget(self.sent, 2, 1, 1, 3)
        self.fields.addWidget(desclbl, 3, 0, 1, 1, Qt.AlignTop)  #Qt.AlignmentFlag.AlignTop)
        self.fields.addWidget(self.desc, 3, 1, 1, 3)

    def debug(self, msg):
        """Emit ``msg`` on ``debugSignal`` prefixed with the sender's class name."""
        sender = self.sender()
        self.debugSignal.emit(type(sender).__name__ + " : " + msg)

    def clicked(self):
        """Report on ``debugSignal`` which button or combo box triggered the slot."""
        sender = self.sender()
        if isinstance(sender, QPushButton):
            self.debugSignal.emit(sender.text() + " clicked")
        elif isinstance(sender, QComboBox):
            self.debugSignal.emit(sender.currentText() + " selected")

    def connec(self):
        """Toggle the link: start listening ("Connect") or stop it ("Stop").

        The button only switches to "Stop" once the model has bound its
        socket; an invalid port or a failed bind is reported in the console
        and on ``debugSignal`` and leaves the panel in the "Connect" state.
        """
        #self.desc.setText("")
        self.desc.clear()
        if self.model is None:
            return

        host = self.typeBox.text()
        port_text = self.baudBox.text()

        if self.button.text() == "Connect":
            # Trailing newline so the model's own status line starts on a new row.
            self.desc.setText(">> trying to connect to address {} on port {} ...\n".format(host, port_text))
            try:
                #self.model = EthModel()
                self.model.connec(host, int(port_text))
            except (ValueError, OverflowError, OSError) as err:
                # ValueError: port is not a number; OverflowError/OSError:
                # port out of range or address/port cannot be bound.
                self.read(">> connection failed: {}".format(err))
                self.debugSignal.emit("connection failed: {}".format(err))
                return

            print("Started")
            self.button.setText("Stop")
            self.model.start()
        else:
            self.model.quit_flag = True
            print("Stop sent")
            self.model.close()  #self.model.wait()
            print("Stopped")
            self.button.setText("Connect")
            self.desc.setText(">> deconnect address {} on port {} ...".format(host, port_text))

    def read(self, msg):
        """Append ``msg`` to the console and scroll to the last line."""
        self.desc.setText(self.desc.toPlainText() + msg + "\n")
        scrollbar = self.desc.verticalScrollBar()
        scrollbar.setValue(scrollbar.maximum())

    def send(self):
        """Log the typed message in the "Sent" box and hand it to the model."""
        text = self.edit.text()
        if text == "":
            return

        self.sent.setText(self.sent.toPlainText() + text + "\n")
        if self.model is not None:
            self.model.write(text)

Substituir ex = AcApp(__title__) por ex = AcApp(__title__,EthernetInterface) na função main()

Assim que os nossos widgets estiverem no sítio, podemos começar a utilizá-los.

Criar uma QThread para gerir a comunicação UDP

Para não bloquear a GUI quando os pacotes são recebidos ou enviados, vamos criar uma QThread que irá gerir a comunicação UDP

#Ethernet connection
class EthModel(QThread):
    """Worker thread owning the UDP socket used by the monitor.

    Every datagram received is forwarded as text on ``debugSignal``. A message
    queued with ``write`` is sent to the sender of the next datagram received.
    """
    debugSignal = pyqtSignal(str)  # define debug signal

    # Seconds recvfrom() may block before the loop re-checks the stop flag.
    POLL_TIMEOUT = 0.5

    def __init__(self):
        super(EthModel, self).__init__()
        self.quit_flag = False
        self.msgToEmit = ""
        self.sock = None          # created by connec()
        self._isRunning = False   # True between connec() and close()

    def run(self):
        """Thread body: read until a stop is requested, then release the socket."""
        # Checking _isRunning as well prevents a busy loop once the socket
        # has been closed without quit_flag being set.
        while not self.quit_flag and self._isRunning:
            self.read()
        self.close()

    def connec(self, addr='192.168.1.10', port=7):
        """Create the UDP socket and bind it to ``addr``/``port``.

        Args:
            addr: Local IPv4 address to listen on.
            port: Local UDP port to listen on.

        Raises:
            OSError: If the address/port cannot be bound.
            OverflowError: If ``port`` is outside 0-65535.
        """
        # Create a UDP/IP socket (works with the ESP8266 UDP client)
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
            sock.bind((addr, port))
            # Bounded wait so read() can notice a stop request.
            sock.settimeout(self.POLL_TIMEOUT)
        except Exception:
            sock.close()  # do not leak the descriptor when setup fails
            raise
        self.sock = sock

        print('starting up on {} port {}'.format(addr, port))
        self.debugSignal.emit('starting up on {} port {}'.format(addr, port))
        self.quit_flag = False
        self._isRunning = True

    def read(self):
        """Receive datagrams until stopped, replying with any queued message."""
        while self._isRunning:
            try:
                data, addr = self.sock.recvfrom(1024)
            except socket.timeout:
                continue  # nothing received yet; re-check the stop flag
            except ConnectionResetError:
                # NOTE(review): some platforms report a rejected earlier
                # sendto() here; the socket itself is still usable.
                continue
            except OSError:
                # The socket was closed by close() or failed for good.
                print("socket closed")
                self._isRunning = False
                break

            print(data)
            if not data:
                print("no data > break loop")
                break
            #self.sock.sendto("received OK".encode('utf-8'),addr)
            if self.msgToEmit != "":
                self.sock.sendto(self.msgToEmit.encode('utf-8'), addr)
                self.msgToEmit = ""  # clear message
            self.debugSignal.emit(str(data))

    def write(self, msg):
        """Queue ``msg`` to be sent when the next datagram is received."""
        self.msgToEmit = msg

    def close(self):
        """Stop the read loop and close the socket, if one was created."""
        self._isRunning = False
        if self.sock is not None:
            self.sock.close()

Código Python do cliente UDP

Para testar a sua interface, pode executar um código no mesmo computador utilizando o endereço 127.0.0.1 (endereço local). O código a seguir criará um socket e enviará o valor de um contador.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""UDP test client: sends an incrementing counter to the monitor.

The monitor only answers when the user has queued a message, so waiting for
a reply is bounded by a timeout instead of blocking forever.
"""
import socket
import time

HOST = "127.0.0.1"
PORT = 8888
bufferSize = 1024
REPLY_TIMEOUT = 0.1  # seconds to wait for an optional reply


def main():
    """Send the counter every cycle and print any reply from the server."""
    counter = 0
    # Create a single UDP socket at client side and reuse it for every send
    with socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM) as client:
        client.settimeout(REPLY_TIMEOUT)
        while True:
            # Send to server using created UDP socket
            client.sendto(str(counter).encode(), (HOST, PORT))

            try:
                data, addr = client.recvfrom(bufferSize)
            except socket.timeout:
                pass  # no reply this cycle; keep counting
            except ConnectionResetError:
                pass  # nobody is listening on HOST:PORT yet; keep trying
            else:
                msg = "Message from Server {}".format(data)
                print(msg)

            counter += 1
            time.sleep(0.1)


if __name__ == "__main__":
    main()

Código Cliente UDP ESP8266

O código do ESP8266 para a comunicação UDP é bastante simples. Definimos a comunicação em rede utilizando o protocolo UDP e devolvemos o valor dado pela função millis().

#include <ESP8266WiFi.h>
#include <WiFiUdp.h>

// UDP endpoint used by loop() both to receive and to send datagrams
WiFiUDP udp;
// Receive buffer filled by udp.read() in loop()
char packetBuffer[256];
// Local UDP port passed to udp.begin() in setup()
unsigned int localPort = 9999;
// Wi-Fi credentials passed to WiFi.begin(); replace with your own
const char *ssid = "******";
const char *password = "******";

// Set your Static IP address (applied with WiFi.config() in setup())
IPAddress local_IP(192, 168, 1, 80);
IPAddress gateway(192, 168, 1, 1);
IPAddress subnet(255, 255, 0, 0);

// Set destination IP address and port (passed to udp.beginPacket() in loop())
const char *destaddr = "192.168.1.67";
unsigned int destPort = 8888;

void setup() {
  Serial.begin(115200);
  WiFi.config(local_IP, gateway, subnet);
  WiFi.begin(ssid, password);
  while (WiFi.status() != WL_CONNECTED) {
    delay(500);
    Serial.print(".");
  }

  udp.begin(localPort);
  Serial.print(F("UDP Client : ")); Serial.println(WiFi.localIP());
}

void loop() {
  int packetSize = udp.parsePacket();
  Serial.print(" Received packet from : "); Serial.println(udp.remoteIP());
  Serial.print(" Size : "); Serial.println(packetSize);
  Serial.print(" Data : ");
  packetBuffer[0] = '0'; //reset buffer
  if (packetSize) {
    int len = udp.read(packetBuffer, 256);
    Serial.print(packetBuffer);
    
    udp.flush();
  }
  Serial.println("\n");
  delay(500);
  Serial.print("[Client Connected] ");
  Serial.println(WiFi.localIP());
  udp.beginPacket(destaddr, destPort);
  udp.write("Send millis: ");
  char buf[20];
  unsigned long testID = millis();
  sprintf(buf, "%lu", testID);
  Serial.print(" Sent : "); Serial.println(buf);
  udp.write(buf);
  udp.write("\r\n");
  udp.endPacket();
}

N.B.: Para este projeto, utilizamos um esp8266, mas pode adaptar o código a qualquer dispositivo que utilize o protocolo UDP.

Resultados

Para utilizar o código de cliente Python, introduza o endereço IP 127.0.0.1

Para testar a comunicação com o ESP8266, utilize o endereço IP do seu computador (aqui, 192.168.1.67)

Criámos um monitor de comunicação UDP utilizando Python que pode interagir com dispositivos remotos como o ESP8266, ESP32, Raspberry Pi ou outros computadores. Pode agora melhorar a interface de acordo com as suas necessidades

Código completo

#!/usr/bin/python3
# -*-coding:Utf-8 -*
"""
Created on Thu Nov 17 16:59:13 2022

@author: X.Wiedmer

AC windows
Define the application window
"""
import sys,os
#from PyQt5.QtWidgets import *
#from PyQt5.QtCore import *
#from PyQt5.QtGui import *
from PySide2.QtWidgets import *
from PySide2.QtCore import *
from PySide2.QtGui import *
pyqtSignal=Signal #translate pyqt to Pyside

import socket
import time

"""
App configuration
"""
__title__="ACTerminal"
__version__="v0.1"

HOST = '192.168.1.67'
PORT = 8888

#Ethernet connection
class EthModel(QThread):
    """Worker thread owning the UDP socket used by the monitor.

    Every datagram received is forwarded as text on ``debugSignal``. A message
    queued with ``write`` is sent to the sender of the next datagram received.
    """
    debugSignal = pyqtSignal(str)  # define debug signal

    # Seconds recvfrom() may block before the loop re-checks the stop flag.
    POLL_TIMEOUT = 0.5

    def __init__(self):
        super(EthModel, self).__init__()
        self.quit_flag = False
        self.msgToEmit = ""
        self.sock = None          # created by connec()
        self._isRunning = False   # True between connec() and close()

    def run(self):
        """Thread body: read until a stop is requested, then release the socket."""
        # Checking _isRunning as well prevents a busy loop once the socket
        # has been closed without quit_flag being set.
        while not self.quit_flag and self._isRunning:
            self.read()
        self.close()

    def connec(self, addr='192.168.1.10', port=7):
        """Create the UDP socket and bind it to ``addr``/``port``.

        Args:
            addr: Local IPv4 address to listen on.
            port: Local UDP port to listen on.

        Raises:
            OSError: If the address/port cannot be bound.
            OverflowError: If ``port`` is outside 0-65535.
        """
        # Create a UDP/IP socket (works with the ESP8266 UDP client)
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
            sock.bind((addr, port))
            # Bounded wait so read() can notice a stop request.
            sock.settimeout(self.POLL_TIMEOUT)
        except Exception:
            sock.close()  # do not leak the descriptor when setup fails
            raise
        self.sock = sock

        print('starting up on {} port {}'.format(addr, port))
        self.debugSignal.emit('starting up on {} port {}'.format(addr, port))
        self.quit_flag = False
        self._isRunning = True

    def read(self):
        """Receive datagrams until stopped, replying with any queued message."""
        while self._isRunning:
            try:
                data, addr = self.sock.recvfrom(1024)
            except socket.timeout:
                continue  # nothing received yet; re-check the stop flag
            except ConnectionResetError:
                # NOTE(review): some platforms report a rejected earlier
                # sendto() here; the socket itself is still usable.
                continue
            except OSError:
                # The socket was closed by close() or failed for good.
                print("socket closed")
                self._isRunning = False
                break

            print(data)
            if not data:
                # Plain ">" restored: the listing carried an HTML-escaped "&gt;".
                print("no data > break loop")
                break
            #self.sock.sendto("received OK".encode('utf-8'),addr)
            if self.msgToEmit != "":
                self.sock.sendto(self.msgToEmit.encode('utf-8'), addr)
                self.msgToEmit = ""  # clear message
            self.debugSignal.emit(str(data))

    def write(self, msg):
        """Queue ``msg`` to be sent when the next datagram is received."""
        self.msgToEmit = msg

    def close(self):
        """Stop the read loop and close the socket, if one was created."""
        self._isRunning = False
        if self.sock is not None:
            self.sock.close()

#define GUI    
class EthernetInterface(QFrame):
    """Panel of widgets used to configure, start/stop and watch a UDP link.

    The panel emits ``debugSignal`` with short status texts and delegates the
    networking to an ``EthModel`` instance whose messages are appended to the
    console.
    """
    debugSignal = pyqtSignal(str)  # define debug signal

    def __init__(self, parent=None):
        super(EthernetInterface, self).__init__(parent)
        self.grid = QGridLayout()
        self.setLayout(self.grid)
        self.defineWidgets()
        self.model = EthModel()  #self.model=None
        self.model.debugSignal.connect(self.read)

    def defineWidgets(self):
        """Create the child widgets and place them in the group-box grid."""
        #self.setStyleSheet("""QGroupBox{background-color:white;border: 1px solid green;border-radius: 4px;}
        #QGroupBox::title {padding:1 5px;}""")

        # Group box used as the container of every control
        self.grp = QGroupBox(self)
        self.grp.setTitle("Connection Configuration")

        self.fields = QGridLayout()
        self.grp.setLayout(self.fields)
        self.grid.addWidget(self.grp, 0, 0)

        # Validator restricting the address field to dotted IPv4 notation
        ipRange = "(?:[0-1]?[0-9]?[0-9]|2[0-4][0-9]|25[0-5])"   # one octet, 0-255
        ipRegex = QRegExp("^" + ipRange + "\\." + ipRange + "\\." + ipRange + "\\." + ipRange + "$")
        ipValidator = QRegExpValidator(ipRegex, self)

        # Address and port inputs
        self.selectlbl = QLabel("IP Address:")
        self.typeBox = QLineEdit(HOST)
        #self.typeBox.setInputMask("0.0.0.0");
        self.typeBox.setValidator(ipValidator)

        self.baudlbl = QLabel("Port:")
        self.baudBox = QLineEdit("{}".format(PORT))

        # Buttons: each one first reports the click, then performs its action
        self.button = QPushButton("Connect")
        self.button.clicked.connect(self.clicked)
        self.button.clicked.connect(self.connec)

        sendBtn = QPushButton("send")
        sendBtn.clicked.connect(self.clicked)
        sendBtn.clicked.connect(self.send)

        # Message entry, history of sent messages and receive console
        titlelbl = QLabel("Enter")
        self.edit = QLineEdit("")
        sentlbl = QLabel("Sent")
        self.sent = QTextEdit("")
        desclbl = QLabel("Console")
        self.desc = QTextEdit("")

        # addWidget(widget, row, column[, rowSpan=1[, columnSpan=1]])
        self.fields.addWidget(self.selectlbl, 0, 0, 1, 1)
        self.fields.addWidget(self.typeBox, 0, 1, 1, 1)
        self.fields.addWidget(self.baudlbl, 0, 2, 1, 1)
        self.fields.addWidget(self.baudBox, 0, 3, 1, 1)

        self.fields.addWidget(self.button, 0, 4, 1, 1)

        self.fields.addWidget(titlelbl, 1, 0, 1, 1)
        self.fields.addWidget(self.edit, 1, 1, 1, 3)
        self.fields.addWidget(sendBtn, 1, 4, 1, 1)

        self.fields.addWidget(sentlbl, 2, 0, 1, 1, Qt.AlignTop)  #Qt.AlignmentFlag.AlignTop)
        self.fields.addWidget(self.sent, 2, 1, 1, 3)
        self.fields.addWidget(desclbl, 3, 0, 1, 1, Qt.AlignTop)  #Qt.AlignmentFlag.AlignTop)
        self.fields.addWidget(self.desc, 3, 1, 1, 3)

    def debug(self, msg):
        """Emit ``msg`` on ``debugSignal`` prefixed with the sender's class name."""
        sender = self.sender()
        self.debugSignal.emit(type(sender).__name__ + " : " + msg)

    def clicked(self):
        """Report on ``debugSignal`` which button or combo box triggered the slot."""
        sender = self.sender()
        if isinstance(sender, QPushButton):
            self.debugSignal.emit(sender.text() + " clicked")
        elif isinstance(sender, QComboBox):
            self.debugSignal.emit(sender.currentText() + " selected")

    def connec(self):
        """Toggle the link: start listening ("Connect") or stop it ("Stop").

        The button only switches to "Stop" once the model has bound its
        socket; an invalid port or a failed bind is reported in the console
        and on ``debugSignal`` and leaves the panel in the "Connect" state.
        """
        #self.desc.setText("")
        self.desc.clear()
        if self.model is None:
            return

        host = self.typeBox.text()
        port_text = self.baudBox.text()

        if self.button.text() == "Connect":
            # ">>" restored: the listing carried HTML-escaped "&gt;&gt;".
            self.desc.setText(">> trying to connect to address {} on port {} ...\n".format(host, port_text))
            try:
                self.model.connec(host, int(port_text))
            except (ValueError, OverflowError, OSError) as err:
                # ValueError: port is not a number; OverflowError/OSError:
                # port out of range or address/port cannot be bound.
                self.read(">> connection failed: {}".format(err))
                self.debugSignal.emit("connection failed: {}".format(err))
                return

            print("Started")
            self.button.setText("Stop")
            self.model.start()
        else:
            self.model.quit_flag = True
            print("Stop sent")
            self.model.close()  #self.model.wait()
            print("Stopped")
            self.button.setText("Connect")
            self.desc.setText(">> deconnect address {} on port {} ...".format(host, port_text))

    def read(self, msg):
        """Append ``msg`` to the console and scroll to the last line."""
        self.desc.setText(self.desc.toPlainText() + msg + "\n")
        scrollbar = self.desc.verticalScrollBar()
        scrollbar.setValue(scrollbar.maximum())

    def send(self):
        """Log the typed message in the "Sent" box and hand it to the model."""
        text = self.edit.text()
        if text == "":
            return

        self.sent.setText(self.sent.toPlainText() + text + "\n")
        if self.model is not None:
            self.model.write(text)

# Generic app container     
class AcApp(QMainWindow):
    """Generic application window hosting a single central widget.

    Args:
        title: Text shown in the window title bar.
        mainFrame: Widget *class* (not an instance) that is instantiated and
            installed as the central widget. If the resulting widget exposes a
            ``debugSignal`` signal, its messages are shown in the status bar.
    """

    def __init__(self, title='AcApp', mainFrame=QFrame):
        super().__init__()
        self.title = title
        self.mainFrame = mainFrame()
        self.initUI()

    def initUI(self):
        """Install the central widget, wire the debug signal and show the window."""
        self.setCentralWidget(self.mainFrame)

        # Connect signals. A plain QFrame (the default) has no debugSignal, so
        # only connect when the central widget actually provides one.
        debug_signal = getattr(self.mainFrame, "debugSignal", None)
        if debug_signal is not None:
            debug_signal.connect(self.debugMsg)

        # General configuration
        #self.resize(self.width, self.height)
        self.setWindowTitle(self.title)
        self.setGeometry(300, 300, 850, 450)
        #self.setWindowIcon(QIcon(__icon__))

        # Debug bar
        self.statusBar().showMessage('Display debug messages')

        self.show()

    def debugMsg(self, val):
        """Show ``val`` in the status bar."""
        self.statusBar().showMessage(val)
   


       
def main():
    """Create the Qt application, show the UDP monitor and run the event loop."""
    app = QApplication(sys.argv)
    app.setQuitOnLastWindowClosed(True)
    #app.setStyleSheet(open("style.txt").read()); #set style according to file
    #ex = AcApp(__title__)
    # Keep a reference to the window for the lifetime of the event loop.
    ex = AcApp(__title__, EthernetInterface)
    # NOTE: no app.quit() here; called before exec_() it has no event loop to
    # stop and only suggested that the application exits immediately.
    sys.exit(app.exec_())


if __name__ == '__main__':
    main()

Fontes

Exit mobile version