Neste projeto, vamos criar um monitor de comunicação de rede UDP usando Python (PyQt). Quando se desenvolve um projeto com Arduino, Raspberry Pi ou qualquer outro microcontrolador, é certamente necessário criar uma interface gráfica para gerir o sistema (depurar, observar medições, lançar acções, etc.). Existem muitas ferramentas disponíveis para criar interfaces gráficas. Neste projeto, vamos criar um monitor de comunicação de rede usando PyQt (PySide2).
Objetivo
Para este projeto, queremos criar uma interface gráfica em Python para Windows que se comporte como um monitor de comunicação UDP. Para fazer isso, precisaremos executar as seguintes funções
- Caixa de introdução do endereço IP
- Zona de entrada no porto
- Botão de ligação
- Área de escrita para encomenda
- Botão Enviar
- Consola que apresenta os dados recebidos
Recuperar o endereço IP da máquina
Na máquina recetora, obtenha o endereço IPv4 (no nosso caso, 192.168.1.67)
ipconfig #sur la machine windows
ou
ip addr #sur machine linux
Aplicação de monitorização da comunicação UDP
Primeiro, vamos criar a janela da aplicação, a que chamaremos AcApp e que será a base da nossa aplicação.
#!/usr/bin/python3 # -*-coding:Utf-8 -* import sys,os #from PyQt5.QtWidgets import * #from PyQt5.QtCore import * #from PyQt5.QtGui import * from PySide2.QtWidgets import * from PySide2.QtCore import * from PySide2.QtGui import * pyqtSignal=Signal #translate pyqt to Pyside class AcApp(QMainWindow): def __init__(self,title='AcApp',mainFrame=QFrame): super().__init__() self.title=title self.mainFrame=mainFrame() self.initUI() def initUI(self): self.setCentralWidget(self.mainFrame) #connect signals self.mainFrame.debugSignal.connect(self.debugMsg) #General configuration #self.resize(self.width, self.height) self.setWindowTitle(self.title) self.setGeometry(300, 300, 850, 450) #self.setWindowIcon(QIcon(__icon__)) #Debug bar self.statusBar() self.statusBar().showMessage('Display debug messages') self.show() def debugMsg(self,val): self.statusBar().showMessage(val) def main(): app = QApplication(sys.argv) app.setQuitOnLastWindowClosed(True) ex = AcApp(__title__) #ex = AcApp(__title__,EthernetInterface) app.quit() sys.exit(app.exec_()) if __name__ == '__main__': main()
Criação de widgets de gestão de comunicações UDP
Em seguida, criamos uma classe que contém todos os Widgets necessários para criar e gerir o monitor de comunicação UDP (QLineEdit, QTextEdit, QButton, etc.). Este Widget será inicializado com a classe que vai gerir a comunicação
class EthernetInterface(QFrame): debugSignal=pyqtSignal(str) #define debug signal def __init__(self,parent=None): super(EthernetInterface,self).__init__(parent) self.grid=QGridLayout() self.setLayout(self.grid) self.defineWidgets() self.model=None if self.model is not None: self.model.debugSignal.connect(self.read) def defineWidgets(self): #self.setStyleSheet("""QGroupBox{background-color:white;border: 1px solid green;border-radius: 4px;} #QGroupBox::title {padding:1 5px;}""") #grooupbox widget container self.grp=QGroupBox(self) self.grp.setTitle("Connection Configuration") self.fields=QGridLayout() self.grp.setLayout(self.fields) self.grid.addWidget(self.grp,0,0) #Define widget UI #validator ipRange = "(?:[0-1]?[0-9]?[0-9]|2[0-4][0-9]|25[0-5])" # Part of the regular expression # Regulare expression ipRegex = QRegExp("^" + ipRange + "\\." + ipRange + "\\." + ipRange + "\\." + ipRange + "$") ipValidator = QRegExpValidator(ipRegex, self) #label self.selectlbl = QLabel("IP Address:") self.typeBox = QLineEdit(HOST) #self.typeBox.setInputMask("0.0.0.0"); self.typeBox.setValidator(ipValidator); self.baudlbl = QLabel("Port:") self.baudBox = QLineEdit("{}".format(PORT)) #btn self.button = QPushButton("Connect") self.button.clicked.connect(self.clicked) self.button.clicked.connect(self.connec) sendBtn = QPushButton("send") sendBtn.clicked.connect(self.clicked) sendBtn.clicked.connect(self.send) titlelbl= QLabel("Enter") self.edit = QLineEdit("") sentlbl=QLabel("Sent") self.sent = QTextEdit("") desclbl=QLabel("Console") self.desc = QTextEdit("") #row, column[, rowSpan=1[, columnSpan=1[ self.fields.addWidget(self.selectlbl,0,0,1,1) self.fields.addWidget(self.typeBox,0,1,1,1) self.fields.addWidget(self.baudlbl,0,2,1,1) self.fields.addWidget(self.baudBox,0,3,1,1) self.fields.addWidget(self.button,0,4,1,1) self.fields.addWidget(titlelbl,1,0,1,1) self.fields.addWidget(self.edit,1,1,1,3) self.fields.addWidget(sendBtn,1,4,1,1) self.fields.addWidget(sentlbl,2,0,1,1,Qt.AlignTop)#Qt.AlignmentFlag.AlignTop) self.fields.addWidget(self.sent,2,1,1,3) self.fields.addWidget(desclbl,3,0,1,1,Qt.AlignTop)#Qt.AlignmentFlag.AlignTop) self.fields.addWidget(self.desc,3,1,1,3) def debug(self,msg): sender = self.sender() self.debugSignal.emit(sender.__class__.__name__+" : "+msg) def clicked(self): sender = self.sender() if sender.__class__.__name__=="QPushButton": self.debugSignal.emit(sender.text()+ " clicked") if sender.__class__.__name__=="QComboBox": self.debugSignal.emit(sender.currentText()+ " selected") def connec(self): #self.desc.setText("") self.desc.clear() if self.model is not None: if self.button.text() == "Connect": self.desc.setText(">> trying to connect to address {} on port {} ...".format(self.typeBox.text(),self.baudBox.text())) print("Started") self.button.setText("Stop") #self.model = EthModel() self.model.connec(self.typeBox.text(),int(self.baudBox.text())) self.model.start() else: self.model.quit_flag = True print("Stop sent") self.model.close()#self.model.wait() print("Stopped") self.button.setText("Connect") self.desc.setText(">> deconnect address {} on port {} ...".format(self.typeBox.text(),self.baudBox.text())) def read(self,msg): self.desc.setText(self.desc.toPlainText()+msg+"\n") self.desc.verticalScrollBar().setValue(self.desc.verticalScrollBar().maximum()); def send(self): if self.edit.text() != "": self.sent.setText(self.sent.toPlainText()+self.edit.text()+"\n") if self.model is not None: self.model.write(self.edit.text())
Substituir ex = AcApp(__title__) por ex = AcApp(__title__,EthernetInterface) na função main()
Assim que os nossos widgets estiverem no sítio, podemos começar a utilizá-los.
Criar uma QThread para gerir a comunicação UDP
Para não bloquear a GUI quando os pacotes são recebidos ou enviados, vamos criar uma QThread que irá gerir a comunicação UDP
#Ethernet connection class EthModel(QThread): """Handle Ethernet connexion with remote device and connect to interface EthInterface""" debugSignal=pyqtSignal(str) #define debug signal def __init__(self): super(EthModel, self).__init__() self.quit_flag = False self.msgToEmit="" def run(self): while True: if not self.quit_flag: self.read() #time.sleep(1) else: self.close() break self.quit() #self.wait() def connec(self,addr='192.168.1.10',port=7): # Create a UDP/IP socket self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) #worls with esp8266 udp client self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) self.sock.bind((addr,port)) # Listen for incoming connections #self.sock.listen(1) #stream print('starting up on {} port {}'.format(addr,port)) self.debugSignal.emit('starting up on {} port {}'.format(addr,port)) self.quit_flag = False self._isRunning=True def read(self): while self._isRunning: #data = self.sock.recv(1024) try: data, addr = self.sock.recvfrom(1024) print(data) except: print("socket closed") data=False if not data: print("no data > break loop") break #self.sock.sendto("received OK".encode('utf-8'),addr) if self.msgToEmit!="": self.sock.sendto(self.msgToEmit.encode('utf-8'),addr) self.msgToEmit="" #clear message self.debugSignal.emit(str(data)) def write(self,msg): self.msgToEmit=msg def close(self): self._isRunning=False self.sock.close()
Código Python do cliente UDP
Para testar a sua interface, pode executar um código no mesmo computador utilizando o endereço 127.0.0.1 (endereço local). O código a seguir criará um socket e enviará o valor de um contador.
#!/usr/bin/env python # -*- coding: utf-8 -*- import socket import time HOST = "127.0.0.1" PORT = 8888 bufferSize = 1024 counter=0 while True: # Create a UDP socket at client side with socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM) as client: # Send to server using created UDP socket client.sendto(str.encode(str(counter)), (HOST,PORT)) data,addr= client.recvfrom(bufferSize) msg = "Message from Server {}".format(data) print(msg) counter+=1 time.sleep(0.1)
Código Cliente UDP ESP8266
O código do ESP8266 para a comunicação UDP é bastante simples. Definimos a comunicação em rede utilizando o protocolo UDP e devolvemos o valor dado pela função millis().
#include <ESP8266WiFi.h> #include <WiFiUdp.h> WiFiUDP udp; char packetBuffer[256]; unsigned int localPort = 9999; const char *ssid = "******"; const char *password = "******"; // Set your Static IP address IPAddress local_IP(192, 168, 1, 80); IPAddress gateway(192, 168, 1, 1); IPAddress subnet(255, 255, 0, 0); // Set destination IP address const char *destaddr = "192.168.1.67"; unsigned int destPort = 8888; void setup() { Serial.begin(115200); WiFi.config(local_IP, gateway, subnet); WiFi.begin(ssid, password); while (WiFi.status() != WL_CONNECTED) { delay(500); Serial.print("."); } udp.begin(localPort); Serial.print(F("UDP Client : ")); Serial.println(WiFi.localIP()); } void loop() { int packetSize = udp.parsePacket(); Serial.print(" Received packet from : "); Serial.println(udp.remoteIP()); Serial.print(" Size : "); Serial.println(packetSize); Serial.print(" Data : "); packetBuffer[0] = '0'; //reset buffer if (packetSize) { int len = udp.read(packetBuffer, 256); Serial.print(packetBuffer); udp.flush(); } Serial.println("\n"); delay(500); Serial.print("[Client Connected] "); Serial.println(WiFi.localIP()); udp.beginPacket(destaddr, destPort); udp.write("Send millis: "); char buf[20]; unsigned long testID = millis(); sprintf(buf, "%lu", testID); Serial.print(" Sent : "); Serial.println(buf); udp.write(buf); udp.write("\r\n"); udp.endPacket(); }
N.B.: Para este projeto, utilizamos um esp8266, mas pode adaptar o código a qualquer dispositivo que utilize o protocolo UDP.
Resultados
Para utilizar o código de cliente Python, introduza o endereço IP 127.0.0.1
Para testar a comunicação com o ESP8266, utilize o endereço IP do seu computador (aqui, 192.168.1.67)
Criámos um monitor de comunicação UDP utilizando Python que pode interagir com dispositivos remotos como o ESP8266, ESP32, Raspberry Pi ou outros computadores. Pode agora melhorar a interface de acordo com as suas necessidades
Código completo
#!/usr/bin/python3 # -*-coding:Utf-8 -* """ Created on Thu Nov 17 16:59:13 2022 @author: X.Wiedmer AC windows Define the application window """ import sys,os #from PyQt5.QtWidgets import * #from PyQt5.QtCore import * #from PyQt5.QtGui import * from PySide2.QtWidgets import * from PySide2.QtCore import * from PySide2.QtGui import * pyqtSignal=Signal #translate pyqt to Pyside import socket import time """ App configuration """ __title__="ACTerminal" __version__="v0.1" HOST = '192.168.1.67' PORT = 8888 #Ethernet connection class EthModel(QThread): """Handle Ethernet connexion with remote device and connect to interface EthInterface""" debugSignal=pyqtSignal(str) #define debug signal def __init__(self): super(EthModel, self).__init__() self.quit_flag = False self.msgToEmit="" def run(self): while True: if not self.quit_flag: self.read() #time.sleep(1) else: self.close() break self.quit() self.exit() #self.wait() def connec(self,addr='192.168.1.10',port=7): # Create a UDP/IP socket self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) #worls with esp8266 udp client self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) self.sock.bind((addr,port)) # Listen for incoming connections #self.sock.listen(1) #stream print('starting up on {} port {}'.format(addr,port)) self.debugSignal.emit('starting up on {} port {}'.format(addr,port)) self.quit_flag = False self._isRunning=True def read(self): while self._isRunning: #data = self.sock.recv(1024) try: data, addr = self.sock.recvfrom(1024) print(data) except: print("socket closed") data=False if not data: print("no data > break loop") break #self.sock.sendto("received OK".encode('utf-8'),addr) if self.msgToEmit!="": self.sock.sendto(self.msgToEmit.encode('utf-8'),addr) self.msgToEmit="" #clear message self.debugSignal.emit(str(data)) def write(self,msg): self.msgToEmit=msg def close(self): self._isRunning=False self.sock.close() #define GUI class EthernetInterface(QFrame): debugSignal=pyqtSignal(str) #define debug signal def __init__(self,parent=None): super(EthernetInterface,self).__init__(parent) self.grid=QGridLayout() self.setLayout(self.grid) self.defineWidgets() self.model=EthModel()#self.model=None self.model.debugSignal.connect(self.read) def defineWidgets(self): #self.setStyleSheet("""QGroupBox{background-color:white;border: 1px solid green;border-radius: 4px;} #QGroupBox::title {padding:1 5px;}""") #grooupbox widget container self.grp=QGroupBox(self) self.grp.setTitle("Connection Configuration") self.fields=QGridLayout() self.grp.setLayout(self.fields) self.grid.addWidget(self.grp,0,0) #Define widget UI #validator ipRange = "(?:[0-1]?[0-9]?[0-9]|2[0-4][0-9]|25[0-5])" # Part of the regular expression # Regulare expression ipRegex = QRegExp("^" + ipRange + "\\." + ipRange + "\\." + ipRange + "\\." + ipRange + "$") ipValidator = QRegExpValidator(ipRegex, self) #label self.selectlbl = QLabel("IP Address:") self.typeBox = QLineEdit(HOST) #self.typeBox.setInputMask("0.0.0.0"); self.typeBox.setValidator(ipValidator); self.baudlbl = QLabel("Port:") self.baudBox = QLineEdit("{}".format(PORT)) #btn self.button = QPushButton("Connect") self.button.clicked.connect(self.clicked) self.button.clicked.connect(self.connec) sendBtn = QPushButton("send") sendBtn.clicked.connect(self.clicked) sendBtn.clicked.connect(self.send) titlelbl= QLabel("Enter") self.edit = QLineEdit("") sentlbl=QLabel("Sent") self.sent = QTextEdit("") desclbl=QLabel("Console") self.desc = QTextEdit("") #row, column[, rowSpan=1[, columnSpan=1[ self.fields.addWidget(self.selectlbl,0,0,1,1) self.fields.addWidget(self.typeBox,0,1,1,1) self.fields.addWidget(self.baudlbl,0,2,1,1) self.fields.addWidget(self.baudBox,0,3,1,1) self.fields.addWidget(self.button,0,4,1,1) self.fields.addWidget(titlelbl,1,0,1,1) self.fields.addWidget(self.edit,1,1,1,3) self.fields.addWidget(sendBtn,1,4,1,1) self.fields.addWidget(sentlbl,2,0,1,1,Qt.AlignTop)#Qt.AlignmentFlag.AlignTop) self.fields.addWidget(self.sent,2,1,1,3) self.fields.addWidget(desclbl,3,0,1,1,Qt.AlignTop)#Qt.AlignmentFlag.AlignTop) self.fields.addWidget(self.desc,3,1,1,3) def debug(self,msg): sender = self.sender() self.debugSignal.emit(sender.__class__.__name__+" : "+msg) def clicked(self): sender = self.sender() if sender.__class__.__name__=="QPushButton": self.debugSignal.emit(sender.text()+ " clicked") if sender.__class__.__name__=="QComboBox": self.debugSignal.emit(sender.currentText()+ " selected") def connec(self): #self.desc.setText("") self.desc.clear() if self.model is not None: if self.button.text() == "Connect": self.desc.setText(">> trying to connect to address {} on port {} ...\n".format(self.typeBox.text(),self.baudBox.text())) print("Started") self.button.setText("Stop") self.model.connec(self.typeBox.text(),int(self.baudBox.text())) self.model.start() else: self.model.quit_flag = True print("Stop sent") self.model.close()#self.model.wait() print("Stopped") self.button.setText("Connect") self.desc.setText(">> deconnect address {} on port {} ...".format(self.typeBox.text(),self.baudBox.text())) def read(self,msg): self.desc.setText(self.desc.toPlainText()+msg+"\n") self.desc.verticalScrollBar().setValue(self.desc.verticalScrollBar().maximum()); def send(self): if self.edit.text() != "": self.sent.setText(self.sent.toPlainText()+self.edit.text()+"\n") if self.model is not None: self.model.write(self.edit.text()) # Generic app container class AcApp(QMainWindow): def __init__(self,title='AcApp',mainFrame=QFrame): super().__init__() self.title=title self.mainFrame=mainFrame() self.initUI() def initUI(self): self.setCentralWidget(self.mainFrame) #connect signals self.mainFrame.debugSignal.connect(self.debugMsg) #General configuration #self.resize(self.width, self.height) self.setWindowTitle(self.title) self.setGeometry(300, 300, 850, 450) #self.setWindowIcon(QIcon(__icon__)) #Debug bar self.statusBar() self.statusBar().showMessage('Display debug messages') self.show() def debugMsg(self,val): self.statusBar().showMessage(val) def main(): app = QApplication(sys.argv) app.setQuitOnLastWindowClosed(True) #app.setStyleSheet(open("style.txt").read()); #set style according to file #ex = AcApp(__title__) ex = AcApp(__title__,EthernetInterface) app.quit() sys.exit(app.exec_()) if __name__ == '__main__': main()